import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
// 单词统计
public class MR01 {
    /**
     * Map stage: emits (whole line, 1). Each input line is treated as a single word;
     * generics are required so the overridden map() actually matches Mapper.map().
     */
    public static class WordMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Convert the Text payload to String before processing the line.
            String line = value.toString();
            context.write(new Text(line), new LongWritable(1L));
        }
    }

    /**
     * Reduce stage: sums all counts emitted for the same key.
     */
    public static class WordReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long count = 0; // long, so very large inputs cannot overflow an int
            for (LongWritable value : values) {
                count += value.get();
            }
            context.write(key, new LongWritable(count));
        }
    }

    /**
     * Configures and submits the word-count job.
     * Input: /words.txt  Output: /output (must not already exist).
     */
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJobName("第一个mr程序 单词统计");
        job.setJarByClass(MR01.class);
        // Mapper class for the map side
        job.setMapperClass(WordMapper.class);
        // Reducer class for the reduce side
        job.setReducerClass(WordReducer.class);
        // Key/value output types of the map side
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Key/value output types of the reduce side
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Input and output paths; the output path must not already exist
        Path input = new Path("/words.txt");
        Path out = new Path("/output");
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, out);
        // Submit and block until the job finishes
        job.waitForCompletion(true);
        System.out.println("正在运行mr");
    }
}
示例 2:统计一行中的多个单词(按逗号切分)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class mrtest02 {
    /**
     * Map stage: splits each comma-separated line into words and emits (word, 1).
     */
    public static class WordMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(",");
            for (String word : words) {
                context.write(new Text(word), new LongWritable(1L));
            }
        }
    }

    /**
     * Reduce stage: sums the per-word counts.
     */
    public static class WordReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long count = 0; // long accumulator: int would be too small for big data sets
            for (LongWritable value : values) {
                count += value.get();
            }
            context.write(key, new LongWritable(count));
        }
    }

    /**
     * Builds the MapReduce job via the Job class and submits it.
     * Input: /words.txt  Output: /output (deleted first if it exists).
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();
        job.setJobName("统计每行多个单词");
        // Must reference THIS class so Hadoop can locate the jar (MR02 did not exist)
        job.setJarByClass(mrtest02.class);
        // Map side: class and output key/value types
        job.setMapperClass(WordMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Reduce side: setReducerClass registers the reducer
        // (the original called setOutputKeyClass with the reducer class, so no reducer ran)
        job.setReducerClass(WordReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Input path
        Path input = new Path("/words.txt");
        FileInputFormat.addInputPath(job, input);
        // Output path must not already exist: delete it up front via FileSystem
        Path output = new Path("/output");
        FileSystem fs = FileSystem.get(new Configuration());
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
        // Submit and block until completion
        job.waitForCompletion(true);
        System.out.println("统计一行多个单词");
        // Run on the cluster by naming the driver class explicitly:
        // hadoop jar xxxxx.jar com.shujia.mr.<driver class>
    }
}
按照班级求age
//1500100981,经鹏涛,23,男,文科六班
需求:按班级分组,对同学的年龄求和(典型的分组 + 聚合)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class mrtest03 {
    /**
     * Map stage: parses "id,name,age,gender,class" lines and emits (class, age).
     */
    public static class ClazzMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // e.g. 1500100981,经鹏涛,23,男,文科六班
            String[] split = line.split(",");
            int age = Integer.parseInt(split[2]);
            String clazz = split[4];
            context.write(new Text(clazz), new LongWritable(age));
        }
    }

    /**
     * Reduce stage: sums the ages grouped under each class name.
     */
    public static class ClazzReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long ageSum = 0;
            for (LongWritable value : values) {
                ageSum += value.get();
            }
            context.write(key, new LongWritable(ageSum));
        }
    }

    /**
     * Configures and submits the per-class age-sum job.
     * Input: /students.txt  Output: /output (deleted first if it exists).
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();
        // Reducer count is configurable; each reducer produces one output file
        job.setNumReduceTasks(2);
        job.setJobName("按照班级求age");
        // Must reference THIS class (MR03CLazzAgeSum does not exist in this file)
        job.setJarByClass(mrtest03.class);
        // Map/reduce classes and their output types
        job.setMapperClass(ClazzMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setReducerClass(ClazzReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Input path
        Path input = new Path("/students.txt");
        FileInputFormat.addInputPath(job, input);
        // Output path must not already exist: delete it up front via FileSystem
        Path output = new Path("/output");
        FileSystem fs = FileSystem.get(new Configuration());
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
        // Submit and block until completion
        job.waitForCompletion(true);
        System.out.println("按照班级求age");
        // Run on the cluster by naming the driver class explicitly:
        // hadoop jar xxxxx.jar com.shujia.mr.<driver class>
    }
}
筛选性别
不需要reduce端 ,job.setNumReduceTasks(0);设置为0
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class MR04Sex {
    /**
     * Map-only filter: keeps lines whose 4th field (gender) equals "男".
     * Emits the whole line with a NullWritable value; no reducer is used.
     */
    public static class SexMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split(",");
            String sex = split[3];
            // Constant-first equals avoids an NPE if the field were null
            if ("男".equals(sex)) {
                context.write(new Text(line), NullWritable.get());
            }
        }
    }

    /**
     * Configures a map-only job (zero reducers) that filters students by gender.
     * Input: /students.txt  Output: /output (deleted first if it exists).
     */
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        // Map-only job: no reduce stage needed for a pure filter
        job.setNumReduceTasks(0);
        job.setJarByClass(MR04Sex.class);
        job.setJobName("sex");
        job.setMapperClass(SexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        Path input = new Path("/students.txt");
        FileInputFormat.addInputPath(job, input);
        // Output path must not already exist: delete it up front via FileSystem
        Path output = new Path("/output");
        FileSystem fs = FileSystem.get(new Configuration());
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
        // Submit and block until completion
        job.waitForCompletion(true);
        System.out.println("sex");
        // Run on the cluster by naming the driver class explicitly:
        // hadoop jar xxxxx.jar com.shujia.mr.<driver class>
    }
}
combine 预聚合
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.Text;
import java.io.IOException;
public class mrtest05 {
    /**
     * Map stage: emits ("男", 1) for every male record.
     */
    public static class GenderMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split(",");
            if ("男".equals(split[3])) {
                context.write(new Text("男"), new LongWritable(1L));
            }
        }
    }

    /**
     * Combiner: pre-aggregates map output on the map side, before the shuffle,
     * to reduce the volume of data sent to the reducer.
     */
    public static class CombineReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long count = 0;
            for (LongWritable value : values) {
                count += value.get();
            }
            context.write(key, new LongWritable(count));
        }
    }

    /**
     * Reduce stage: final sum over the (already pre-aggregated) partial counts.
     */
    public static class GenderReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long count = 0;
            for (LongWritable value : values) {
                count += value.get();
            }
            context.write(key, new LongWritable(count));
        }
    }

    /**
     * Configures a word-count-style job that demonstrates combiner pre-aggregation.
     * Input: /students.txt  Output: /output (deleted first if it exists).
     */
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJobName("combine 预聚合");
        job.setJarByClass(mrtest05.class);
        // Map side
        job.setMapperClass(GenderMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Combiner: runs on the map side before data reaches the reducer
        job.setCombinerClass(CombineReducer.class);
        // Reduce side
        job.setReducerClass(GenderReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        Path input = new Path("/students.txt");
        FileInputFormat.addInputPath(job, input);
        // Output path must not already exist: delete it up front via FileSystem
        Path output = new Path("/output");
        FileSystem fs = FileSystem.get(new Configuration());
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
        // Submit and block until completion
        job.waitForCompletion(true);
        System.out.println("combine 预聚合");
    }
}
两表联查
有2个task任务,产生2个结果
package com.shujia.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class mrtest06 {
    /**
     * Identity mapper used to merge two input files into one output directory.
     * TextInputFormat supplies LongWritable byte-offset keys, so the input key
     * type must be LongWritable (the original NullWritable signature did not
     * override Mapper.map and would never be called).
     */
    public static class FilesMaper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    /**
     * Map-only job reading both /students.txt and /score.txt; one map task per
     * input produces one output file each.
     * Output: /output (deleted first if it exists).
     */
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        // Map-only: no reduce stage
        job.setNumReduceTasks(0);
        job.setJarByClass(mrtest06.class);
        // Register the mapper (the original passed FilesMaper.class to
        // setMapOutputValueClass, so no mapper was ever configured)
        job.setMapperClass(FilesMaper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Two input paths feed the same job
        Path input1 = new Path("/students.txt");
        FileInputFormat.addInputPath(job, input1);
        Path input2 = new Path("/score.txt");
        FileInputFormat.addInputPath(job, input2);
        // Output path must not already exist: delete it up front via FileSystem
        Path output = new Path("/output");
        FileSystem fs = FileSystem.get(new Configuration());
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
        // Submit and block until completion
        job.waitForCompletion(true);
    }
}



