import cn.flink.streaming.sink.VehicleDetailSinkFunction;
import cn.flink.streaming.source.VechileDetailSourceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple9;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
//TODO 1)初始化flink流处理的运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //TODO 2)设置检查点相关的参数(checkpoint周期,任务失败时是否结束job) env.enableCheckpointing(5000L); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(30000); env.setStateBackend(new MemoryStateBackend()); env.setRestartStrategy(RestartStrategies.noRestart()); //TODO 3)添加数据源(加载hbase)的数据,表:itcast_src DataStreamSource> streamSource = env.addSource(new VechileDetailSourceFunction("itcast_src")); //TODO 3.1)继承自RichSourceFunction //TODO 3.2)实现open、run、close方法 //TODO 4)实现数据的写入到hbase中 //streamSource.printToErr(); VehicleDetailSinkFunction vehicleDetailSinkFunction = new VehicleDetailSinkFunction("itcastsrc_vehicle_detail"); streamSource.addSink(vehicleDetailSinkFunction); //TODO 4.1)创建表,使用gz压缩格式 //TODO 4.2)自定义sink的实现 //TODO 5)执行任务,查看明细数据ETL任务的结果 env.execute(); } }
import cn.itcast.utils.ConfigLoader;
import org.apache.flink.api.java.tuple.Tuple9;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import javax.print.DocFlavor;
import java.io.IOException;
import java.util.Iterator;
public class VechileDetailSourceFunction extends RichSourceFunction<
Tuple9> {
//定义需要操作的表的对象
private Table table = null;
//创建connection对象
private Connection connection = null;
//定义表名
private String tableName = null;
//定义扫描器对象
private Scan scan = null;
//定义列族的字节数组对象
private final byte[] family = "cf".getBytes(ConfigConstants.DEFAULT_CHARSET);
public VechileDetailSourceFunction(String tableName){
this.tableName = tableName;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
org.apache.hadoop.conf.Configuration configuration = HbaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", ConfigLoader.getProperty("zookeeper.quorum"));
configuration.set("hbase.zookeeper.property.clientPort", ConfigLoader.getProperty("zookeeper.clientPort"));
configuration.set(TableInputFormat.INPUT_TABLE, tableName);
connection = ConnectionFactory.createConnection(configuration);
table = connection.getTable(TableName.valueOf(tableName));
scan = new Scan();
//为scan对象添加列族
scan.addFamily(family);
}
@Override
public void close() throws Exception {
super.close();
if(table != null) table.close();
if(connection!=null) connection.close();
}
@Override
public void run(SourceContext> ctx) throws Exception {
//执行扫描操作,获得scan扫描结果
ResultScanner results = table.getScanner(scan);
Iterator iterator = results.iterator();
while (iterator.hasNext()){
Result result = iterator.next();
//获取rowkey
String rowKey = Bytes.toString(result.getRow());
//获取vin
String vin = Bytes.toString(result.getValue(family, "vin".getBytes()));
//终端时间
String terminalTime = Bytes.toString(result.getValue(family, "terminalTime".getBytes()));
//存储时间
String processTime = Bytes.toString(result.getValue(family, "processTime".getBytes()));
//电量百分比
String currentElectricity = Bytes.toString(result.getValue(family, "currentElectricity".getBytes()));
//当前电量
String remainPower = Bytes.toString(result.getValue(family, "remainPower".getBytes()));
//百公里油耗
String fuelConsumption100km = Bytes.toString(result.getValue(family, "fuelConsumption100km".getBytes()));
//发动机速度
String engineSpeed = Bytes.toString(result.getValue(family, "engineSpeed".getBytes()));
//车辆速度
String vehicleSpeed = Bytes.toString(result.getValue(family, "vehicleSpeed".getBytes()));
//返回获取到的字段
ctx.collect(Tuple9.of(
rowKey,vin,terminalTime,currentElectricity,remainPower,fuelConsumption100km,engineSpeed,vehicleSpeed,processTime
));
}
}
@Override
public void cancel() {
try {
if(table != null) table.close();
if(connection!=null) connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}



