package com.educoder.bigData.sharedbicycle;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import com.educoder.bigData.util.HbaseUtil;
public class UsageRateMapReduce extends Configured implements Tool {
/**
 * Bytes of the column family name {@code "info"}, produced with
 * {@link String#getBytes()} (default charset). The reducer uses it as the
 * column family of the result cell it writes.
 */
public static final byte[] family = "info".getBytes();
public static class MyMapper extends TableMapper
protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
throws IOException, InterruptedException {
IntWritable doubleWritable = new IntWritable(1);
context.write(new Text("departure"), doubleWritable);
}
}
public static class MyTableReducer extends TableReducer
@Override
public void reduce(Text key, Iterable
throws IOException, InterruptedException {
int totalNum = 0;
for (IntWritable num : values) {
int d = num.get();
totalNum += d;
}
Put put = new Put(Bytes.toBytes(key.toString()));
put.addColumn(family, "usageRate".getBytes(), Bytes.toBytes(String.valueOf(totalNum)));
context.write(null, put);
}
}
/**
 * Creates the output table on a best-effort basis, configures the job and
 * blocks until it finishes.
 *
 * @param args ignored; the source and target table names are hard-coded
 * @return 0 if the job completed successfully, 1 otherwise
 * @throws Exception if configuring or running the job fails
 */
@Override
public int run(String[] args) throws Exception {
    // Configure the job
    Configuration conf = HbaseUtil.conf;
    String arg1 = "t_shared_bicycle";
    String arg2 = "t_bicycle_usagerate";
    try {
        HbaseUtil.createTable(arg2, new String[] { "info" });
    } catch (Exception e) {
        // Table creation failed; report it and continue with the job anyway.
        e.printStackTrace();
    }
    Job job = configureJob(conf, new String[] { arg1, arg2 });
    if (job == null) {
        // Defensive guard: without a job there is nothing to run, so report
        // failure instead of hitting a NullPointerException below.
        return 1;
    }
    return job.waitForCompletion(true) ? 0 : 1;
}
private Job configureJob(Configuration conf, String[] args) throws IOException {
String tablename = args[0];
String targetTable = args[1];
Job job = new Job(conf, tablename);
ArrayList
FastDateFormat instance = FastDateFormat.getInstance("yyyy-MM-dd");
Scan scan = new Scan();
scan.setCaching(300);
scan.setCacheBlocks(false);// 在mapreduce程序中千万不要设置允许缓存
try {
Filter destinationFilter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("beginTime"), CompareOperator.GREATER_OR_EQUAL, Bytes.toBytes(String.valueOf(instance.parse("2017-08-01").getTime())));
Filter departure = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("endTime"), CompareOperator.LESS_OR_EQUAL, Bytes.toBytes(String.valueOf(instance.parse("2017-09-01").getTime())));
listForFilters.add(departure);
listForFilters.add(destinationFilter);
}
catch (Exception e) {
e.printStackTrace();
return null;
}
Filter filters = new FilterList(listForFilters);
scan.setFilter(filters);
// 初始化Mapreduce程序
TableMapReduceUtil.initTableMapperJob(tablename, scan, MyMapper.class, Text.class, IntWritable.class, job);
// 初始化Reduce
TableMapReduceUtil.initTableReducerJob(targetTable, // output table
MyTableReducer.class, // reducer class
job);
job.setNumReduceTasks(1);
return job;
}
}



