Previous post >>> Flink Real-Time Data Warehouse for a Big Data Project (Data Ingestion / ODS Layer)
Continuing from there:
The previous article finished the data ingestion and the ODS layer of the real-time warehouse; now we build the DWD layer.
DWD layer approach: read the user-behavior data and the business data from the Kafka ODS topics, apply light processing, and write the results back to Kafka as the DWD layer.
Start with the user log data in the ODS layer. It mixes three record types with different structures: page logs, start-up logs, and display (exposure) logs. They need to be split apart and written back to Kafka as the log DWD layer.
Page logs go to the main stream, start-up logs to a start side-output stream, and display logs to a display side-output stream.
Flink reads the data with a Kafka consumer and processes it (renamed to getKafkaConsumer to match the call site below):
public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
    //Add the consumer group to the configuration
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    //Build the Kafka source
    return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
}
Data flow: web/app -> Nginx -> SpringBoot -> Kafka(ods) -> FlinkApp -> Kafka(dwd)
Program: mockLog -> Nginx -> Logger.sh -> Kafka(ZK) -> baseLogApp -> kafka
//TODO 1. Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//TODO 2. Consume the ods_base_log topic and create a stream
String sourceTopic = "ods_base_log";
String groupId = "base_log_app_210325";
DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));
//TODO 3. Convert each line to a JSON object (filter out dirty records)
OutputTag<String> outputTag = new OutputTag<String>("Dirty") {};
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
    @Override
    public void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        try {
            //Parse the value as JSON
            JSONObject jsonObject = JSON.parseObject(value);
            out.collect(jsonObject);
        } catch (Exception e) {
            //On failure, route the record to the dirty-data side output
            ctx.output(outputTag, value);
        }
    }
});
//TODO 4. New/returning-user check (stateful processing)
SingleOutputStreamOperator<JSONObject> jsonObjWithNewFlagDS = jsonObjDS.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"))
        .map(new RichMapFunction<JSONObject, JSONObject>() {
            private ValueState<String> valueState;
            @Override
            public void open(Configuration parameters) throws Exception {
                valueState = getRuntimeContext().getState(new ValueStateDescriptor<>("value-state", String.class));
            }
            @Override
            public JSONObject map(JSONObject value) throws Exception {
                //Read the "is_new" flag from the record
                String isNew = value.getJSONObject("common").getString("is_new");
                //Only records flagged "1" need checking
                if ("1".equals(isNew)) {
                    //Read the keyed state
                    String state = valueState.value();
                    if (state != null) {
                        //State exists, so this mid has been seen before: correct the flag
                        value.getJSONObject("common").put("is_new", "0");
                    } else {
                        //First sighting of this mid: record it in state
                        valueState.update("1");
                    }
                }
                return value;
            }
        });
//TODO 5. Split the stream: page logs -> main stream, start logs -> side output, display logs -> side output
OutputTag<String> startTag = new OutputTag<String>("start") {};
OutputTag<String> displayTag = new OutputTag<String>("display") {};
SingleOutputStreamOperator<String> pageDS = jsonObjWithNewFlagDS.process(new ProcessFunction<JSONObject, String>() {
    @Override
    public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {
        //Read the start-log field
        String start = value.getString("start");
        if (start != null && start.length() > 0) {
            //Start log: write to the start side output
            ctx.output(startTag, value.toJSONString());
        } else {
            //Page log: write to the main stream
            out.collect(value.toJSONString());
            //Extract any display (exposure) records carried by the page log
            JSONArray displays = value.getJSONArray("displays");
            if (displays != null && displays.size() > 0) {
                //Read the page ID
                String pageId = value.getJSONObject("page").getString("page_id");
                for (int i = 0; i < displays.size(); i++) {
                    JSONObject display = displays.getJSONObject(i);
                    //Attach the page ID to each display record
                    display.put("page_id", pageId);
                    //Write to the display side output
                    ctx.output(displayTag, display.toJSONString());
                }
            }
        }
    }
});
//TODO 6. Extract the side outputs
DataStream<String> startDS = pageDS.getSideOutput(startTag);
DataStream<String> displayDS = pageDS.getSideOutput(displayTag);
//TODO 7. Print the three streams and write them to the corresponding Kafka topics
startDS.print("Start>>>>>>>>>>>");
pageDS.print("Page>>>>>>>>>>>");
displayDS.print("Display>>>>>>>>>>>>");
startDS.addSink(MyKafkaUtil.getKafkaProducer("dwd_start_log"));
pageDS.addSink(MyKafkaUtil.getKafkaProducer("dwd_page_log"));
displayDS.addSink(MyKafkaUtil.getKafkaProducer("dwd_display_log"));
//TODO 8. Start the job
env.execute("baseLogApp");
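The new/returning-user check above boils down to a pure per-device rule: the first record flagged "1" for a given mid keeps its flag and leaves a marker; every later "1" for the same mid is corrected to "0". A minimal sketch of just that logic, with a plain HashMap standing in for Flink's keyed ValueState (class and method names here are illustrative, not from the original project):

```java
import java.util.HashMap;
import java.util.Map;

public class NewUserFlagDemo {
    //Stand-in for Flink's keyed ValueState: one marker per device id (mid)
    private final Map<String, String> seenMids = new HashMap<>();

    //Returns the corrected is_new flag for one record from this mid
    public String correctIsNew(String mid, String isNew) {
        if ("1".equals(isNew)) {
            if (seenMids.containsKey(mid)) {
                //Marker exists: the device was seen before, so it is not new
                return "0";
            }
            //First sighting: remember the device and keep the flag
            seenMids.put(mid, "1");
        }
        return isNew;
    }

    public static void main(String[] args) {
        NewUserFlagDemo demo = new NewUserFlagDemo();
        System.out.println(demo.correctIsNew("mid_1", "1")); // first visit keeps "1"
        System.out.println(demo.correctIsNew("mid_1", "1")); // repeat visit becomes "0"
        System.out.println(demo.correctIsNew("mid_2", "0")); // already "0" stays "0"
    }
}
```

In the real job the marker lives in RocksDB/heap state scoped by keyBy(mid), so it survives restarts via checkpoints; the map here only mimics the lookup semantics.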
Business Data (DWD)
Analyzing the business data
Changes to the business data are captured by FlinkCDC, but FlinkCDC writes everything into a single topic that mixes fact data and dimension data. The DWD job therefore reads from the business-data ODS topic in Kafka and, after processing, saves the dimension data to HBase and writes the fact data back to Kafka as the business-data DWD layer.
Wrapper method for sinking data to a Kafka topic:
public static <T> FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
    properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 5 * 60 * 1000 + "");
    return new FlinkKafkaProducer<>(DEFAULT_TOPIC, kafkaSerializationSchema, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
Consume the Kafka data and process it into the business-data DWD layer:
//TODO 1. Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//TODO 2. Consume the ods_base_db topic and create a stream
String sourceTopic = "ods_base_db";
String groupId = "base_db_app_210325";
DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));
//TODO 3. Convert each line to a JSON object and filter out delete operations (main stream)
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject)
        .filter(new FilterFunction<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                //Read the operation type
                String type = value.getString("type");
                return !"delete".equals(type);
            }
        });
//TODO 4. Consume the config table with FlinkCDC and turn it into a broadcast stream
DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
        .hostname("master")
        .port(3306)
        .username("root")
        .password("000000")
        .databaseList("mall2021_realtime")
        .tableList("mall2021_realtime.table_process")
        .startupOptions(StartupOptions.initial())
        .deserializer(new CustomerDeserialization())
        .build();
DataStreamSource<String> tableProcessStrDS = env.addSource(sourceFunction);
MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
BroadcastStream<String> broadcastStream = tableProcessStrDS.broadcast(mapStateDescriptor);
//TODO 5. Connect the main stream with the broadcast stream
BroadcastConnectedStream<JSONObject, String> connectedStream = jsonObjDS.connect(broadcastStream);
//TODO 6. Split the stream: process main-stream records according to the broadcast config
OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>("hbase-tag") {};
SingleOutputStreamOperator<JSONObject> kafka = connectedStream.process(new TableProcessFunction(hbaseTag, mapStateDescriptor));
//TODO 7. Extract the Kafka stream and the HBase side-output stream
DataStream<JSONObject> hbase = kafka.getSideOutput(hbaseTag);
//TODO 8. Write the Kafka stream to Kafka topics and the HBase stream to Phoenix tables
kafka.print("Kafka>>>>>>>>");
hbase.print("Hbase>>>>>>>>");
hbase.addSink(new DimSinkFunction());
kafka.addSink(MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(JSONObject element, @Nullable Long timestamp) {
        return new ProducerRecord<>(element.getString("sinkTable"),
                element.getString("after").getBytes());
    }
}));
//TODO 9. Start the job
env.execute("baseDBApp");
Note the config POJO used here:
@Data
public class TableProcess {
    //Constants for the dynamic-split sink types
    public static final String SINK_TYPE_Hbase = "hbase";
    public static final String SINK_TYPE_KAFKA = "kafka";
    public static final String SINK_TYPE_CK = "clickhouse"; //not used for now
    //Source table
    String sourceTable;
    //Operation type: insert, update, delete
    String operateType;
    //Sink type: hbase or kafka
    String sinkType;
    //Sink table (or topic)
    String sinkTable;
    //Sink columns
    String sinkColumns;
    //Primary-key column
    String sinkPk;
    //Extra clause for CREATE TABLE
    String sinkExtend;
}
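The whole dynamic split hinges on one lookup: the change record's sourceTable plus "-" plus its operateType is used as the key into the broadcast config, and the matching TableProcess row decides the sink. A self-contained sketch of that routing with a plain map standing in for the broadcast state (class names and sample rows are mine, for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class TableProcessRoutingDemo {
    //Minimal stand-in for one TableProcess config row
    static class Route {
        final String sinkType;
        final String sinkTable;
        Route(String sinkType, String sinkTable) { this.sinkType = sinkType; this.sinkTable = sinkTable; }
    }

    //Stand-in for broadcast state: key is sourceTable + "-" + operateType
    static final Map<String, Route> CONFIG = new HashMap<>();
    static {
        CONFIG.put("base_trademark-insert", new Route("hbase", "dim_base_trademark"));
        CONFIG.put("order_info-insert", new Route("kafka", "dwd_order_info"));
    }

    //Returns the sink table/topic for a change record, or null when no config matches
    static String route(String tableName, String type) {
        Route r = CONFIG.get(tableName + "-" + type);
        return r == null ? null : r.sinkTable;
    }

    public static void main(String[] args) {
        System.out.println(route("base_trademark", "insert")); // dim_base_trademark
        System.out.println(route("order_info", "delete"));     // null: no config for deletes
    }
}
```

Because the config table itself arrives through FlinkCDC, adding a row to table_process re-routes a new source table at runtime with no code change.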
Common configuration constants:
//Phoenix schema name
public static final String Hbase_SCHEMA = "MALL2021_REALTIME";
//Phoenix JDBC driver
public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
//Phoenix connection URL
public static final String PHOENIX_SERVER = "jdbc:phoenix:master,slave,slave1:2181";
TableProcessFunction
public TableProcessFunction(OutputTag<JSONObject> objectOutputTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
    this.objectOutputTag = objectOutputTag;
    this.mapStateDescriptor = mapStateDescriptor;
}
TableProcessFunction-open
public void open(Configuration parameters) throws Exception {
Class.forName(GmallConfig.PHOENIX_DRIVER);
connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
}
TableProcessFunction-processBroadcastElement
//value:{"db":"","tn":"","before":{},"after":{},"type":""} //record format
public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
    //1. Parse the change record
    JSONObject jsonObject = JSON.parseObject(value);
    String data = jsonObject.getString("after");
    TableProcess tableProcess = JSON.parseObject(data, TableProcess.class);
    //2. For HBase sinks, make sure the Phoenix table exists
    if (TableProcess.SINK_TYPE_Hbase.equals(tableProcess.getSinkType())) {
        checkTable(tableProcess.getSinkTable(), tableProcess.getSinkColumns(), tableProcess.getSinkPk(), tableProcess.getSinkExtend());
    }
    //3. Put the config into broadcast state so it reaches every task
    BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
    String key = tableProcess.getSourceTable() + "-" + tableProcess.getOperateType();
    broadcastState.put(key, tableProcess);
}
TableProcessFunction-checkTable
create table if not exists db.tn(id varchar primary key,tm_name varchar) xxx; //target DDL format
private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
    PreparedStatement preparedStatement = null;
    try {
        if (sinkPk == null) {
            sinkPk = "id";
        }
        if (sinkExtend == null) {
            sinkExtend = "";
        }
        StringBuffer createTableSQL = new StringBuffer("create table if not exists ")
                .append(GmallConfig.Hbase_SCHEMA)
                .append(".")
                .append(sinkTable)
                .append("(");
        String[] fields = sinkColumns.split(",");
        for (int i = 0; i < fields.length; i++) {
            String field = fields[i];
            //Check whether this column is the primary key
            if (sinkPk.equals(field)) {
                createTableSQL.append(field).append(" varchar primary key ");
            } else {
                createTableSQL.append(field).append(" varchar ");
            }
            //Append "," unless this is the last column
            if (i < fields.length - 1) {
                createTableSQL.append(",");
            }
        }
        createTableSQL.append(")").append(sinkExtend);
        //Print the DDL statement
        System.out.println(createTableSQL);
        //Prepare the SQL
        preparedStatement = connection.prepareStatement(createTableSQL.toString());
        //Execute it
        preparedStatement.execute();
    } catch (SQLException e) {
        throw new RuntimeException("Failed to create Phoenix table " + sinkTable + "!");
    } finally {
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
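The DDL assembly inside checkTable is easy to isolate and test without a Phoenix connection. A sketch of the same string-building logic as a pure function (class and method names are mine), which makes the comma handling and the primary-key branch visible:

```java
public class CreateTableSqlDemo {
    //Builds the Phoenix CREATE TABLE statement from the configured columns
    static String buildCreateTableSql(String schema, String table, String columns, String pk, String extend) {
        if (pk == null) pk = "id";          //default primary key, as in checkTable
        if (extend == null) extend = "";    //no extra clause by default
        StringBuilder sql = new StringBuilder("create table if not exists ")
                .append(schema).append(".").append(table).append("(");
        String[] fields = columns.split(",");
        for (int i = 0; i < fields.length; i++) {
            sql.append(fields[i]);
            //Only the primary-key column gets the "primary key" qualifier
            sql.append(fields[i].equals(pk) ? " varchar primary key " : " varchar ");
            //Comma between columns, but not after the last one
            if (i < fields.length - 1) sql.append(",");
        }
        return sql.append(")").append(extend).toString();
    }

    public static void main(String[] args) {
        System.out.println(buildCreateTableSql("MALL2021_REALTIME", "DIM_BASE_TRADEMARK", "id,tm_name", "id", null));
        // create table if not exists MALL2021_REALTIME.DIM_BASE_TRADEMARK(id varchar primary key ,tm_name varchar )
    }
}
```

Extracting the builder like this also makes the failure mode obvious: a typo in sinkColumns produces a broken statement at string level, before any SQLException ever fires.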
TableProcessFunction-processElement
value:{"db":"","tn":"","before":{},"after":{},"type":""}
public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
    //1. Read the config from broadcast state
    ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
    String key = value.getString("tableName") + "-" + value.getString("type");
    TableProcess tableProcess = broadcastState.get(key);
    if (tableProcess != null) {
        //2. Keep only the configured columns
        JSONObject data = value.getJSONObject("after");
        filterColumn(data, tableProcess.getSinkColumns());
        //3. Split: attach the sink table/topic to the record
        value.put("sinkTable", tableProcess.getSinkTable());
        String sinkType = tableProcess.getSinkType();
        if (TableProcess.SINK_TYPE_KAFKA.equals(sinkType)) {
            //Kafka data goes to the main stream
            out.collect(value);
        } else if (TableProcess.SINK_TYPE_Hbase.equals(sinkType)) {
            //HBase data goes to the side output
            ctx.output(objectOutputTag, value);
        }
    } else {
        System.out.println("No config for key: " + key);
    }
}
TableProcessFunction-filterColumn
@param data {"id":"11","tm_name":"keven","logo_url":"aaa"}
@param sinkColumns id,tm_name
result: {"id":"11","tm_name":"keven"}
private void filterColumn(JSONObject data, String sinkColumns) {
    String[] fields = sinkColumns.split(",");
    List<String> columns = Arrays.asList(fields);
    data.entrySet().removeIf(next -> !columns.contains(next.getKey()));
}
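The same filtering works on any java.util.Map, which is what fastjson's JSONObject implements, so the technique can be shown without the fastjson dependency. A self-contained sketch (demo class name is mine):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FilterColumnDemo {
    //Removes every entry whose key is not in the configured column list
    static void filterColumn(Map<String, Object> data, String sinkColumns) {
        List<String> columns = Arrays.asList(sinkColumns.split(","));
        data.entrySet().removeIf(e -> !columns.contains(e.getKey()));
    }

    public static void main(String[] args) {
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("id", "11");
        data.put("tm_name", "keven");
        data.put("logo_url", "aaa");
        filterColumn(data, "id,tm_name");
        System.out.println(data); // {id=11, tm_name=keven}
    }
}
```

removeIf on the entry set mutates the map in place, which is exactly why processElement can call filterColumn on the "after" object and then forward the same JSONObject downstream.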
Calling TableProcessFunction from baseDBApp to split the stream:
//TODO 5. Connect the main stream with the broadcast stream
BroadcastConnectedStream<JSONObject, String> connectedStream = jsonObjDS.connect(broadcastStream);
//TODO 6. Split the stream: process main-stream records according to the broadcast config
OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>("hbase-tag") {};
SingleOutputStreamOperator<JSONObject> kafka = connectedStream.process(new TableProcessFunction(hbaseTag, mapStateDescriptor));
//TODO 7. Extract the Kafka stream and the HBase side-output stream
DataStream<JSONObject> hbase = kafka.getSideOutput(hbaseTag);
Dimension Data (DIM)
Analyzing the dimension data
DimSink extends RichSinkFunction, and this function has two lifecycles.
One is open(), executed once when the job starts; connection initialization belongs there.
The other is invoke(), executed repeatedly as each record arrives; it persists the data, essentially by assembling the record into a SQL statement and submitting it to HBase via Phoenix.
public class DimSinkFunction extends RichSinkFunction<JSONObject> {
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName(GmallConfig.PHOENIX_DRIVER);
        connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
        connection.setAutoCommit(true);
    }

    //value:{"sinkTable":"dim_base_trademark","database":"mall-210325-flink","before":{"tm_name":"keven","id":12},"after":{"tm_name":"Keven","id":12},"type":"update","tableName":"base_trademark"}
    //SQL: upsert into db.tn(id,tm_name) values('...','...')
    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        PreparedStatement preparedStatement = null;
        try {
            //Build the SQL statement
            String sinkTable = value.getString("sinkTable");
            JSONObject after = value.getJSONObject("after");
            String upsertSql = genUpsertSql(sinkTable, after);
            System.out.println(upsertSql);
            //Prepare the SQL
            preparedStatement = connection.prepareStatement(upsertSql);
            //Execute the upsert
            preparedStatement.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
        }
    }

    //data:{"tm_name":"kevenhe","id":12}
    //SQL: upsert into db.tn(id,tm_name) values('...','...')
    //The original snippet was cut off here; the body below is reconstructed to match the SQL format above
    private String genUpsertSql(String sinkTable, JSONObject data) {
        Set<String> keySet = data.keySet();
        Collection<Object> values = data.values();
        return "upsert into " + GmallConfig.Hbase_SCHEMA + "." + sinkTable + "(" +
                StringUtils.join(keySet, ",") + ") values('" +
                StringUtils.join(values, "','") + "')";
    }
}
Split sink: writing the business data to Kafka
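The upsert-statement assembly in genUpsertSql can be exercised without Phoenix or commons-lang. A JDK-only sketch of the same idea (class name is mine): the map's key set becomes the column list and its values become the quoted value list, with insertion order preserved by LinkedHashMap:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class UpsertSqlDemo {
    //Builds "upsert into schema.table(k1,k2) values('v1','v2')" from a column->value map
    static String genUpsertSql(String schema, String sinkTable, Map<String, Object> data) {
        return "upsert into " + schema + "." + sinkTable +
                "(" + String.join(",", data.keySet()) + ") values('" +
                data.values().stream().map(String::valueOf)
                        .reduce((a, b) -> a + "','" + b).orElse("") + "')";
    }

    public static void main(String[] args) {
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("id", 12);
        data.put("tm_name", "kevenhe");
        System.out.println(genUpsertSql("MALL2021_REALTIME", "dim_base_trademark", data));
        // upsert into MALL2021_REALTIME.dim_base_trademark(id,tm_name) values('12','kevenhe')
    }
}
```

Note the concatenation quotes every value as a string, which matches the all-varchar tables created by checkTable; it does not escape quotes inside values, so production code would prefer bound parameters.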
Wrapped sink method:
public static <T> FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
    properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 5 * 60 * 1000 + "");
    return new FlinkKafkaProducer<>(DEFAULT_TOPIC, kafkaSerializationSchema, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
Summary (DWD)
The core of the DWD real-time computation is stream splitting, plus state-based identification. The operators used were RichMapFunction, ProcessFunction, and RichSinkFunction; the table below helps with choosing among them:
| Function | Can transform | Can filter | Side output | open() | Can use state | Writes to |
|---|---|---|---|---|---|---|
| MapFunction | yes | no | no | no | no | downstream operator |
| FilterFunction | no | yes | no | no | no | downstream operator |
| RichMapFunction | yes | no | no | yes | yes | downstream operator |
| RichFilterFunction | no | yes | no | yes | yes | downstream operator |
| ProcessFunction | yes | yes | yes | yes | yes | downstream operator |
| SinkFunction | yes | yes | no | no | no | external system |
| RichSinkFunction | yes | yes | no | yes | yes | external system |
As the table shows, the Rich functions are powerful and ProcessFunction is more powerful still, but the more capable the function, the more verbose it is to use.



