栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

MapReduce分区

MapReduce分区

分区步骤

step1 定义Mapper
step2 自定义Partitioner
step3 定义Reduce逻辑
step4 主类中设置分区类和ReduceTask个数

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;


public class Partition {

    /**
     * Identity mapper: forwards each input line unchanged.
     * Input:  k1 = byte offset (LongWritable), v1 = line text (Text).
     * Output: k2 = line text (Text),           v2 = NullWritable.
     */
    public static class PartitionMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // The whole CSV line becomes the map output key; the partitioner inspects it.
            context.write(value, NullWritable.get());
        }
    }

    /**
     * Routes each record to one of two partitions based on the 4th CSV field
     * (the "winning number"): values > 15 go to partition 1, others to partition 0.
     * Assumes each line has at least 4 comma-separated fields with an integer
     * in field index 3 (true for the cmdlist sample data).
     */
    public static class MyPartitioner extends Partitioner<Text, NullWritable> {
        @Override
        public int getPartition(Text text, NullWritable nullWritable, int numPartitions) {
            // 1. Split the line text (k2) and take the winning-number field.
            String[] fields = text.toString().split(",");
            String num = fields[3];
            // 2. Compare against 15 and return the matching partition id.
            return Integer.parseInt(num) > 15 ? 1 : 0;
        }
    }

    /** Identity reducer: writes each distinct key once with a NullWritable value. */
    public static class PartitionReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }

        /**
         * Job driver: wires mapper, custom partitioner (2 reduce tasks) and reducer,
         * clears any stale output directory, then runs the job.
         * (Kept inside PartitionReducer to preserve the original entry-point class.)
         */
        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
            // 1. Create the job; share one Configuration with the FileSystem below.
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(Partition.class);
            // 2. Configure input/output formats.
            job.setInputFormatClass(TextInputFormat.class);
            // BUG FIX: original called setOutputKeyClass(TextOutputFormat.class);
            // the output FORMAT must be registered via setOutputFormatClass.
            job.setOutputFormatClass(TextOutputFormat.class);
            // Mapper and its output key/value types.
            job.setMapperClass(PartitionMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
            // Custom partitioner; ReduceTask count must cover both partition ids (0 and 1).
            job.setPartitionerClass(MyPartitioner.class);
            job.setNumReduceTasks(2);
            // Reducer and final output types.
            job.setReducerClass(PartitionReducer.class);
            job.setOutputKeyClass(Text.class);
            // BUG FIX: the reducer emits NullWritable values, not Text.
            job.setOutputValueClass(NullWritable.class);
            // Input and output paths.
            String inPath = "./datas/cmdlist";
            String outPath = "./output001";
            TextInputFormat.addInputPath(job, new Path(inPath));
            TextOutputFormat.setOutputPath(job, new Path(outPath));
            // Remove a pre-existing output directory, otherwise the job fails to start.
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(new Path(outPath))) {
                fs.delete(new Path(outPath), true);
            }

            job.waitForCompletion(true);
        }
    }
}

cmdlist文件:

17365849247,1001,man,13
17365569247,1001,man,15
17365849247,1001,man,12
17365849247,1001,man,11
17366549247,1001,man,17
17365849247,1001,man,17
17365878247,1001,man,14
17365849247,1001,man,13
17365379247,1001,man,10
17365849247,1001,man,11
17365945247,1001,man,12
17365924547,1001,man,15
17365459247,1001,man,18
17361559247,1001,man,17
17365849247,1001,man,19
17365849247,1001,man,18
17365849247,1001,man,16
17365849247,1001,man,13
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/600263.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号