
MapReduce WordCount: A Hands-On Example

I. Mapper source code


package org.apache.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.task.MapContextImpl;


@InterfaceAudience.Public
@InterfaceStability.Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  
  public abstract class Context
    implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  }
  
  
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value, 
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }
  
  
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }
}

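1. Context is an abstract inner class, not a method. It is the task's link to the framework: map() reads its input through it and writes its output through it, and that output is what eventually reaches the Reducer.

2. setup() and cleanup() are each called once per task: setup() before the first record and cleanup() after the last.

3. run() drives the task. It calls setup(), then calls map() once for every key/value pair, then calls cleanup() in a finally block, so map() is called many times. In practice it is map() that gets overridden; run() is only overridden when the default loop itself needs to change.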
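The call order inside run() can be reproduced without Hadoop. The following is a minimal plain-Java sketch, not Hadoop code: MiniMapper and LifecycleSketch are hypothetical names, and an Iterator<String> stands in for the real Context.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for Mapper: same setup/map/cleanup/run shape, no Hadoop types.
class MiniMapper {
    final List<String> calls = new ArrayList<>();

    protected void setup() { calls.add("setup"); }

    protected void map(String record) { calls.add("map:" + record); }

    protected void cleanup() { calls.add("cleanup"); }

    // Mirrors the shape of Mapper.run(): setup once, map per record, cleanup in finally.
    public void run(Iterator<String> records) {
        setup();
        try {
            while (records.hasNext()) {
                map(records.next());
            }
        } finally {
            cleanup();
        }
    }
}

class LifecycleSketch {
    // Runs a MiniMapper over the records and returns the recorded call order.
    static List<String> trace(List<String> records) {
        MiniMapper mapper = new MiniMapper();
        mapper.run(records.iterator());
        return mapper.calls;
    }

    public static void main(String[] args) {
        // prints [setup, map:lz lz, map:lili lili, cleanup]
        System.out.println(trace(Arrays.asList("lz lz", "lili lili")));
    }
}
```

Because cleanup() sits in a finally block, it still runs when map() throws, which is why resources opened in setup() are usually released there.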

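II. The WordCountMapper class

The Mapper source declares four type parameters, <KEYIN, VALUEIN, KEYOUT, VALUEOUT>. For WordCount, take this input file as an example:

lz lz

lili lili

With the default text input format, map() is called once per line. The key is the byte offset at which the line starts and the value is the line itself, so the input is (0, "lz lz"); (6, "lili lili"), assuming a one-byte line terminator.

The output is lz,1; lz,1; lili,1; lili,1

So:

KEYIN: byte offset of the line within the file, LongWritable

VALUEIN: the line of text, Text

KEYOUT: a word, Text

VALUEOUT: the count, IntWritable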
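To make the four types concrete, here is a small plain-Java sketch of what goes in and out of map() for the sample file. It assumes line-oriented text input where the key is the byte offset of each line, and '\n' line terminators; OffsetSketch and its methods are hypothetical names, and plain strings stand in for Text and IntWritable.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class OffsetSketch {
    // Byte offset at which each line starts, assuming '\n' line terminators.
    static List<Long> lineOffsets(String text) {
        List<Long> offsets = new ArrayList<>();
        long pos = 0;
        for (String line : text.split("\n")) {
            offsets.add(pos);
            pos += line.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for '\n'
        }
        return offsets;
    }

    // The (word, 1) pairs that map() would emit for one line, rendered as "word,1".
    static List<String> mapLine(String line) {
        List<String> pairs = new ArrayList<>();
        for (String word : line.split(" ")) {
            pairs.add(word + ",1");
        }
        return pairs;
    }

    public static void main(String[] args) {
        String text = "lz lz\nlili lili\n";
        System.out.println(lineOffsets(text)); // prints [0, 6]
        for (String line : text.split("\n")) {
            System.out.println(mapLine(line));
        }
    }
}
```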

package com.atguigu.mapreduce.wordcount2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

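public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reused for every record to avoid allocating new objects per word
    private final Text k = new Text();
    private final IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // Read one line
        String line = value.toString();

        // Split the line into words
        String[] words = line.split(" ");

        // Emit (word, 1) for each word
        for (String word : words) {

            k.set(word);
            context.write(k, v);
        }

    }
}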

The class overrides the map() method.
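One detail of the mapper above is worth a quick check: splitting on a single space produces empty tokens when a line contains consecutive spaces, and those would be counted as a word. The sketch below compares that split with one possible alternative that splits on runs of whitespace. TokenizeSketch and its method names are hypothetical, and the alternative is a suggestion rather than part of the original example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TokenizeSketch {
    // Same split as the mapper above: consecutive spaces yield empty tokens.
    static List<String> splitOnSingleSpace(String line) {
        return Arrays.asList(line.split(" "));
    }

    // Alternative: split on runs of whitespace and drop empty tokens.
    static List<String> splitOnWhitespace(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        System.out.println(splitOnSingleSpace("lz  lz")); // prints [lz, , lz]
        System.out.println(splitOnWhitespace("lz  lz"));  // prints [lz, lz]
    }
}
```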

III. Reducer source code


package org.apache.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;

import java.util.Iterator;


@Checkpointable
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

  
  public abstract class Context 
    implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  }

  
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  
  @SuppressWarnings("unchecked")
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

  
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }

  
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKey()) {
        reduce(context.getCurrentKey(), context.getValues(), context);
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
        }
      }
    } finally {
      cleanup(context);
    }
  }
}

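setup() and cleanup() behave like their counterparts in Mapper.

reduce() takes the parameters (KEYIN key, Iterable<VALUEIN> values, Context context).

The Iterable behaves like a collection: it holds every value that was emitted for the same key, and together they make up values.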
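How the values for one key end up together can be sketched in plain Java. This is a simplified in-memory model under stated assumptions: GroupingSketch and its methods are hypothetical names, and lists are used for clarity, whereas the framework streams the values to reduce() and does not build such a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class GroupingSketch {
    // Group the map-side 1s by word; TreeMap keeps keys sorted, as reduce input is sorted by key.
    static Map<String, List<Integer>> group(List<String> words) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String word : words) {
            grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
        }
        return grouped;
    }

    // What a word-count reduce() does with one key's values: add them up.
    static int reduce(Iterable<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    // Apply reduce() once per key, as Reducer.run() does for each key in turn.
    static Map<String, Integer> wordCount(List<String> words) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> entry : group(words).entrySet()) {
            counts.put(entry.getKey(), reduce(entry.getValue()));
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("lz", "lz", "lili", "lili");
        System.out.println(group(words));     // prints {lili=[1, 1], lz=[1, 1]}
        System.out.println(wordCount(words)); // prints {lili=2, lz=2}
    }
}
```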

IV. WordCountReducer

package com.atguigu.mapreduce.wordcount2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private int sum;
    private final IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        // Add up all the counts emitted for this word
        sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }

        // Emit (word, total)
        v.set(sum);
        context.write(key, v);
    }
}

The Mapper's output is the Reducer's input, so the Reducer's input types are Text and IntWritable.

V. Driver class

package com.atguigu.mapreduce.wordcount2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Load the configuration and create the job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Associate the jar that contains this driver
        job.setJarByClass(WordCountDriver.class);

        // 3. Associate the Mapper and Reducer classes
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4. Set the key/value types of the Mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5. Set the key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);


    }
}

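The driver consists of seven steps:

1. Load the configuration and create the job object

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

2. Associate the driver's jar

job.setJarByClass(WordCountDriver.class); uses the given class to locate the jar that contains it.

3. Associate the Mapper and Reducer classes

job.setMapperClass()

job.setReducerClass()

4. Set the key/value types of the Mapper output

job.setMapOutputKeyClass()

job.setMapOutputValueClass()

5. Set the key/value types of the final output

job.setOutputKeyClass()

job.setOutputValueClass()

6. Set the input and output paths

FileInputFormat.setInputPaths(job, new Path(args[0]))

FileOutputFormat.setOutputPath(job, new Path(args[1]))

Reading the paths from args lets them be supplied at run time instead of being hard-coded.

The output path must not already exist; if it does, the job fails.

7. Submit the job

boolean result = job.waitForCompletion(true);

System.exit(result ? 0 : 1);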
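Since step 6 reads both paths from args and the output path must not exist, a driver can check this before doing any work. The following is a minimal sketch under stated assumptions: it only handles paths on the local file system through java.nio, DriverArgsSketch and validate are hypothetical names, and a job running against HDFS would need the equivalent check through Hadoop's own file system API, which is not shown here.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;

class DriverArgsSketch {
    // Returns a description of the first problem found, or empty if the arguments look usable.
    static Optional<String> validate(String[] args) {
        if (args.length != 2) {
            return Optional.of("usage: <input path> <output path>");
        }
        Path input = Paths.get(args[0]);
        Path output = Paths.get(args[1]);
        if (!Files.exists(input)) {
            return Optional.of("input path does not exist: " + input);
        }
        if (Files.exists(output)) {
            return Optional.of("output path already exists: " + output);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Optional<String> problem = validate(args);
        if (problem.isPresent()) {
            System.err.println(problem.get());
            System.exit(2);
        }
        System.out.println("arguments look fine");
    }
}
```

Failing early with a clear message is mainly a convenience; the job would reject an existing output directory anyway, only later and with a longer stack trace.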
