栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink 读取 HBase 中的数据，进行业务处理后再写入 HBase

Flink 读取 HBase 中的数据，进行业务处理后再写入 HBase

import cn.flink.streaming.sink.VehicleDetailSinkFunction;
import cn.flink.streaming.source.VechileDetailSourceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple9;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

 // Step 1: create the Flink streaming execution environment.
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

 // Step 2: checkpointing - a checkpoint every 5 s in EXACTLY_ONCE mode with a 30 s timeout,
 // state held by MemoryStateBackend. noRestart() means any task failure fails the whole job.
 env.enableCheckpointing(5000L);
 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
 env.getCheckpointConfig().setCheckpointTimeout(30000);
 env.setStateBackend(new MemoryStateBackend());
 env.setRestartStrategy(RestartStrategies.noRestart());

 // Step 3: source - scan HBase table "itcast_src" via VechileDetailSourceFunction
 // (a RichSourceFunction implementing open/run/close). Each record is a tuple of strings:
 // (rowKey, vin, terminalTime, currentElectricity, remainPower, fuelConsumption100km,
 //  engineSpeed, vehicleSpeed, processTime).
 DataStreamSource<Tuple9<String, String, String, String, String, String, String, String, String>>
         streamSource = env.addSource(new VechileDetailSourceFunction("itcast_src"));

 // Step 4: sink - write the records to HBase table "itcastsrc_vehicle_detail" through the
 // custom VehicleDetailSinkFunction (defined elsewhere).
 // NOTE(review): the original plan says the target table is created with GZ compression;
 // presumably the sink does this - confirm.
 streamSource.addSink(new VehicleDetailSinkFunction("itcastsrc_vehicle_detail"));

 // Step 5: submit and run the job.
 env.execute();
	 }
 }
import cn.itcast.utils.ConfigLoader;
import org.apache.flink.api.java.tuple.Tuple9;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import javax.print.DocFlavor;
import java.io.IOException;
import java.util.Iterator;


public class VechileDetailSourceFunction extends RichSourceFunction<
        Tuple9> {
    //定义需要操作的表的对象
    private Table table = null;
    //创建connection对象
    private Connection connection = null;
    //定义表名
    private String tableName = null;
    //定义扫描器对象
    private Scan scan = null;
    //定义列族的字节数组对象
    private final byte[] family = "cf".getBytes(ConfigConstants.DEFAULT_CHARSET);

    
    public VechileDetailSourceFunction(String tableName){
        this.tableName = tableName;
    }

    
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        org.apache.hadoop.conf.Configuration configuration = HbaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", ConfigLoader.getProperty("zookeeper.quorum"));
        configuration.set("hbase.zookeeper.property.clientPort", ConfigLoader.getProperty("zookeeper.clientPort"));
        configuration.set(TableInputFormat.INPUT_TABLE, tableName);
        connection = ConnectionFactory.createConnection(configuration);
        table = connection.getTable(TableName.valueOf(tableName));
        scan = new Scan();
        //为scan对象添加列族
        scan.addFamily(family);
    }

    
    @Override
    public void close() throws Exception {
        super.close();
        if(table != null) table.close();
        if(connection!=null) connection.close();
    }

    
    @Override
    public void run(SourceContext> ctx) throws Exception {
        //执行扫描操作,获得scan扫描结果
        ResultScanner results = table.getScanner(scan);
        Iterator iterator = results.iterator();
        while (iterator.hasNext()){
            Result result = iterator.next();
            //获取rowkey
            String rowKey = Bytes.toString(result.getRow());
            //获取vin
            String vin = Bytes.toString(result.getValue(family, "vin".getBytes()));
            //终端时间
            String terminalTime = Bytes.toString(result.getValue(family, "terminalTime".getBytes()));
            //存储时间
            String processTime = Bytes.toString(result.getValue(family, "processTime".getBytes()));
            //电量百分比
            String currentElectricity = Bytes.toString(result.getValue(family, "currentElectricity".getBytes()));
            //当前电量
            String remainPower = Bytes.toString(result.getValue(family, "remainPower".getBytes()));
            //百公里油耗
            String fuelConsumption100km = Bytes.toString(result.getValue(family, "fuelConsumption100km".getBytes()));
            //发动机速度
            String engineSpeed = Bytes.toString(result.getValue(family, "engineSpeed".getBytes()));
            //车辆速度
            String vehicleSpeed = Bytes.toString(result.getValue(family, "vehicleSpeed".getBytes()));
            //返回获取到的字段
            ctx.collect(Tuple9.of(
                    rowKey,vin,terminalTime,currentElectricity,remainPower,fuelConsumption100km,engineSpeed,vehicleSpeed,processTime
            ));
        }
    }

    
    @Override
    public void cancel() {
        try {
            if(table != null) table.close();
            if(connection!=null) connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/278436.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号