若本文对你有帮助,请记得点赞、关注我喔!
2)hbase结合MapReduce
1)将HDFS文件导入HBase表
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
public class FileToHbase extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new FileToHbase(), args);
System.exit(run);
}
public static class HDFSToHbaseMapper extends Mapper {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(value, NullWritable.get());
}
}
public static class HDFSToHbaseReducer extends TableReducer {
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
String[] split = key.toString().split(",");
Put put = new Put(split[0].getBytes()); //确定行号
put.addColumn("basic_info".getBytes(), "name".getBytes(), split[1].getBytes());
put.addColumn("basic_info".getBytes(), "gender".getBytes(), split[2].getBytes());
put.addColumn("basic_info".getBytes(), "age".getBytes(), split[3].getBytes());
put.addColumn("school_info".getBytes(), "major".getBytes(), split[4].getBytes());
context.write(null, put);
}
}
@Override
public int run(String[] strings) throws Exception {
Configuration conf = HbaseConfiguration.create();
FileSystem fs = FileSystem.get(conf);
Job job = Job.getInstance(conf);
job.setJarByClass(FileToHbase.class);
job.setMapperClass(HDFSToHbaseMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
Path inputPath = new Path("hdfs://localhost:9000/student");
FileInputFormat.addInputPath(job, inputPath);//设置源文件的地址
TableMapReduceUtil.initTableReducerJob("student", HDFSToHbaseReducer.class, job);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
boolean isDone = job.waitForCompletion(true);
return isDone ? 0 : 1;
}
}
2)从HBase表读取数据并输出到HDFS文件
package HbaseWithMapReducer;
import java.io.IOException;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.FileSystem;
public class HbaseToHdfs extends Configured implements Tool {
public static void main(String[] args) throws Exception {
ToolRunner.run(new HbaseToHdfs(),args);
}
public static final byte[] family = "basic_info".getBytes();
public static final byte[] column = "age".getBytes();
public static class MyMapper extends TableMapper {
protected void map(ImmutableBytesWritable key, Result result, Context context) throws IOException, InterruptedException {
String major = Bytes.toString(result.getValue(Bytes.toBytes("school_info"), Bytes.toBytes("major")));
byte[] value = result.getValue(family, column); //这一行的age
int age = Integer.parseInt(Bytes.toString(value));//将这一行age转为Int
context.write(new Text(major), new IntWritable(age));//把这一行读入到context
}
}
public static class MyReducer extends Reducer {
public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
int sum = 0;
int num = 0;
for (IntWritable value : values) {
sum += value.get();
num++;
}
context.write(key, new DoubleWritable(sum * 1.0 / num));
}
}
@Override
public int run(String[] strings) throws Exception {
Configuration conf = HbaseConfiguration.create();
Job job = Job.getInstance(conf);
Scan scan = new Scan();
scan.setCaching(300);
scan.setCacheBlocks(false);
//设置Mapper
//此时TableMapReduceUtil.initTableMapperJob设置的是Mapper,因为是Mapper对Hbase表操作
//其中确定了输出的类型
String tName = "student";
TableMapReduceUtil.initTableMapperJob(tName, scan, MyMapper.class, Text.class, IntWritable.class, job);
//设置Reducer
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
String strOutputPath = "hdfs://localhost:9000/output";
FileOutputFormat.setOutputPath(job,new Path(strOutputPath));
return job.waitForCompletion(true) ? 0 : 1;
}
}
3)从HBase表中读出数据,并输出结果到HBase表中
import java.io.IOException;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
public class Task extends Configured implements Tool {
public static class MyMapper extends TableMapper {
public static final byte[] family = "bacin_info".getBytes();
public static final byte[] column = "age".getBytes();
@Override
protected void map(ImmutableBytesWritable rowKey, Result result,Context context)
throws IOException,InterruptedException{
String major = Bytes.toString(result.getValue("school_info".getBytes(),
"major".getBytes()));
byte[] value = result.getValue(family,column);
String str = Bytes.toString(value);
int age = Integer.parseInt(Bytes.toString(value));
context.write(new Text(major),new IntWritable(age));
}
}
public static class MyTableReducer extends TableReducer {
@Override
int sum =0;
int num = 0 ;
for(IntWritable value :values){
sum+=value.get();
num++;
}
double avg =0;
avg=sum*1.0/num;
String i,j;
i=x+"";
j=avg+"";
Put put =new Put(i.getBytes());
x++;
put.addColumn("average_infos".getBytes(),"age".getBytes(),key.getBytes());
put.addColumn("average_infos".getBytes(),"major".getBytes(),j.getBytes());
context.write(null,put);
}
public int run(String[] args) throws Exception {
//配置Job
Configuration conf = HbaseConfiguration.create();
Job job = Job.getInstance(conf);
job.setJarByClass(Task.class);
Scan scan = new Scan();
scan.setCaching(300);
scan.setCacheBlocks(false);
String strTableName = "student";
TableMapReduceUtil.initTableMapperJob(strTableName,scan,MyMapper.class,Text.class,IntWritable.class,job);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyTableReducer.class);
TableMapReduceUtil.initTableReducerJob("t_avg",MyTableReducer.class,job);
boolean inDone = job.waitForCompletion(true);
return isDone ? 0 : 1;
}
public static void main(String[] args) {
try {
int r = new Task().run(args);
if (r != 0) {
System.exit(r);
}
// 扫描't_avg'表读取map-reduce的结果,以下代码勿动
Configuration config = new Configuration();
Connection conn = ConnectionFactory.createConnection(config);
TableName tableName = TableName.valueOf("t_avg");
Table table = conn.getTable(tableName);
Scan scan = new Scan();
ResultScanner scanner = table.getScanner(scan);
for(Result result : scanner){
for (Cell kv : result.rawCells()) {
String value = Bytes.toString(CellUtil.clonevalue(kv));
System.out.println(value);
}
}
scanner.close();
conn.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
马上就会整理~



