When the data volume is large and the dimensions are many, keyBy-based aggregation becomes unwieldy; it is better to write the data into an external real-time data warehouse. ClickHouse excels at real-time queries, Flink at real-time processing.
I. Complex multi-dimensional statistics (using ClickHouse)
ClickHouse's ReplacingMergeTree engine merges rows that share the same sorting key within the same partition and keeps only the latest row. This property lets Flink + ClickHouse achieve (approximate) data consistency.
The problem: rows written to ClickHouse are not merged immediately; merging happens only when OPTIMIZE is run manually or when a background merge kicks in.
The workaround: append the FINAL keyword after the table name in the query, so only the latest version of each row is returned, at the cost of query performance (see the sample query after the DDL below).
How should the ClickHouse table be designed?
1. Support querying by arbitrary dimensions (a wide table)
2. Support querying by time range (store the time as table columns and partition the table)
3. Support computing PV and UV (deduplicated queries)
4. Support partitioning (partition by time)
5. Support overwriting (ReplacingMergeTree; when result accuracy matters, add FINAL after the table name)
6. How to generate a unique ID per record (build it in Kafka from topic + partition + offset)
7. Identical records must land in the same partition (partition by the time carried in the data, i.e. EventTime)
1. Create the table
CREATE TABLE tb_user_event
(
`id` String comment 'unique record id: Kafka topic + partition + offset',
`deviceId` String comment 'device ID',
`eventId` String comment 'event ID',
`isNew` UInt8 comment 'new-user flag: 1 = new, 0 = returning',
`os` String comment 'OS name',
`province` String comment 'province',
`channel` String comment 'download channel',
`deviceType` String comment 'device type',
`eventTime` DateTime64 comment 'the event time carried in the record',
`date` String comment 'eventTime formatted as yyyyMMdd',
`hour` String comment 'eventTime formatted as HH',
`processTime` DateTime comment 'system time when the row was inserted'
)
ENGINE = ReplacingMergeTree(processTime)
PARTITION BY (date, hour)
ORDER BY id;
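For orientation, a sketch of how this table is typically queried, assuming the DDL above: OPTIMIZE forces the merge described earlier, while FINAL makes ReplacingMergeTree return only the latest version of each row and uniqExact deduplicates device IDs for UV.

-- force a merge manually (optional and expensive)
OPTIMIZE TABLE tb_user_event FINAL;

-- PV/UV per hour for one day; FINAL trades query speed for accuracy
SELECT date, hour, count() AS pv, uniqExact(deviceId) AS uv
FROM tb_user_event FINAL
WHERE date = '20210321'
GROUP BY date, hour
ORDER BY date, hour;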
2. A custom Kafka deserialization schema that generates the unique ID
MyKafkaStringDeserializationSchema
package cn._51doit.kafka;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.StandardCharsets;

public class MyKafkaStringDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {

    @Override
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        return false;
    }

    @Override
    public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        String topic = record.topic();
        int partition = record.partition();
        long offset = record.offset();
        // unique ID: topic-partition-offset
        String id = topic + "-" + partition + "-" + offset;
        String value = new String(record.value(), StandardCharsets.UTF_8);
        return Tuple2.of(id, value);
    }

    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<String, String>>() {});
    }
}
Add createKafkaStreamV2 to FlinkUtils:
package cn._51doit.utils;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class FlinkUtils {
public static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
public static ParameterTool parameterTool;
public static DataStream<String> createKafkaStream(String[] args, Class<? extends DeserializationSchema<String>> deserializer) throws Exception {
parameterTool = ParameterTool.fromPropertiesFile(args[0]);
long checkpointInterval = parameterTool.getLong("checkpoint.interval", 30000L);
String checkpointPath = parameterTool.getRequired("checkpoint.path");
env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new RocksDBStateBackend(checkpointPath, true));
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
List<String> topics = Arrays.asList(parameterTool.getRequired("kafka.input.topics").split(","));
Properties properties = parameterTool.getProperties();
// read the data from Kafka
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topics, deserializer.newInstance(), properties);
kafkaConsumer.setCommitOffsetsOnCheckpoints(false);
return env.addSource(kafkaConsumer);
}
public static DataStream<Tuple2<String, String>> createKafkaStreamV2(String[] args, Class<? extends KafkaDeserializationSchema<Tuple2<String, String>>> deserializer) throws Exception {
parameterTool = ParameterTool.fromPropertiesFile(args[0]);
long checkpointInterval = parameterTool.getLong("checkpoint.interval", 30000L);
String checkpointPath = parameterTool.getRequired("checkpoint.path");
env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
//env.setStateBackend(new FsStateBackend(checkpointPath));
env.setStateBackend(new RocksDBStateBackend(checkpointPath, true));
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
List<String> topics = Arrays.asList(parameterTool.getRequired("kafka.input.topics").split(","));
Properties properties = parameterTool.getProperties();
// read the data from Kafka
FlinkKafkaConsumer<Tuple2<String, String>> kafkaConsumer = new FlinkKafkaConsumer<>(topics, deserializer.newInstance(), properties);
kafkaConsumer.setCommitOffsetsOnCheckpoints(false);
return env.addSource(kafkaConsumer);
}
}
Calling it from TestKafkaId:
package cn._51doit.test;
import cn._51doit.kafka.MyKafkaStringDeserializationSchema;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
public class TestKafkaId {
public static void main(String[] args) throws Exception {
DataStream<Tuple2<String, String>> kafkaStream = FlinkUtils.createKafkaStreamV2(args, MyKafkaStringDeserializationSchema.class);
kafkaStream.print();
FlinkUtils.env.execute();
}
}
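These jobs read all of their settings from the properties file passed as args[0] via ParameterTool. A minimal sketch of that file follows; the key names come from the code above and below, every value (hosts, topic, group) is a placeholder, and extra keys such as bootstrap.servers and group.id reach the Kafka consumer through parameterTool.getProperties():

checkpoint.interval=30000
checkpoint.path=hdfs://node-1:9000/flink/checkpoints
kafka.input.topics=user-event
bootstrap.servers=node-1:9092,node-2:9092,node-3:9092
group.id=flink-live-stats
auto.offset.reset=earliest
clickhouse.url=jdbc:clickhouse://node-1:8123/default
clickhouse.batch.size=1000
clickhouse.batch.interval=5000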
3. Writing the data to ClickHouse with JdbcSink
Add the Flink JDBC connector and the ClickHouse JDBC driver:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>
package cn._51doit.jobs;
import cn._51doit.constant.EventID;
import cn._51doit.kafka.MyKafkaStringDeserializationSchema;
import cn._51doit.pojo.DataBean;
import cn._51doit.udf.IsNewUserFunctionV2;
import cn._51doit.udf.JsonToBeanFunc;
import cn._51doit.udf.JsonToBeanFuncV2;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.text.SimpleDateFormat;
import java.util.Date;
public class DataToClickhouse {
public static void main(String[] args) throws Exception {
DataStream<Tuple2<String, String>> kafkaStream = FlinkUtils.createKafkaStreamV2(args, MyKafkaStringDeserializationSchema.class);
// parse the JSON records into DataBean
SingleOutputStreamOperator<DataBean> beanStream = kafkaStream.process(new JsonToBeanFuncV2());
beanStream.map(new MapFunction<DataBean, DataBean>() {
private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd-HH");
@Override
public DataBean map(DataBean bean) throws Exception {
Long timestamp = bean.getTimestamp();
String format = dateFormat.format(new Date(timestamp));
String[] fields = format.split("-");
bean.setDate(fields[0]);
bean.setHour(fields[1]);
return bean;
}
}).addSink(JdbcSink.sink(
// name the columns explicitly so the parameter order below matches the table definition
"insert into tb_user_event (id, deviceId, eventId, isNew, province, os, channel, deviceType, eventTime, date, hour, processTime) values (?,?,?,?,?,?,?,?,?,?,?,?)",
(ps, bean) -> {
ps.setString(1, bean.getId());
ps.setString(2, bean.getDeviceId());
ps.setString(3, bean.getEventId());
ps.setInt(4, bean.getIsN());
ps.setString(5, bean.getProvince());
ps.setString(6, bean.getOsName());
ps.setString(7, bean.getReleaseChannel());
ps.setString(8, bean.getDeviceType());
ps.setLong(9, bean.getTimestamp());
ps.setString(10, bean.getDate());
ps.setString(11, bean.getHour());
ps.setString(12, "2021-03-18 00:00:00"); // hard-coded placeholder; in practice use the current system time
},
JdbcExecutionOptions.builder().withBatchSize(FlinkUtils.parameterTool.getInt("clickhouse.batch.size"))
.withBatchIntervalMs(FlinkUtils.parameterTool.getInt("clickhouse.batch.interval"))
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(FlinkUtils.parameterTool.getRequired("clickhouse.url"))
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
.build()));
FlinkUtils.env.execute();
}
}
JsonToBeanFuncV2
package cn._51doit.udf;

import cn._51doit.pojo.DataBean;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class JsonToBeanFuncV2 extends ProcessFunction<Tuple2<String, String>, DataBean> {

    @Override
    public void processElement(Tuple2<String, String> tp, Context ctx, Collector<DataBean> out) throws Exception {
        try {
            String id = tp.f0; // unique record ID: topic-partition-offset
            DataBean dataBean = JSON.parseObject(tp.f1, DataBean.class);
            dataBean.setId(id);
            out.collect(dataBean);
        } catch (Exception e) {
            //e.printStackTrace();
            //TODO save the malformed records somewhere (e.g. a side output)
        }
    }
}
II. Requirement: live-stream viewer statistics
a) Cumulative viewer count in real time (displayed live; computed directly with Flink keyBy)
b) Current online count per live room in real time (displayed live; computed directly with Flink keyBy)
c) Detail drill-down across multiple dimensions (write the data into ClickHouse)
1. Implementing the viewer-count statistics
Approach 1:
a) Write each record to Redis/MySQL for the dashboard as it arrives (low latency, but inefficient and hard on the database)
b) Submit a second job that writes the various detail records to ClickHouse (two jobs, and the data is read and computed twice)
package cn._51doit.jobs;
import cn._51doit.pojo.DataBean;
import cn._51doit.udf.AnchorDistinctTotalAudienceFunc;
import cn._51doit.udf.JsonToBeanFunc;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class LiveAudienceCountV2 {
public static void main(String[] args) throws Exception {
DataStream<String> kafkaStream = FlinkUtils.createKafkaStream(args, SimpleStringSchema.class);
SingleOutputStreamOperator<DataBean> beanStream = kafkaStream.process(new JsonToBeanFunc());
SingleOutputStreamOperator<DataBean> liveDataStream = beanStream.filter(new FilterFunction<DataBean>() {
@Override
public boolean filter(DataBean bean) throws Exception {
return bean.getEventId().startsWith("live");
}
});
// keep only the enter/leave events for live rooms
SingleOutputStreamOperator<DataBean> enterStream = liveDataStream.filter(new FilterFunction<DataBean>() {
@Override
public boolean filter(DataBean value) throws Exception {
return "liveEnter".equals(value.getEventId()) || "liveLeave".equals(value.getEventId());
}
});
// cumulative viewers per anchor and real-time online count per room,
// keyed by anchor ID (i.e. per live room)
KeyedStream<DataBean, String> keyedEnterStream = enterStream.keyBy(bean -> bean.getProperties().get("anchor_id").toString());
SingleOutputStreamOperator<Tuple4<String, Integer, Integer, Integer>> res = keyedEnterStream.process(new AnchorDistinctTotalAudienceFunc());
res.print();
FlinkUtils.env.execute();
}
}
AnchorDistinctTotalAudienceFunc
package cn._51doit.udf;

import cn._51doit.pojo.DataBean;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter;
import org.apache.flink.shaded.guava18.com.google.common.hash.Funnels;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class AnchorDistinctTotalAudienceFunc extends KeyedProcessFunction<String, DataBean, Tuple4<String, Integer, Integer, Integer>> {

    private transient ValueState<Integer> uvState;
    private transient ValueState<Integer> pvState;
    private transient ValueState<BloomFilter<String>> bloomFilterState;
    private transient ValueState<Integer> onLineUserState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // state TTL: expire per-room counters after 6 hours without writes
        StateTtlConfig stateTtlConfig = StateTtlConfig
                .newBuilder(Time.hours(6))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .neverReturnExpired()
                .build();
        ValueStateDescriptor<Integer> uvStateDescriptor = new ValueStateDescriptor<>("uv-state", Integer.class);
        uvStateDescriptor.enableTimeToLive(stateTtlConfig);
        ValueStateDescriptor<Integer> pvStateDescriptor = new ValueStateDescriptor<>("pv-state", Integer.class);
        pvStateDescriptor.enableTimeToLive(stateTtlConfig);
        ValueStateDescriptor<BloomFilter<String>> bloomFilterStateDescriptor =
                new ValueStateDescriptor<>("bloom-filter-state", TypeInformation.of(new TypeHint<BloomFilter<String>>() {}));
        bloomFilterStateDescriptor.enableTimeToLive(stateTtlConfig);
        // note: this descriptor needs its own name, distinct from "uv-state"
        ValueStateDescriptor<Integer> onLineUserStateDescriptor = new ValueStateDescriptor<>("online-user-state", Integer.class);
        onLineUserStateDescriptor.enableTimeToLive(stateTtlConfig);
        uvState = getRuntimeContext().getState(uvStateDescriptor);
        pvState = getRuntimeContext().getState(pvStateDescriptor);
        bloomFilterState = getRuntimeContext().getState(bloomFilterStateDescriptor);
        onLineUserState = getRuntimeContext().getState(onLineUserStateDescriptor);
    }

    @Override
    public void processElement(DataBean bean, Context ctx, Collector<Tuple4<String, Integer, Integer, Integer>> out) throws Exception {
        Integer onLineUserCount = onLineUserState.value();
        String deviceId = bean.getDeviceId();
        Integer uv = uvState.value();
        Integer pv = pvState.value();
        BloomFilter<String> bloomFilter = bloomFilterState.value();
        String eventId = bean.getEventId();
        if (onLineUserCount == null) {
            onLineUserCount = 0;
        }
        if ("liveEnter".equals(eventId)) {
            if (bloomFilter == null) {
                bloomFilter = BloomFilter.create(Funnels.unencodedCharsFunnel(), 1000000);
                pv = 0;
                uv = 0;
            }
            if (!bloomFilter.mightContain(deviceId)) {
                bloomFilter.put(deviceId);
                uv++;
                bloomFilterState.update(bloomFilter);
                uvState.update(uv);
            }
            pv++;
            pvState.update(pv);
            // one more viewer online
            onLineUserCount++;
            onLineUserState.update(onLineUserCount);
        } else { // liveLeave
            onLineUserCount--;
            onLineUserState.update(onLineUserCount);
        }
        out.collect(Tuple4.of(ctx.getCurrentKey(), uv, pv, onLineUserCount));
    }
}
Approach 2:
Buffer the data and write it to Redis/MySQL in batches; this is not a simple incremental aggregation, so windows do not fit, and a timer is used instead (higher latency, but efficient and gentle on the database).
In the same job, also write the detail data to ClickHouse. Data of the same topic (type) should be handled in a single job wherever possible: tag the different outputs and emit them via side outputs. This saves cluster resources and avoids reading and computing the same data twice.
package cn._51doit.jobs;
import cn._51doit.kafka.MyKafkaStringDeserializationSchema;
import cn._51doit.pojo.DataBean;
import cn._51doit.udf.AnchorDistinctTotalAudienceFunc;
import cn._51doit.udf.JsonToBeanFunc;
import cn._51doit.udf.JsonToBeanFuncV2;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;
import java.util.Optional;
public class LiveAudienceCountV2 {
public static void main(String[] args) throws Exception {
DataStream<Tuple2<String, String>> kafkaStream = FlinkUtils.createKafkaStreamV2(args, MyKafkaStringDeserializationSchema.class);
SingleOutputStreamOperator<DataBean> beanStream = kafkaStream.process(new JsonToBeanFuncV2());
SingleOutputStreamOperator<DataBean> liveDataStream = beanStream.filter(new FilterFunction<DataBean>() {
@Override
public boolean filter(DataBean bean) throws Exception {
return bean.getEventId().startsWith("live");
}
});
// keep only the enter/leave events for live rooms
SingleOutputStreamOperator<DataBean> enterStream = liveDataStream.filter(new FilterFunction<DataBean>() {
@Override
public boolean filter(DataBean value) throws Exception {
return "liveEnter".equals(value.getEventId()) || "liveLeave".equals(value.getEventId());
}
});
// cumulative viewers per anchor and real-time online count per room,
// keyed by anchor ID (i.e. per live room)
KeyedStream<DataBean, String> keyedEnterStream = enterStream.keyBy(bean -> bean.getProperties().get("anchor_id").toString());
AnchorDistinctTotalAudienceFunc anchorFunc = new AnchorDistinctTotalAudienceFunc();
// main stream: detail records, written to ClickHouse
SingleOutputStreamOperator<DataBean> mainStream = keyedEnterStream.process(anchorFunc);
// side output (the aggregated data), written to Redis
DataStream<Tuple4<String, Integer, Integer, Integer>> aggDataStream = mainStream.getSideOutput(anchorFunc.getAggTag());
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPassword("123456").setDatabase(5).build();
aggDataStream.addSink(new RedisSink<>(conf, new AudienceCountMapper()));
mainStream.addSink(JdbcSink.sink(
"insert into tb_anchor_audience_count (id, anchorId, deviceId, eventId, province, os, channel, deviceType, eventTime, date, hour) values (?,?,?,?,?,?,?,?,?,?,?)",
(ps, bean) -> {
ps.setString(1, bean.getId());
ps.setString(2, bean.getProperties().get("anchor_id").toString());
ps.setString(3, bean.getDeviceId());
ps.setString(4, bean.getEventId());
ps.setString(5, bean.getProvince());
ps.setString(6, bean.getOsName());
ps.setString(7, bean.getReleaseChannel());
ps.setString(8, bean.getDeviceType());
ps.setLong(9, bean.getTimestamp());
ps.setString(10, bean.getDate());
ps.setString(11, bean.getHour());
},
JdbcExecutionOptions.builder().withBatchSize(FlinkUtils.parameterTool.getInt("clickhouse.batch.size"))
.withBatchIntervalMs(FlinkUtils.parameterTool.getInt("clickhouse.batch.interval"))
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(FlinkUtils.parameterTool.getRequired("clickhouse.url"))
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
.build()));
FlinkUtils.env.execute();
}
public static class AudienceCountMapper implements RedisMapper<Tuple4<String, Integer, Integer, Integer>> {
    // stored in a Redis hash: audience-count -> {(aid_20210321, "uv,pv,online"), ...}
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "audience-count");
}
@Override
public String getKeyFromData(Tuple4<String, Integer, Integer, Integer> tp) {
return tp.f0; //aid_20210321
}
@Override
public String getValueFromData(Tuple4<String, Integer, Integer, Integer> tp) {
return tp.f1 + "," + tp.f2 + "," + tp.f3;
}
}
}
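On the dashboard side (a sketch; the field name aid001_20210321 is made up), since the sink uses HSET with the additional key audience-count, the aggregates can be read back with redis-cli from database 5:

select 5
hgetall audience-count

This returns field/value pairs such as aid001_20210321 -> "125,130,7", i.e. uv, pv and current online count for that anchor and day.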
AnchorDistinctTotalAudienceFunc
package cn._51doit.udf;

import cn._51doit.pojo.DataBean;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter;
import org.apache.flink.shaded.guava18.com.google.common.hash.Funnels;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;

public class AnchorDistinctTotalAudienceFunc extends KeyedProcessFunction<String, DataBean, DataBean> {

    private transient ValueState<Integer> uvState;
    private transient ValueState<Integer> pvState;
    private transient ValueState<BloomFilter<String>> bloomFilterState;
    private transient ValueState<Integer> onLineUserState;

    private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd-HH");

    private OutputTag<Tuple4<String, Integer, Integer, Integer>> aggTag =
            new OutputTag<Tuple4<String, Integer, Integer, Integer>>("agg-tag") {};

    @Override
    public void open(Configuration parameters) throws Exception {
        // state TTL: expire per-room counters after 6 hours without writes
        StateTtlConfig stateTtlConfig = StateTtlConfig
                .newBuilder(Time.hours(6))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .neverReturnExpired()
                .build();
        ValueStateDescriptor<Integer> uvStateDescriptor = new ValueStateDescriptor<>("uv-state", Integer.class);
        uvStateDescriptor.enableTimeToLive(stateTtlConfig);
        ValueStateDescriptor<Integer> pvStateDescriptor = new ValueStateDescriptor<>("pv-state", Integer.class);
        pvStateDescriptor.enableTimeToLive(stateTtlConfig);
        ValueStateDescriptor<BloomFilter<String>> bloomFilterStateDescriptor =
                new ValueStateDescriptor<>("bloom-filter-state", TypeInformation.of(new TypeHint<BloomFilter<String>>() {}));
        bloomFilterStateDescriptor.enableTimeToLive(stateTtlConfig);
        // note: this descriptor needs its own name, distinct from "uv-state"
        ValueStateDescriptor<Integer> onLineUserStateDescriptor = new ValueStateDescriptor<>("online-user-state", Integer.class);
        onLineUserStateDescriptor.enableTimeToLive(stateTtlConfig);
        uvState = getRuntimeContext().getState(uvStateDescriptor);
        pvState = getRuntimeContext().getState(pvStateDescriptor);
        bloomFilterState = getRuntimeContext().getState(bloomFilterStateDescriptor);
        onLineUserState = getRuntimeContext().getState(onLineUserStateDescriptor);
    }

    @Override
    public void processElement(DataBean bean, Context ctx, Collector<DataBean> out) throws Exception {
        Integer onLineUserCount = onLineUserState.value();
        String deviceId = bean.getDeviceId();
        Integer uv = uvState.value();
        Integer pv = pvState.value();
        BloomFilter<String> bloomFilter = bloomFilterState.value();
        String eventId = bean.getEventId();
        // register a processing-time timer aligned to the next 10-second boundary;
        // registering the same fire time repeatedly creates only one timer per key
        long currentProcessingTime = ctx.timerService().currentProcessingTime();
        long fireTime = currentProcessingTime - currentProcessingTime % 10000 + 10000;
        ctx.timerService().registerProcessingTimeTimer(fireTime);
        if (onLineUserCount == null) {
            onLineUserCount = 0;
        }
        if ("liveEnter".equals(eventId)) {
            if (bloomFilter == null) {
                bloomFilter = BloomFilter.create(Funnels.unencodedCharsFunnel(), 1000000);
                pv = 0;
                uv = 0;
            }
            if (!bloomFilter.mightContain(deviceId)) {
                bloomFilter.put(deviceId);
                uv++;
                bloomFilterState.update(bloomFilter);
                uvState.update(uv);
            }
            pv++;
            pvState.update(pv);
            onLineUserCount++;
            onLineUserState.update(onLineUserCount);
        } else { // liveLeave
            onLineUserCount--;
            onLineUserState.update(onLineUserCount);
        }
        String format = dateFormat.format(bean.getTimestamp());
        String[] fields = format.split("-");
        bean.setDate(fields[0]);
        bean.setHour(fields[1]);
        // emit the detail record on the main stream
        out.collect(bean);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<DataBean> out) throws Exception {
        String date = dateFormat.format(timestamp).split("-")[0];
        // emit the aggregates on the side output every 10 seconds
        ctx.output(aggTag, Tuple4.of(ctx.getCurrentKey() + "_" + date, uvState.value(), pvState.value(), onLineUserState.value()));
    }

    public OutputTag<Tuple4<String, Integer, Integer, Integer>> getAggTag() {
        return aggTag;
    }
}
2. Computing live-room popularity
Rule: a viewer who stays in the live room for at least 1 minute counts as one popularity point; if the same device ID re-enters the room within 30 minutes, it still counts only once.
Implementation ideas:
divide sliding windows by EventTime, or
register timers in a ProcessFunction (the approach taken below)
package cn._51doit.jobs;
import cn._51doit.pojo.DataBean;
import cn._51doit.udf.JsonToBeanFunc;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class LiveAudienceCount {
public static void main(String[] args) throws Exception {
DataStream<String> kafkaStream = FlinkUtils.createKafkaStream(args, SimpleStringSchema.class);
SingleOutputStreamOperator<DataBean> beanStream = kafkaStream.process(new JsonToBeanFunc());
KeyedStream<DataBean, Tuple2<String, String>> keyed = beanStream.keyBy(new KeySelector<DataBean, Tuple2<String, String>>() {
    @Override
    public Tuple2<String, String> getKey(DataBean bean) throws Exception {
        String deviceId = bean.getDeviceId();
        String anchor_id = bean.getProperties().get("anchor_id").toString();
        return Tuple2.of(anchor_id, deviceId);
    }
});
keyed.process(new KeyedProcessFunction<Tuple2<String, String>, DataBean, Tuple2<String, Integer>>() {
private transient ValueState<Long> inState;  // timestamp of the latest enter event
private transient ValueState<Long> outState; // timestamp of the latest leave event
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Long> inStateDescriptor = new ValueStateDescriptor<>("in-state", Long.class);
inState = getRuntimeContext().getState(inStateDescriptor);
ValueStateDescriptor<Long> outStateDescriptor = new ValueStateDescriptor<>("out-state", Long.class);
outState = getRuntimeContext().getState(outStateDescriptor);
}
@Override
public void processElement(DataBean bean, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
Long timestamp = bean.getTimestamp();
String eventId = bean.getEventId();
if ("liveEnter".equals(eventId)) {
inState.update(timestamp);
// register a timer for 1 minute after the enter time
ctx.timerService().registerProcessingTimeTimer(timestamp + 60000 + 1);
} else if ("liveLeave".equals(eventId)) {
Long inTime = inState.value();
outState.update(timestamp);
if (timestamp - inTime < 60000) {
// the viewer left within 1 minute: cancel the timer
ctx.timerService().deleteProcessingTimeTimer(inTime + 60000 + 1);
}
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
Long outTime = outState.value();
if (outTime == null) {
out.collect(Tuple2.of(ctx.getCurrentKey().f0, 1));
} else {
Long inTime = inState.value();
if (inTime - outTime > 30 * 60000) { // re-entered more than 30 minutes after leaving
out.collect(Tuple2.of(ctx.getCurrentKey().f0, 1));
}
}
// e.g. left at 14:00 and re-entered at 14:29: within 30 minutes, not counted again;
// re-entered at 15:00: counted as a new popularity point
}
}).print();
FlinkUtils.env.execute();
}
}
3. Persisting the viewer-count results
Create the table:
CREATE TABLE tb_anchor_audience_count
(
`id` String comment 'unique record id',
`anchorId` String comment 'anchor ID',
`deviceId` String comment 'device ID',
`eventId` String comment 'event ID',
`os` String comment 'OS name',
`province` String comment 'province',
`channel` String comment 'download channel',
`deviceType` String comment 'device type',
`eventTime` DateTime64 comment 'the event time carried in the record',
`date` String comment 'eventTime formatted as yyyyMMdd',
`hour` String comment 'eventTime formatted as HH',
`processTime` DateTime DEFAULT now()
)
ENGINE = ReplacingMergeTree(processTime)
PARTITION BY (date, hour)
ORDER BY id;
The write path is the Approach 2 code shown above.
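As a sketch of the multi-dimensional detail queries this table supports (assuming the DDL above), cumulative viewers (UV) and views (PV) per anchor for one day:

SELECT anchorId, count() AS pv, uniqExact(deviceId) AS uv
FROM tb_anchor_audience_count FINAL
WHERE date = '20210321'
GROUP BY anchorId;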
III. Requirement analysis: gift tipping
MySQL also holds a gift table (a dimension table) that the event stream must be joined against. The usual options for dimension-table joins:
a) Query the database once per record (slow, low throughput)
b) Async I/O (relatively fast, but consumes more resources)
c) Broadcast state (fastest; suited to small dimension datasets that may change over time)
1. Aggregate the gift points (coins) per anchor (per live room); a broadcast-state sketch follows.
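A minimal sketch of option c), broadcast state, applied to this requirement. Everything here is illustrative: the gift-dimension stream giftStream (giftId -> points rows read from the MySQL gift table, e.g. by a periodic JDBC source or a CDC connector) and the property names gift_id and anchor_id are assumptions, not code from the notes.

package cn._51doit.udf;

import cn._51doit.pojo.DataBean;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class GiftBroadcastJoinFunc extends BroadcastProcessFunction<DataBean, Tuple2<String, Integer>, Tuple2<String, Integer>> {

    // broadcast state: giftId -> points (hypothetical schema of the MySQL gift table)
    public static final MapStateDescriptor<String, Integer> GIFT_STATE =
            new MapStateDescriptor<>("gift-state", Types.STRING, Types.INT);

    @Override
    public void processElement(DataBean bean, ReadOnlyContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
        // "gift_id" is an assumed property name for tipping events
        Object giftId = bean.getProperties().get("gift_id");
        if (giftId == null) {
            return; // not a tipping event
        }
        Integer points = ctx.getBroadcastState(GIFT_STATE).get(giftId.toString());
        if (points != null) {
            String anchorId = bean.getProperties().get("anchor_id").toString();
            out.collect(Tuple2.of(anchorId, points)); // (anchor, points for this gift)
        }
    }

    @Override
    public void processBroadcastElement(Tuple2<String, Integer> gift, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
        // dimension update: later events immediately see the new points value
        ctx.getBroadcastState(GIFT_STATE).put(gift.f0, gift.f1);
    }
}

Wiring it up (also a sketch): beanStream.connect(giftStream.broadcast(GiftBroadcastJoinFunc.GIFT_STATE)).process(new GiftBroadcastJoinFunc()).keyBy(t -> t.f0).sum(1) yields a running coin total per anchor.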