
Classic MapReduce Programs [WordCount, TopN, MapJoin, ReduceJoin] (Personal Summary)

    Disclaimer: 1. This is my personal review summary, not a comprehensive, formally worded article that teaches the topic from scratch.
                2. Since it is a personal summary, it is written as concisely as possible.
                3. If you spot any errors or inaccuracies, please point them out.

WordCount:
// Driver
public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 1. Get the job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2. Set the jar
        job.setJarByClass(WordCountDriver.class);

        // 3. Wire up the Mapper and Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4. Set the Mapper's output key and value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 5. Set the Reducer's output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:/data"));
        FileOutputFormat.setOutputPath(job, new Path("D:/data/out"));

        // Optional: run with 3 reduce tasks
        job.setNumReduceTasks(3);

        // 7. Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}


// Mapper
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    // Defined outside map() so the objects are reused across calls
    private Text word = new Text();
    private LongWritable count = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] splits = line.split("\\s+");
        for (String split : splits) {
            word.set(split);
            // Emits the key-value pair (word, 1)
            context.write(word, count);
        }
    }
}



// Reducer
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    private LongWritable count = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the 1s emitted by the Mapper (safer than counting the iterator,
        // which breaks as soon as a Combiner pre-aggregates the values)
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        count.set(sum);
        context.write(key, count);
    }
}
TopN:

Task: find the Top-N most frequent words per province.

Step 1: run a WordCount.

Step 2: group by province and compute each province's Top 10 most frequent words, as follows:

Grouped TopN:

Solution: in the Reducer, instead of sorting the whole values collection and taking the first N, maintain a min-heap of capacity N, then output the heap's entire contents.

Note: to save memory, Hadoop reuses objects, so every value yielded while iterating over values is in fact the same object.

To keep heap entries from being overwritten by later iterations of the loop, put copies of the values into the heap.
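The heap bookkeeping itself is independent of Hadoop; a minimal plain-Java sketch of a capacity-N min-heap (class and method names are my own, not from the original code):

```java
import java.util.PriorityQueue;

public class TopNHeap {
    // Keep the N largest counts seen so far: a min-heap of capacity N,
    // whose root is the smallest element currently inside the Top N.
    public static PriorityQueue<Integer> topN(int[] counts, int n) {
        PriorityQueue<Integer> heap = new PriorityQueue<>(); // natural order = min-heap
        for (int c : counts) {
            heap.add(c);
            if (heap.size() > n) {
                heap.poll(); // evict the smallest; it can no longer be in the Top N
            }
        }
        return heap;
    }
}
```

Polling the heap empty yields the Top N in ascending order, which is why the Reducer below buffers and reverses before writing.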

Global TopN:

Method 1: configure only a single Reduce partition.

Drawback: with only one Reduce partition, parallelism is far too low.

Method 2: run a first MapReduce job that filters each group down to its Top-N, then launch a second MapReduce job with a single Reduce partition to compute the global Top-N over the first job's output.
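The two-job approach can be sketched without Hadoop; the two passes below stand in for the two jobs (names and the data shapes are illustrative, not from the original). It is safe because the global Top-N is always a subset of the union of the per-group Top-Ns:

```java
import java.util.*;
import java.util.stream.*;

public class GlobalTopN {
    // Stage 1 plays the role of the first job's parallel reducers (local Top-N per group);
    // Stage 2 plays the role of the second job's single reducer (global Top-N).
    public static List<Map.Entry<String, Integer>> globalTopN(
            Collection<Map<String, Integer>> countsPerGroup, int n) {
        // Stage 1: each group keeps only its local Top-N candidates
        List<Map.Entry<String, Integer>> candidates = new ArrayList<>();
        for (Map<String, Integer> group : countsPerGroup) {
            group.entrySet().stream()
                 .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                 .limit(n)
                 .forEach(candidates::add);
        }
        // Stage 2: rank the surviving candidates globally and keep N
        return candidates.stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .limit(n)
                .collect(Collectors.toList());
    }
}
```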


// Bean
public class WordCountBean implements Cloneable, Writable {
    private String word;
    private int count;

    public String getWord() {
        return word;
    }

    public int getCount() {
        return count;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return "(" + word + "," + count + ")";
    }

    @Override
    public WordCountBean clone() throws CloneNotSupportedException {
        return (WordCountBean) super.clone();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeInt(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();
        count = in.readInt();
    }
}


// Mapper
public class Top10_Mapper extends Mapper<LongWritable, Text, Text, WordCountBean> {
    // Defined outside map() so the objects are reused across calls
    private Text province = new Text();
    private WordCountBean wordCountBean = new WordCountBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input line format: province word count
        String[] split = value.toString().split(" ");
        province.set(split[0]);
        wordCountBean.setWord(split[1]);
        wordCountBean.setCount(Integer.parseInt(split[2]));
        context.write(province, wordCountBean);
    }
}

//Reducer
public class Top10_Reducer extends Reducer<Text, WordCountBean, Text, WordCountBean> {
    // Min-heap ordered by count; the smallest of the current Top 10 sits at the root
    private PriorityQueue<WordCountBean> heap = new PriorityQueue<>(new Comparator<WordCountBean>() {
        @Override
        public int compare(WordCountBean o1, WordCountBean o2) {
            return o1.getCount() - o2.getCount();
        }
    });

    @SneakyThrows
    @Override
    protected void reduce(Text key, Iterable<WordCountBean> values, Context context) throws IOException, InterruptedException {
        heap.clear();
        // Naive version: clones once per value, which wastes memory
//        for (WordCountBean bean : values) {
//            heap.add(bean.clone());
//            if (heap.size() > 10) {
//                heap.poll();
//            }
//        }
        // Optimization: recycle the object evicted by poll()
        WordCountBean recycled = null;
        for (WordCountBean bean : values) {
            if (recycled == null) {
                heap.add(bean.clone());
            } else {
                recycled.setWord(bean.getWord());
                recycled.setCount(bean.getCount());
                heap.add(recycled);
                recycled = null; // now owned by the heap again
            }
            if (heap.size() > 10) {
                recycled = heap.poll();
            }
        }
        // Output the heap in descending order of count
        int size = heap.size();
        WordCountBean[] wordCountBeanArr = new WordCountBean[size];
        for (int i = 0; i < size; i++) {
            wordCountBeanArr[i] = heap.poll();
        }
        for (int i = size - 1; i >= 0; i--) {
            context.write(key, wordCountBeanArr[i]);
        }
    }
}

Join:

Join the order table with the pd table.

order table fields:

    id, pid, amount

pd table fields:

    pid, name
MapJoin:

Cache the small pd table in memory on the map side. The driver (not shown here) must have registered the file with the job, e.g. via job.addCacheFile(...), and a map-only join normally sets the number of reduce tasks to 0.
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private final Map<String, String> map = new ConcurrentHashMap<>();

    @Override
    protected void setup(Context context) throws IOException {
        // Read the cached pd file and build an in-memory pid -> pname lookup
        URI pdFile = context.getCacheFiles()[0];
        Path path = new Path(pdFile);
        FSDataInputStream fis = null;
        BufferedReader reader = null;
        try {
            fis = FileSystem.get(context.getConfiguration()).open(path);
            reader = new BufferedReader(new InputStreamReader(fis, StandardCharsets.UTF_8));
            String str;
            while (StringUtils.isNotEmpty(str = reader.readLine())) {
                String[] split = str.split("\\s+");
                map.put(split[0], split[1]);
            }
        } finally {
            IOUtils.closeStream(reader);
            IOUtils.closeStream(fis);
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each order line is "id pid amount"; emit "id \t pname \t amount"
        String[] split = value.toString().split("\\s+");
        context.write(new Text(split[0] + "\t" + map.get(split[1]) + "\t" + split[2]), NullWritable.get());
    }
}
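Stripped of the Hadoop plumbing, the map() above is an in-memory hash join; a minimal sketch using the order/pd fields listed earlier (method and variable names are mine, not from the original):

```java
import java.util.*;

public class HashJoin {
    // Build a pid -> pname lookup from the small pd table, then stream the
    // big order table once, replacing each pid with its pname (the MapJoin idea).
    public static List<String> join(List<String[]> orders, Map<String, String> pd) {
        List<String> out = new ArrayList<>();
        for (String[] order : orders) {          // order = {id, pid, amount}
            String pname = pd.get(order[1]);
            out.add(order[0] + "\t" + pname + "\t" + order[2]);
        }
        return out;
    }
}
```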
ReduceJoin:
// Bean
public class OrderPdBean implements WritableComparable<OrderPdBean> {
    private Integer id = -1;
    private String pid = "";
    private Integer amount = -1;
    private String pname = "";
    private String flag = "";

    // Getters, setters, and toString omitted for brevity

    @Override
    public int compareTo(OrderPdBean o) {
        return this.id - o.id != 0 ? this.id - o.id : this.pname.compareTo(o.pname);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readInt();
        pid = in.readUTF();
        amount = in.readInt();
        pname = in.readUTF();
        flag = in.readUTF();
    }
}


// Mapper
public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, OrderPdBean> {
    private String fileName;
    private OrderPdBean outV = new OrderPdBean();

    @Override
    protected void setup(Context context) {
        // Record which input file this split comes from
        fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split("\\s+");
        if ("order.txt".equals(fileName)) {
            outV.setFlag("order");
            outV.setId(Integer.valueOf(split[0]));
            outV.setPid(split[1]);
            outV.setAmount(Integer.valueOf(split[2]));
        } else if ("pd.txt".equals(fileName)) {
            outV.setFlag("pd");
            outV.setPid(split[0]);
            outV.setPname(split[1]);
        }
        // The join key is pid
        context.write(new Text(outV.getPid()), outV);
    }
}

// Reducer
public class ReduceJoinReducer extends Reducer<Text, OrderPdBean, OrderPdBean, NullWritable> {
    // cglib BeanCopier, used to copy Hadoop's reused value object
    private BeanCopier beanCopier = BeanCopier.create(OrderPdBean.class, OrderPdBean.class, false);
    private List<OrderPdBean> orderList = new LinkedList<>();
    private OrderPdBean pdBean = new OrderPdBean();

    @Override
    protected void reduce(Text key, Iterable<OrderPdBean> values, Context context) throws IOException, InterruptedException {
        orderList.clear();
        // Hadoop reuses the value object, so copy each order record out of it
        for (OrderPdBean orderPdBean : values) {
            if ("order".equals(orderPdBean.getFlag())) {
                OrderPdBean orderPdCopy = new OrderPdBean();
                beanCopier.copy(orderPdBean, orderPdCopy, null);
                orderList.add(orderPdCopy);
            } else {
                beanCopier.copy(orderPdBean, pdBean, null);
            }
        }

        // Stitch the product name onto every order record with this pid
        for (OrderPdBean orderPdBean : orderList) {
            orderPdBean.setPname(pdBean.getPname());
            context.write(orderPdBean, NullWritable.get());
        }
    }
}
Reprinted from www.mshxw.com. Original article: https://www.mshxw.com/it/758066.html