1)WordCount
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class MyMapReducer {
public static class mapper extends Mapper {
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
for (String word : words) {
context.write(new Text(word), new IntWritable(1));
}
}
}
public static class reducer extends Reducer {
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable value : values) {
count += value.get();//转化为java类型int
}
context.write(key, new Text(count + ""));//加一个""转化为string类型
}
}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setMapperClass(mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setReducerClass(reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//指定原始数据位置
FileInputFormat.setInputPaths(job,new Path("hdfs://localhost:9000/test"));
//指定处理的结果数据位置
FileOutputFormat.setOutputPath(job,new Path("hdfs://localhost:9000/result"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
2)HBase结合MapReduce
- 将HDFS上的文件导入HBase
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
 * MapReduce job that loads comma-separated student records from HDFS into an
 * HBase table.
 *
 * <p>Each input line is expected to look like
 * {@code rowKey,name,gender,age,major}. The mapper forwards whole lines as
 * keys; the reducer turns each distinct line into one {@link Put}.
 *
 * <p>Usage: {@code FileToHbase [generic options] [inputPath [tableName]]}.
 * Omitted arguments fall back to {@link #DEFAULT_INPUT} and
 * {@link #DEFAULT_TABLE}.
 */
public class FileToHbase extends Configured implements Tool {

    /** Input location used when no path is given on the command line. */
    private static final String DEFAULT_INPUT = "hdfs://localhost:9000/student";

    /** Target table used when no table name is given on the command line. */
    private static final String DEFAULT_TABLE = "student";

    /** Number of comma-separated fields the reducer reads from a record. */
    private static final int REQUIRED_FIELDS = 5;

    private static final byte[] CF_BASIC_INFO = utf8("basic_info");
    private static final byte[] CF_SCHOOL_INFO = utf8("school_info");
    private static final byte[] COL_NAME = utf8("name");
    private static final byte[] COL_GENDER = utf8("gender");
    private static final byte[] COL_AGE = utf8("age");
    private static final byte[] COL_MAJOR = utf8("major");

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new FileToHbase(), args);
        System.exit(run);
    }

    /** Encodes text as UTF-8 so the stored bytes do not depend on the platform charset. */
    private static byte[] utf8(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Forwards every input line unchanged as the map output key, with no value.
     */
    public static class HDFSToHbaseMapper
            extends Mapper<LongWritable, Text, Text, NullWritable> {

        /**
         * @param key     input key supplied by the input format (unused here)
         * @param value   one line of input text
         * @param context sink for the {@code (line, null)} pair
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    /**
     * Converts one input line into a {@link Put} for the target table.
     */
    public static class HDFSToHbaseReducer
            extends TableReducer<Text, NullWritable, NullWritable> {

        /**
         * @param key     a full input line: {@code rowKey,name,gender,age,major}
         * @param values  placeholder values, not read
         * @param context sink for the resulting {@link Put}
         */
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            String[] split = key.toString().split(",");
            if (split.length < REQUIRED_FIELDS) {
                // A short line would otherwise throw ArrayIndexOutOfBoundsException;
                // skip it and record the skip in a job counter instead.
                context.getCounter("FileToHbase", "MALFORMED_RECORDS").increment(1);
                return;
            }
            Put put = new Put(utf8(split[0])); // first field is the row key
            put.addColumn(CF_BASIC_INFO, COL_NAME, utf8(split[1]));
            put.addColumn(CF_BASIC_INFO, COL_GENDER, utf8(split[2]));
            put.addColumn(CF_BASIC_INFO, COL_AGE, utf8(split[3]));
            put.addColumn(CF_SCHOOL_INFO, COL_MAJOR, utf8(split[4]));
            context.write(NullWritable.get(), put);
        }
    }

    /**
     * Configures and runs the load job.
     *
     * @param strings optional {@code [inputPath [tableName]]}
     * @return 0 if the job completed successfully, 1 otherwise
     */
    @Override
    public int run(String[] strings) throws Exception {
        String input = (strings != null && strings.length > 0) ? strings[0] : DEFAULT_INPUT;
        String table = (strings != null && strings.length > 1) ? strings[1] : DEFAULT_TABLE;

        // Start from the configuration ToolRunner populated, so generic options
        // such as -D key=value are not discarded.
        Configuration base = getConf();
        Configuration conf =
                (base != null) ? HBaseConfiguration.create(base) : HBaseConfiguration.create();

        Job job = Job.getInstance(conf);
        job.setJarByClass(FileToHbase.class);

        job.setMapperClass(HDFSToHbaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Location of the source file(s).
        FileInputFormat.addInputPath(job, new Path(input));

        // Output classes are not set afterwards: doing so would overwrite this
        // helper's settings with types the reducer does not emit.
        TableMapReduceUtil.initTableReducerJob(table, HDFSToHbaseReducer.class, job);

        boolean isDone = job.waitForCompletion(true);
        return isDone ? 0 : 1;
    }
}



