从 Kafka 中读取业务数据的 ODS 层数据(主题 ods_base_db),经过处理后,将维度数据保存到 HBase,将事实数据写回 Kafka,作为业务数据的 DWD 层。
维度数据一般写入便于按主键查询的存储,比如 HBase、Redis、MySQL 等。由于哪些表作为维度数据、哪些表作为事实数据、以及各自写到哪里都需要灵活调整,这里需要一种动态配置方案:把这份分流配置持久化保存(本例中是 MySQL 的 table_process 表),一旦配置发生变化,实时计算任务能够自动感知。
这里通过 Flink CDC 读取配置表,并将其作为广播流与主流连接来实现。
package com.yyds.app.dwd;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.yyds.app.function.MyDeserialization;
import com.yyds.app.function.TableProcessFunction;
import com.yyds.bean.TableProcess;
import com.yyds.utils.MyKafkaUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.OutputTag;
/**
 * DWD-layer job for business data.
 *
 * <p>Reads change records from the Kafka topic {@code ods_base_db}. Reads routing rules from the
 * MySQL table {@code flink-realtime.table_process} via Flink CDC and broadcasts them. Uses
 * {@link TableProcessFunction} to split the records into a Kafka-bound main stream and an
 * HBase-bound side output. Writing both streams to their sinks is not implemented yet (step 8).
 */
public class baseDBApp {

    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if building or executing the job fails; {@code env.execute} declares it,
     *                   so {@code main} must declare it as well
     */
    public static void main(String[] args) throws Exception {
        // Step 1: execution environment.
        // Hadoop user for HDFS access (the checkpoint directory configured below).
        System.setProperty("HADOOP_USER_NAME", "root");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Checkpoint every 5 seconds with exactly-once semantics.
        env.enableCheckpointing(5000L);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Checkpoint alignment timeout, currently disabled:
        // checkpointConfig.setAlignmentTimeout(10000L);
        // A minimum pause between checkpoints implies at most one concurrent checkpoint,
        // so 1 is the effective value; higher values would be ignored.
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        checkpointConfig.setMinPauseBetweenCheckpoints(3000L);
        // Restart strategy, currently disabled:
        // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
        // Keep the last checkpoint when the job is cancelled so it can be restored from.
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // State backend: checkpoints are written to HDFS.
        env.setStateBackend(new FsStateBackend("hdfs://centos01:8020/flinkCDC/ck"));

        // Step 2: consume the ods_base_db topic.
        String sourceTopic = "ods_base_db";
        String groupId = "base_db_app_2022";
        FlinkKafkaConsumer<String> kafkaConsumer = MyKafkaUtils.getKafkaConsumer(sourceTopic, groupId);
        DataStreamSource<String> kafkaDS = env.addSource(kafkaConsumer);

        // Step 3: parse every line into a JSON object and drop delete events (main stream).
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS
                .map(JSON::parseObject)
                .filter(new FilterFunction<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject value) throws Exception {
                        String type = value.getString("type");
                        return !"delete".equals(type);
                    }
                });

        // Step 4: read the routing configuration table with Flink CDC and broadcast it.
        // NOTE(review): credentials are hard-coded; consider moving them to external configuration.
        DebeziumSourceFunction<String> mySQLSource = MySQLSource.<String>builder()
                .hostname("centos01")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("flink-realtime")
                // Optional: without it every table of the listed databases is captured.
                // Entries must use the "db.table" form.
                .tableList("flink-realtime.table_process")
                .deserializer(new MyDeserialization())
                // Snapshot the existing rows first, then follow subsequent changes.
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> tableProcessDS = env.addSource(mySQLSource);

        // Broadcast state: rule key (sourceTable-operateType) -> routing rule.
        MapStateDescriptor<String, TableProcess> mapStateDescriptor =
                new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
        BroadcastStream<String> broadcastStream = tableProcessDS.broadcast(mapStateDescriptor);

        // Step 5: connect the main stream with the broadcast stream.
        BroadcastConnectedStream<JSONObject, String> connectedStream = jsonObjDS.connect(broadcastStream);

        // Step 6: split the data; HBase-bound records go to a side output.
        // The anonymous subclass ({}) keeps the element type available to Flink at runtime.
        OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>("hbase-tag") {};
        SingleOutputStreamOperator<JSONObject> kafkaDataStream =
                connectedStream.process(new TableProcessFunction(hbaseTag, mapStateDescriptor));

        // Step 7: extract the Kafka-bound (main output) and HBase-bound (side output) streams.
        DataStream<JSONObject> hbaseDataStream = kafkaDataStream.getSideOutput(hbaseTag);
        kafkaDataStream.print("kafka----------");
        hbaseDataStream.print("hbase----------");

        // TODO 8: write the Kafka stream to its topics and the HBase stream to Phoenix tables.

        // Step 9: launch the job.
        env.execute("baseDBApp");
    }
}
(2) 自定义 BroadcastProcessFunction:TableProcessFunction
package com.yyds.app.function; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.yyds.bean.TableProcess; import com.yyds.common.FlinkConfig; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReadOnlyBroadcastState; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.Arrays; import java.util.List; public class TableProcessFunction extends BroadcastProcessFunction(3)其他{ private Connection connection; private OutputTag hbaseTag; private MapStateDescriptor mapStateDescriptor; public TableProcessFunction(OutputTag hbaseTag, MapStateDescriptor mapStateDescriptor) { this.hbaseTag = hbaseTag; this.mapStateDescriptor = mapStateDescriptor; } @Override public void open(Configuration parameters) throws Exception { Class.forName(FlinkConfig.PHOENIX_DRIVER); connection = DriverManager.getConnection( FlinkConfig.PHOENIX_SERVER ); } // 广播流 @Override public void processBroadcastElement(String value, Context ctx, Collector out) throws Exception { //{"database":"","before":{},"after":{},"type":"insert","tableName":""} // 1、解析数据 String data = JSONObject.parseObject(value).getString("after"); TableProcess tableProcess = JSON.parseObject(data, TableProcess.class); // 2、检查hbase表是否存在并建表 if (TableProcess.SINK_TYPE_Hbase.equals(tableProcess.getSinkType())) { checkTable( tableProcess.getSinkTable(), tableProcess.getSinkColumns(), tableProcess.getSinkPk(), tableProcess.getSinkExtend() ); } // 3、写入状态 广播出去 BroadcastState broadcastState = ctx.getBroadcastState(mapStateDescriptor); String key = tableProcess.getSourceTable() + "-" + tableProcess.getOperateType(); 
broadcastState.put(key, tableProcess); } // 主流 @Override public void processElement(JSONObject value, ReadOnlyContext ctx, Collector out) throws Exception { // 1、读取状态 ReadOnlyBroadcastState readonlyBroadcastState = ctx.getBroadcastState(mapStateDescriptor); //{"database":"","before":{},"after":{},"type":"insert","tableName":""} String tableName = value.getString("tableName"); String type = value.getString("type"); String key = tableName + "-" + type; TableProcess tableProcess = readOnlyBroadcastState.get(key); if (tableProcess != null) { // 2、过滤数据 JSONObject data = value.getJSONObject("after"); filterColumn(data, tableProcess.getSinkColumns()); // 3、分流 // 将hbase输出表/kafka主题信息写入到value中 value.put("sinkTable", tableProcess.getSinkTable()); String sinkType = tableProcess.getSinkType(); if (TableProcess.SINK_TYPE_KAFKA.equals(sinkType)) { //kafka数据,写入主流 out.collect(value); } else if (TableProcess.SINK_TYPE_Hbase.equals(sinkType)) { //hbase 写入侧输出流 ctx.output(hbaseTag, value); } } else { System.out.println("key = " + key + " ,不存在"); } } //过滤字段 private void filterColumn(JSONObject data, String sinkColumns) { String[] split = sinkColumns.split(","); List columns = Arrays.asList(split); // Iterator > iterator = data.entrySet().iterator(); // while (iterator.hasNext()){ // Map.Entry next = iterator.next(); // if(!columns.contains(next.getKey())){ // iterator.remove(); // } // } data.entrySet().removeIf(next -> !columns.contains(next.getKey())); } // 建表create table if not exists mydb.test(id varchar primary key,name varchar,sex varchar) private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) { PreparedStatement preparedStatement = null; try { //给主键以及扩展字段赋默认值 if (sinkPk == null) { sinkPk = "id"; } if (sinkExtend == null) { sinkExtend = ""; } StringBuffer sql = new StringBuffer("create table if not exists ") .append(FlinkConfig.Hbase_SCHEMA) .append(".") .append(sinkTable) .append(" ( "); String[] fields = sinkColumns.split(","); for (int i 
= 0; i < fields.length; i++) { String field = fields[i]; // 判断是否为主键 if (sinkPk.equals(field)) { sql.append(field).append(" varchar primary key"); } else { sql.append(field).append(" varchar"); } // 判断是否为最后一个字段 if (i < fields.length - 1) { sql.append(","); } } sql.append(")").append(sinkExtend); System.out.println(sql); //执行 preparedStatement = connection.prepareStatement(sql.toString()); preparedStatement.execute(); } catch (SQLException e) { e.printStackTrace(); throw new RuntimeException(sinkTable + " 建表失败!!!"); }finally { if(preparedStatement != null){ try { preparedStatement.close(); } catch (SQLException e) { e.printStackTrace(); } } } } }
package com.yyds.bean;
import lombok.Data;
/**
 * One routing rule of the dynamic-split configuration.
 *
 * <p>It is populated by fastjson from the {@code after}/{@code before} image of a change record
 * of the configuration table (presumably {@code flink-realtime.table_process}; TODO confirm).
 * Lombok {@code @Data} generates the getters, setters, {@code equals}, {@code hashCode} and
 * {@code toString}. The fields are therefore private and accessed only through those methods.
 */
@Data
public class TableProcess {
    // Sink type values for dynamic splitting. The mixed-case name SINK_TYPE_Hbase is kept because
    // existing callers reference it.
    /** Sink type: HBase (written through Phoenix). */
    public static final String SINK_TYPE_Hbase = "hbase";
    /** Sink type: Kafka topic. */
    public static final String SINK_TYPE_KAFKA = "kafka";
    /** Sink type: ClickHouse. */
    public static final String SINK_TYPE_CK = "clickhouse";

    /** Source table name. */
    private String sourceTable;
    /** Operation type: insert, update or delete. */
    private String operateType;
    /** Sink type: hbase or kafka (see the SINK_TYPE_* constants). */
    private String sinkType;
    /** Target table (HBase) or topic (Kafka). */
    private String sinkTable;
    /** Comma-separated list of columns to keep and write. */
    private String sinkColumns;
    /** Primary-key column of the target table. */
    private String sinkPk;
    /** Extra clause appended to the CREATE TABLE statement. */
    private String sinkExtend;
}
package com.yyds.common;
/**
 * Connection constants for the Phoenix (SQL layer over HBase) sink.
 *
 * <p>This class only holds constants and cannot be instantiated.
 */
public final class FlinkConfig {

    /**
     * Phoenix schema in which dimension tables are created.
     *
     * <p>Phoenix upper-cases unquoted identifiers, so this resolves to {@code FLINK_REALTIME}.
     * The mixed-case constant name is kept because existing callers reference it.
     */
    public static final String Hbase_SCHEMA = "Flink_REALTIME";

    /** Phoenix JDBC driver class. */
    public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";

    /** Phoenix JDBC URL (ZooKeeper quorum and client port). */
    public static final String PHOENIX_SERVER =
            "jdbc:phoenix:centos01,centos02,centos03:2181";

    private FlinkConfig() {
        // Constants holder; not instantiable.
    }
}



