十五、MapReduce--自定义output输出
发表于:2024-11-25 作者:热门IT资讯网编辑
编辑最后更新 2024年11月25日。我们要自定义输出时,首先要继承两个抽象类:一个是 OutputFormat,另一个是 RecordWriter。前者主要负责创建 RecordWriter,后者主要实现 write 方法,将 KV 写入文件。
我们要自定义输出时,首先要继承两个抽象类:一个是 OutputFormat,另一个是 RecordWriter。前者主要负责创建 RecordWriter,后者主要实现 write 方法,将 KV 写入文件。
1、需求
对于 reduce 输出的 KV:如果 key 中包含特定字符串(本例中为 itstar),则将其输出到一个文件中;其余的 KV 则输出到另一个文件中。
2、源码
源数据
http://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.itstar.com
http://www.itstar1.com
http://www.itstar2.com
http://www.itstar3.com
http://www.baidu.com
http://www.sin2a.com
http://www.sin2a.comw.google.com
http://www.sin2desa.com
http://www.sin2desa.comw.google.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com
outputFormat
/**
 * Output format that gives each task attempt a {@link MyRecordWriter}, which routes records
 * into one of two files depending on the key content.
 *
 * <p>It extends {@link FileOutputFormat} so the job still has an output directory. The driver
 * uses that directory only for the job's success marker.
 */
public class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {

  /**
   * Creates the record writer for one task attempt.
   *
   * @param taskAttemptContext context of the task attempt; the writer uses its configuration
   *     to obtain the file system
   * @return a new writer bound to this task attempt
   * @throws IOException if the writer cannot open its output files
   * @throws InterruptedException declared by the overridden method
   */
  @Override
  public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext)
      throws IOException, InterruptedException {
    return new MyRecordWriter(taskAttemptContext);
  }
}
RecordWriter
public class MyRecordWriter extends RecordWriter { private FSDataOutputStream startOut; private FSDataOutputStream otherOut; public MyRecordWriter(TaskAttemptContext job) { try { FileSystem fs = FileSystem.get(job.getConfiguration()); startOut = fs.create(new Path("G:\\test\\date\\A\\itstarlog\\logdir\\startout.log")); otherOut = fs.create(new Path("G:\\test\\date\\A\\itstarlog\\logdir\\otherout.log")); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { String line = key.toString(); //如果key中包含itstar就写入到另外一个文件中 if (line.contains("itstar")) { this.startOut.writeUTF(line); } else { this.otherOut.writeUTF(line); } } @Override public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { this.startOut.close(); this.otherOut.close(); }}
mapper
/**
 * Mapper that emits every input line as a key with an empty value. Identical lines therefore
 * reach the same reduce call. The input byte offset (the key) is ignored.
 *
 * <p>The type parameters are required. With a raw {@code Mapper}, this {@code map} would not
 * override {@code Mapper.map}: {@code @Override} would fail to compile, and without the
 * annotation the framework would silently run the identity mapper instead.
 */
public class MyOutputMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

  /**
   * Emits {@code value} as the output key.
   *
   * @param key byte offset of the line; unused
   * @param value one input line
   * @param context task context used to emit the record
   * @throws IOException if emitting fails
   * @throws InterruptedException if the task is interrupted while emitting
   */
  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    context.write(value, NullWritable.get());
  }
}
reducer
/**
 * Reducer that writes each distinct key once and appends {@code "\r\n"}. The record writer
 * used by this job adds no separator of its own, so this keeps each record on its own line.
 *
 * <p>The type parameters are required. With a raw {@code Reducer} and raw {@code Iterable},
 * {@code @Override} on {@code reduce} does not compile.
 */
public class MyOutputReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

  /** Output key reused across calls to avoid allocating a new Text per group. */
  private final Text k = new Text();

  /**
   * Emits {@code key} followed by CRLF. The values are ignored, so duplicate lines collapse
   * into a single output record.
   *
   * @param key a distinct input line
   * @param values one NullWritable per occurrence; unused
   * @param context task context used to emit the record
   * @throws IOException if emitting fails
   * @throws InterruptedException if the task is interrupted while emitting
   */
  @Override
  protected void reduce(Text key, Iterable<NullWritable> values, Context context)
      throws IOException, InterruptedException {
    k.set(key.toString() + "\r\n");
    context.write(k, NullWritable.get());
  }
}
driver
ublic class MyDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { args = new String[]{"G:\\test\\date\\A\\itstarlog\\A\\other.log", "G:\\test\\date\\A\\itstarlog\\logresult\\"}; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MyDriver.class); job.setMapperClass(MyOutputMapper.class); job.setReducerClass(MyOutputReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); //自定义输出的实现子类,也是继承FileOutputFormat job.setOutputFormatClass(MyOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(args[0])); //这个路径输出的是job的执行成功successs文件的输出路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); }}