栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

hadoop 自定义OutputFormat

hadoop 自定义OutputFormat

前言

在某些业务场景下,需要对原始的数据进行合理的分类输出,减少后续的程序处理数据带来的麻烦,其实这也属于ETL中的一种,比如,我们收集到了一份原始的日志,主体字段为区域编码,需要根据这个编码将这份日志分割输出到不同的文件中

在前面的一篇自定义分区中,可以将原始的文件在自定义的Partioner类中结合实际业务需求,将数据最终输出到不同的分区文件下,这属于一种解决方案,但使用这种方式也有一定的弊端,因为分区数量增大了,必然带来MapTask 数量的增加,带来的是服务器资源的更多开销

与分区输出不同的是,自定义OutputFormat更加灵活,相当于是MapReduce之外的扩展,可以在自定义OutputFormat中编写更复杂的业务逻辑定制化业务场景,同时,更重要的是,真正做到按照业务字段,将原始的文件最终输出到不同的文件中去。

OutputFormat使用场景

输出数据到中间件,例如mysql,es,hbase等

自定义OutputFormat步骤

自定义一个OutputFormat类重写里面的RecordWriter方法,具体改造输出数据的write方法 业务场景

有下面的一个日志文件

需求是,将这份文件中,包含 "www.wangyi.com"的内容输出到一个文件,其他的输出到另一个文件

编码实现

1、自定义Mapper类

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogMapper extends Mapper {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(value,NullWritable.get());
    }
}

2、自定义Reduce类

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class LogReducer extends Reducer {

    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        for (NullWritable nullWritable : values){
            context.write(key,NullWritable.get());
        }
    }

}

3、自定义OutputFormat类

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogOutputFormat extends FileOutputFormat {

    @Override
    public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        LogRecordWriter recordWriter = new LogRecordWriter(taskAttemptContext);
        return recordWriter;
    }

}

LogRecordWriter

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class LogRecordWriter extends RecordWriter {

    private FSDataOutputStream data1 ;
    private FSDataOutputStream data2 ;

    public LogRecordWriter(TaskAttemptContext context){
        try {
            FileSystem fileSystem = FileSystem.get(context.getConfiguration());
            data1 = fileSystem.create(new Path("F:\网盘\csv\logs\wangyi.log"));
            data2 = fileSystem.create(new Path("F:\网盘\csv\logs\other.log"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    
    @Override
    public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {
        String value = text.toString();
        if(value.trim().contains("www.wangyi.com")){
            data1.writeBytes(value.trim() + "n");
        }else {
            data2.writeBytes(value.trim() + "n");
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        IOUtils.closeStream(data1);
        IOUtils.closeStream(data2);
    }
}

4、job类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogJob {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(LogJob.class);
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        //设置自定义的outputformat
        job.setOutputFormatClass(LogOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("F:\网盘\csv\website.txt"));
        //虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat

        //而fileoutputformat要输出一个_SUCCESS文件,所以在这还得指定一个输出目录
        FileOutputFormat.setOutputPath(job, new Path("F:\网盘\csv\logs"));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }

}

运行上面的程序,观察输出目录文件,这样就可以满足上面的需求

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/701560.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号