热门IT资讯网

十五、MapReduce--自定义output输出

发表于:2024-11-25 作者:热门IT资讯网编辑
编辑最后更新 2024年11月25日。要自定义输出时,首先需要继承两个抽象类:一个是 OutputFormat,一个是 RecordWriter。前者主要负责创建 RecordWriter,后者主要实现 write 方法,将 KV 写入文件。

要自定义输出时,首先需要继承两个抽象类:一个是 OutputFormat,一个是 RecordWriter。前者主要负责创建 RecordWriter,后者主要实现 write 方法,将 KV 写入文件。

1、需求
对于 reduce 输出的 KV:如果 key 中包含特定字符串(本例中为 "itstar"),就将其输出到一个文件中;其余的 KV 则输出到另一个文件中。

2、源码
源数据

http://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.itstar.com
http://www.itstar1.com
http://www.itstar2.com
http://www.itstar3.com
http://www.baidu.com
http://www.sin2a.com
http://www.sin2a.comw.google.com
http://www.sin2desa.com
http://www.sin2desa.comw.google.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com

outputFormat

/**
 * Output format that hands every reduce output record to a {@link MyRecordWriter}.
 *
 * <p>It extends {@code FileOutputFormat} (rather than {@code OutputFormat} directly) so the job
 * still requires an output directory; the framework writes its job-completion marker there, while
 * the actual records go wherever {@link MyRecordWriter} sends them.
 */
public class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {

    /**
     * Creates the record writer for one task attempt.
     *
     * @param taskAttemptContext context of the task attempt requesting a writer
     * @return a new {@link MyRecordWriter} bound to this attempt's configuration
     * @throws IOException if the writer's output files cannot be opened
     */
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        return new MyRecordWriter(taskAttemptContext);
    }
}

RecordWriter

public class MyRecordWriter extends RecordWriter {    private FSDataOutputStream startOut;    private FSDataOutputStream otherOut;    public MyRecordWriter(TaskAttemptContext job) {        try {            FileSystem fs = FileSystem.get(job.getConfiguration());            startOut = fs.create(new Path("G:\\test\\date\\A\\itstarlog\\logdir\\startout.log"));            otherOut = fs.create(new Path("G:\\test\\date\\A\\itstarlog\\logdir\\otherout.log"));        } catch (IOException e) {            e.printStackTrace();        }    }    @Override    public void write(Text key, NullWritable value) throws IOException, InterruptedException {        String line = key.toString();        //如果key中包含itstar就写入到另外一个文件中        if (line.contains("itstar")) {            this.startOut.writeUTF(line);        } else {            this.otherOut.writeUTF(line);        }    }    @Override    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {        this.startOut.close();        this.otherOut.close();    }}

mapper

/**
 * Mapper that emits each input line as an output key with a {@link NullWritable} value.
 *
 * <p>The input byte offset (the map input key) is discarded.
 */
public class MyOutputMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    /**
     * Forwards the line unchanged as the key.
     *
     * @param key byte offset of the line in the input split (unused)
     * @param value the line contents
     * @param context sink for the emitted pair
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(value, NullWritable.get());
    }
}

reducer

/**
 * Reducer that emits each distinct key exactly once, with {@code "\r\n"} appended.
 *
 * <p>The terminator is appended here because the record writer used with this job writes keys
 * verbatim without adding a separator. Duplicate input keys collapse to a single output line,
 * since {@code reduce} runs once per key and the values are ignored.
 */
public class MyOutputReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    /** Output key, reused across calls to avoid allocating a new {@link Text} per key. */
    private final Text k = new Text();

    /**
     * Writes {@code key + "\r\n"} once, regardless of how many values the key had.
     *
     * @param key the grouped key
     * @param values the (null) values for the key; ignored
     * @param context sink for the emitted pair
     */
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        k.set(key.toString() + "\r\n");
        context.write(k, NullWritable.get());
    }
}

driver

ublic class MyDriver {    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {        args = new String[]{"G:\\test\\date\\A\\itstarlog\\A\\other.log", "G:\\test\\date\\A\\itstarlog\\logresult\\"};        Configuration conf = new Configuration();        Job job = Job.getInstance(conf);        job.setJarByClass(MyDriver.class);        job.setMapperClass(MyOutputMapper.class);        job.setReducerClass(MyOutputReducer.class);        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(NullWritable.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(NullWritable.class);        //自定义输出的实现子类,也是继承FileOutputFormat        job.setOutputFormatClass(MyOutputFormat.class);        FileInputFormat.setInputPaths(job, new Path(args[0]));        //这个路径输出的是job的执行成功successs文件的输出路径        FileOutputFormat.setOutputPath(job, new Path(args[1]));        job.waitForCompletion(true);    }}
0