最基本的MapReduce程序的写法
流程:将数据从本地文件导入,经过MapReduce数据分析,将分析结果存储到HDFS
案例代码 Mapper代码import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class TestMapper extends MapperReducer代码{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 逻辑代码... } }
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class TestReducer extends ReducerTool代码{ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { // 逻辑代码... } }
import com.cw.ct.analysis.mapper.TestMapper;
import com.cw.ct.analysis.reducer.TestReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
public class TestTool implements Tool {
private Configuration configuration = null;
public int run(String[] strings) throws Exception {
// 初始化job任务
Job job = Job.getInstance();
job.setJarByClass(TestTool.class);
//设置运行哪个map Task
job.setMapperClass(TestMapper.class);
//设置运行哪个reduce Task
job.setReducerClass(TestReducer.class);
//设置map Task的输出的(key,value)的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置reduce Task的输出的(key,value)的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定要处理的数据所在的位置
FileInputFormat.setInputPaths(job,"hdfs://192.168.91.101:8020/wordcount/input/big.txt");
//指定处理之后的结果数据保存位置
FileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.91.101:8020/wordcount/output"));
boolean result = job.waitForCompletion(true);
if (result){
return JobStatus.State.SUCCEEDED.getValue();
}else{
return JobStatus.State.FAILED.getValue();
}
}
public void setConf(Configuration configuration) {
this.configuration = configuration
}
public Configuration getConf() {
return this.configuration;
}
}
启动类
public class Bootstrap {
public static void main(String[] args) throws Exception {
// 写法一:
int result = ToolRunner.run(new TestTool(), args);
// 写法二:
// new Configuration():将MapReducer程序运行的环境中的配置文件自动加载(core.xml,hbase.xml...)
// new Configuration()其实写不写都可以,ToolRunner会检测tool中的Configuration是否不为null值,为null值自动创建一个Configuration
int result = ToolRunner.run(new Configuration(),new TestTool(), args);
if (result == JobStatus.State.SUCCEEDED.getValue()){
System.out.println("运行成功!");
System.exit(0);
}else{
System.out.println("运行失败!");
System.exit(1);
}
}
}
模板2
HBase集成MapReduce
流程:从HDFS中读取文件数据,将数据存储到HBase
案例代码 Mapper代码public class TestMapper extends MapperReducer代码{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 逻辑代码... } }
public class TestReducer extends TableReducertool代码{ @Override protected void reduce(LongWritable key, Iterable values, Context context) throws IOException, InterruptedException { // 示范代码,仅供参考 for(Text value:values){ //获取每行的数据 String [] fields=value.toString().split("t"); //构建Put对象 Put put = new Put(Bytes.toBytes(fields[0])); //4.给Put对象赋值 put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(fields[1])); put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("color"),Bytes.toBytes(fields[2])); context.write(NullWritable.get(),put); } } }
public class TestTool implements Tool {
//定义一个Configuration
private Configuration configuration = null;
public int run(String[] args) throws Exception {
//1.获取Job对象
Job job = Job.getInstance(configuration);
//2.设置驱动类路径
job.setJarByClass(TestTool.class);
//3.设置Mapper和Mapper输出的KV类型
job.setMapperClass(TestMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
//4.设置Reducer类
TableMapReduceUtil.initTableReducerJob(args[1],TestReducer.class,job);
//5.设置输入参数
FileInputFormat.setInputPaths(job,new Path(args[0]));
//6.提交任务
boolean result = job.waitForCompletion(true);
return result?0:1;
}
public void setConf(Configuration configuration) {
this.configuration=configuration;
}
public Configuration getConf() {
return configuration;
}
}
启动类
public class Bootstrap {
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(cnew TestTool(), args);
System.exit(run);
}
}
模板3
流程
从HBase中读取数据,再将数据存储到HBase
案例代码 Mapper代码public class TestMapper extends TableMapperReducer代码{ protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { // 以下为示范代码 //构建Put对象 Put put = new Put(key.get()); //获取数据 for (Cell cell:value.rawCells()){ //给Put对象赋值 put.add(cell); } //写出 context.write(key,put); } }
public class TestReducer extends TableReducertool代码{ @Override protected void reduce(ImmutableBytesWritable key, Iterable values, Context context) throws IOException, InterruptedException { for(Put put:values){ context.write(NullWritable.get(),put); } } }
public class TestTool implements Tool {
private Configuration configuration = null;
public int run(String[] args) throws Exception {
Job job = Job.getInstance(configuration);
job.setJarByClass(FruitDriver2.class);
// 设置Mapper,fruit1为Hbase表名,new Scan()为全局扫描读取数据
TableMapReduceUtil.initTableMapperJob("fruit1",new Scan(),TestMapper.class,ImmutableBytesWritable.class,Put.class,job);
// 设置Reducer,fruit2为Hbase表名
TableMapReduceUtil.initTableReducerJob("fruit2",TestReducer.class,job);
boolean result = job.waitForCompletion(true);
return result ? 0:1;
}
public void setConf(Configuration configuration) {
this.configuration=configuration;
}
public Configuration getConf() {
return configuration;
}
}
启动类
public class Bootstrap{
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(configuration, new FruitDriver2(), args);
System.exit(run);
}
}
模板4
将MapReduce与MySQL和HBase集成
流程:MapReduce从HBase中读取数据,然后进行数据分析,将分析结果保存到MySQL
案例代码 Mapper代码public class TestMapper extends TableMapperReducer代码{ @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { // 逻辑代码... } }
public class TestReducer extends Reducer自定义输出格式{ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { // 逻辑代码... } }
public class MySqlTestOutputFormat extends OutputFormattool代码{ private FileOutputCommitter committer = null; // 实现一个静态内部类 protected static class MySQLRecordWrite extends RecordWriter { public MySQLRecordWrite() { } @Override public void write(Text text, Text text2) throws IOException, InterruptedException { // 实现将数据存储到Mysql中 } @Override public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { } } @Override public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { return new MySQLRecordWrite(); } @Override public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException { } // 以下的方法代码照搬即可 @Override public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { if (this.committer == null) { Path output = getOutputPath(taskAttemptContext); this.committer = new FileOutputCommitter(output,taskAttemptContext); } return this.committer; } private static Path getOutputPath(JobContext job) { String name = job.getConfiguration().get(FileOutputFormat.OUTDIR); return name == null ? null: new Path(name); } }
public class TestTool implements Tool {
private Configuration configuration = null;
public int run(String[] strings) throws Exception {
Job job = Job.getInstance();
job.setJarByClass(AnalysisTextTool.class);
// 设置mapper
TableMapReduceUtil.initTableMapperJob(
Names.TABLE.getValue(),
new Scan(),
TestMapper.class,
Text.class,
Text.class,
job
);
// 设置reducer
job.setReducerClass(TestReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 设置自定义的输出格式
// outputformat
job.setOutputFormatClass(MySqlTextOutputFormat.class);
boolean result = job.waitForCompletion(true);
return result ? 0:1;
}
public void setConf(Configuration configuration) {
this.configuration = configuration
}
public Configuration getConf() {
return this.configuration;
}
}
启动类
public class Bootstrap{
public static void main(String[] args) throws Exception {
int run = ToolRunner.run( new TestTool(), args);
System.exit(run);
}
}
自定义数据类型
有时,MapReduce内置的数据类型无法满足我们的需求,因此需要定义一些自定义数据类型
模板 自定义Key类型public class TestKey implements WritableComparable自定义Value类型{ private String tel; private String date; public TestKey() { } public TestKey(String tel, String date) { this.tel = tel; this.date = date; } public String getTel() { return tel; } public void setTel(String tel) { this.tel = tel; } public String getDate() { return date; } public void setDate(String date) { this.date = date; } public int compareTo(TestKey key) { // 以下为示范代码 int result = tel.compareTo(key.getTel()); // 如果tel相同则比较date if (result == 0){ result = date.compareTo(key.getDate()); } // 返回0是相同的意思 return result; } public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(tel); dataOutput.writeUTF(date); } public void readFields(DataInput dataInput) throws IOException { // 读取顺序要和写数据一致 tel = dataInput.readUTF(); date = dataInput.readUTF(); } }
public class TestValue implements Writable {
private String sumCall;
private String sumDuration;
public TestValue() {
}
public TestValue(String sumCall, String sumDuration) {
this.sumCall = sumCall;
this.sumDuration = sumDuration;
}
public String getSumCall() {
return sumCall;
}
public void setSumCall(String sumCall) {
this.sumCall = sumCall;
}
public String getSumDuration() {
return sumDuration;
}
public void setSumDuration(String sumDuration) {
this.sumDuration = sumDuration;
}
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(sumCall);
dataOutput.writeUTF(sumDuration);
}
public void readFields(DataInput dataInput) throws IOException {
// 读取顺序要和写数据一致
sumCall = dataInput.readUTF();
sumDuration = dataInput.readUTF();
}
}



