
Big Data: Building a Real-Time Data Warehouse (Part 2)


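When the data volume is large and there are many dimensions, computing every statistic with keyBy in Flink becomes unwieldy. It is better to write the data to an external real-time warehouse: ClickHouse is good at real-time queries, and Flink is good at real-time processing.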

I. Multi-dimensional complex statistics (using ClickHouse)

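The table uses ClickHouse's ReplacingMergeTree engine, which merges rows with the same sorting key (here the ID) inside one partition and keeps only the latest version. Because a record that Flink replays after a failure carries the same ID, it replaces the earlier copy instead of duplicating it, which gives Flink + ClickHouse a workable (if imperfect) form of data consistency.

Problem: rows written to ClickHouse are not merged immediately; merging only happens when you run OPTIMIZE manually or when the background merge gets to them.

Solution: add the FINAL keyword after the table name in the query so that only the latest version of each row is returned, at the cost of slower queries.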
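The deduplication described above can be pictured with a small plain-Java model. This is only a sketch of the semantics, not ClickHouse code: `ReplacingMergeModel`, `Row` and `latestPerId` are hypothetical names, and the model ignores partitions (in ClickHouse, rows are only merged within the same partition).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of ReplacingMergeTree(version) deduplication; not ClickHouse code.
class ReplacingMergeModel {

    // Hypothetical row: id is the sorting key, version plays the role of processTime.
    static final class Row {
        final String id;
        final long version;
        final String payload;

        Row(String id, long version, String payload) {
            this.id = id;
            this.version = version;
            this.payload = payload;
        }
    }

    // Keeps, for every id, the row with the highest version (on a tie the later row wins).
    static List<Row> latestPerId(List<Row> rows) {
        Map<String, Row> latest = new LinkedHashMap<>();
        for (Row row : rows) {
            Row current = latest.get(row.id);
            if (current == null || row.version >= current.version) {
                latest.put(row.id, row);
            }
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<Row> rows = new ArrayList<>();
        rows.add(new Row("events-0-42", 100L, "first write"));
        rows.add(new Row("events-0-43", 100L, "other record"));
        rows.add(new Row("events-0-42", 200L, "replayed after a restart"));
        for (Row row : latestPerId(rows)) {
            System.out.println(row.id + " -> " + row.payload);
        }
    }
}
```

Until a merge has run, a query without FINAL may see both copies of `events-0-42`; a query with FINAL behaves like `latestPerId` and sees only the newer one.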

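How should the ClickHouse table be designed?

1. Support queries by dimension (one wide table)
2. Support queries by time range (store the time as a column and partition the table by it)
3. Support PV and UV statistics (deduplicated counts)
4. Support partitioning (partition by time)
5. Support overwriting (ReplacingMergeTree; add FINAL after the table name when query results must be exact)
6. Give every record a unique ID (built from the Kafka topic + partition + offset)
7. Send identical records to the same partition (partition by the record's own time, i.e. EventTime)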
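Points 6 and 7 can be sketched in plain Java as follows. The class and method names (`EventKeys`, `uniqueId`, `partitionKey`) are hypothetical, and passing the time zone explicitly is an assumption made here so that the result does not depend on the machine's default zone.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Sketch: derive the unique ID and the (date, hour) partition key for one record.
class EventKeys {

    private static final DateTimeFormatter DATE = DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final DateTimeFormatter HOUR = DateTimeFormatter.ofPattern("HH");

    // The same Kafka record always yields the same ID, even when it is re-read after a restart.
    static String uniqueId(String topic, int partition, long offset) {
        return topic + "-" + partition + "-" + offset;
    }

    // The partition key comes from the event time, not from the time of insertion,
    // so a replayed record lands in the same partition as its earlier copy.
    static String[] partitionKey(long eventTimeMillis, ZoneId zone) {
        ZonedDateTime time = Instant.ofEpochMilli(eventTimeMillis).atZone(zone);
        return new String[] {DATE.format(time), HOUR.format(time)};
    }

    public static void main(String[] args) {
        System.out.println(uniqueId("user-events", 3, 1024L));
        String[] key = partitionKey(1616025600000L, ZoneOffset.ofHours(8));
        System.out.println(key[0] + " " + key[1]);
    }
}
```

Because both values depend only on the record itself, a duplicate write goes to the same partition with the same ID, which is the precondition for ReplacingMergeTree to collapse it.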

1. Create the table

CREATE TABLE tb_user_event
(
    `id` String comment 'Unique record ID: Kafka topic + partition + offset',
    `deviceId` String comment 'Device ID',
    `eventId` String comment 'Event ID',
    `isNew` UInt8 comment 'New-user flag: 1 = new, 0 = existing',
    `os` String comment 'Operating system name',
    `province` String comment 'Province',
    `channel` String comment 'Download channel',
    `eventTime` DateTime64 comment 'Timestamp carried in the record',
    `date` String comment 'eventTime formatted as yyyyMMdd',
    `hour` String comment 'eventTime formatted as HH',
    `processTime` DateTime comment 'System time when the row was inserted'
)
ENGINE = ReplacingMergeTree(processTime)
PARTITION BY (date, hour)
ORDER BY id;

2. Generate the unique ID with a custom Kafka deserializer

MyKafkaStringDeserializationSchema

package cn._51doit.kafka;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.StandardCharsets;


public class MyKafkaStringDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {


    @Override
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        return false;
    }

    @Override
    public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        String topic = record.topic();
        int partition = record.partition();
        long offset = record.offset();
        // topic + partition + offset identifies a Kafka record uniquely
        String id = topic + "-" + partition + "-" + offset;
        String value = new String(record.value(), StandardCharsets.UTF_8);
        return Tuple2.of(id, value);
    }

    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<String, String>>() {});
    }
}

Add createKafkaStreamV2 to FlinkUtils

package cn._51doit.utils;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class FlinkUtils {

    public static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    public static ParameterTool parameterTool;

    public static <T> DataStream<T> createKafkaStream(String[] args, Class<? extends DeserializationSchema<T>> deserializer) throws Exception {

        parameterTool = ParameterTool.fromPropertiesFile(args[0]);

        long checkpointInterval = parameterTool.getLong("checkpoint.interval", 30000L);
        String checkpointPath = parameterTool.getRequired("checkpoint.path");
        env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);

        env.setStateBackend(new RocksDBStateBackend(checkpointPath, true));

        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        List<String> topics = Arrays.asList(parameterTool.getRequired("kafka.input.topics").split(","));

        Properties properties = parameterTool.getProperties();

        // Read data from Kafka

        FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<>(topics, deserializer.newInstance(), properties);

        kafkaConsumer.setCommitOffsetsOnCheckpoints(false);

        return env.addSource(kafkaConsumer);

    }


    
    public static <T> DataStream<T> createKafkaStreamV2(String[] args, Class<? extends KafkaDeserializationSchema<T>> deserializer) throws Exception {

        parameterTool = ParameterTool.fromPropertiesFile(args[0]);

        long checkpointInterval = parameterTool.getLong("checkpoint.interval", 30000L);
        String checkpointPath = parameterTool.getRequired("checkpoint.path");
        env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
        //env.setStateBackend(new FsStateBackend(checkpointPath));
        env.setStateBackend(new RocksDBStateBackend(checkpointPath, true));
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        List<String> topics = Arrays.asList(parameterTool.getRequired("kafka.input.topics").split(","));

        Properties properties = parameterTool.getProperties();

        // Read data from Kafka

        FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<>(topics, deserializer.newInstance(), properties);

        kafkaConsumer.setCommitOffsetsOnCheckpoints(false);

        return env.addSource(kafkaConsumer);

    }
}

Calling code: TestKafkaId

package cn._51doit.test;

import cn._51doit.kafka.MyKafkaStringDeserializationSchema;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

public class TestKafkaId {


    public static void main(String[] args) throws Exception {

        DataStream<Tuple2<String, String>> kafkaStream = FlinkUtils.createKafkaStreamV2(args, MyKafkaStringDeserializationSchema.class);

        kafkaStream.print();

        FlinkUtils.env.execute();

    }
}

3. Write the data to ClickHouse with JdbcSink

Add the Flink JDBC connector and the ClickHouse driver


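<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>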

package cn._51doit.jobs;

import cn._51doit.constant.EventID;
import cn._51doit.kafka.MyKafkaStringDeserializationSchema;
import cn._51doit.pojo.DataBean;
import cn._51doit.udf.IsNewUserFunctionV2;
import cn._51doit.udf.JsonToBeanFunc;
import cn._51doit.udf.JsonToBeanFuncV2;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import java.text.SimpleDateFormat;
import java.util.Date;


public class DataToClickhouse {

    public static void main(String[] args) throws Exception {

        DataStream<Tuple2<String, String>> kafkaStream = FlinkUtils.createKafkaStreamV2(args, MyKafkaStringDeserializationSchema.class);

        // Parse the JSON records
        SingleOutputStreamOperator<DataBean> beanStream = kafkaStream.process(new JsonToBeanFuncV2());

        beanStream.map(new MapFunction<DataBean, DataBean>() {

            private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd-HH");

            @Override
            public DataBean map(DataBean bean) throws Exception {
                Long timestamp = bean.getTimestamp();
                String format = dateFormat.format(new Date(timestamp));
                String[] fields = format.split("-");
                bean.setDate(fields[0]);
                bean.setHour(fields[1]);
                return bean;
            }
        }).addSink(JdbcSink.sink(
                "insert into tb_user_event2 values (?,?,?,?,?,?,?,?,?,?,?,?)",
                (ps, bean) -> {
                    ps.setString(1, bean.getId());
                    ps.setString(2, bean.getDeviceId());
                    ps.setString(3, bean.getEventId());
                    ps.setInt(4, bean.getIsN());
                    ps.setString(5, bean.getProvince());
                    ps.setString(6, bean.getOsName());
                    ps.setString(7, bean.getReleaseChannel());
                    ps.setString(8, bean.getDeviceType());
                    ps.setLong(9, bean.getTimestamp());
                    ps.setString(10, bean.getDate());
                    ps.setString(11, bean.getHour());
                    ps.setString(12, "2021-03-18 00:00:00");
                },
                JdbcExecutionOptions.builder().withBatchSize(FlinkUtils.parameterTool.getInt("clickhouse.batch.size"))
                        .withBatchIntervalMs(FlinkUtils.parameterTool.getInt("clickhouse.batch.interval"))
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(FlinkUtils.parameterTool.getRequired("clickhouse.url"))
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .build()));

        FlinkUtils.env.execute();

    }
}

JsonToBeanFuncV2

package cn._51doit.udf;

import cn._51doit.pojo.DataBean;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;


public class JsonToBeanFuncV2 extends ProcessFunction<Tuple2<String, String>, DataBean> {

    @Override
    public void processElement(Tuple2<String, String> tp, Context ctx, Collector<DataBean> out) throws Exception {

        try {
            String id = tp.f0; // unique record ID: topic-partition-offset
            DataBean dataBean = JSON.parseObject(tp.f1, DataBean.class);
            dataBean.setId(id);
            out.collect(dataBean);
        } catch (Exception e) {
            //e.printStackTrace();
            //TODO save the malformed records somewhere
        }

    }
}

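II. Requirement: live-stream viewer statistics

a) Cumulative audience in real time (displayed in real time; computed directly with Flink keyBy)
b) Number of viewers online in each live room in real time (displayed in real time; computed directly with Flink keyBy)
c) Drill-down by multiple dimensions (write the detail data to ClickHouse)

1. Implementing the viewer statistics

Approach one:
a) Write each record to Redis/MySQL or the dashboard as soon as it arrives (low latency, but inefficient and heavy on the database)
b) Write a second job that stores the detail data in ClickHouse (two jobs are submitted, and the data is processed twice)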

package cn._51doit.jobs;

import cn._51doit.pojo.DataBean;
import cn._51doit.udf.AnchorDistinctTotalAudienceFunc;
import cn._51doit.udf.JsonToBeanFunc;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class LiveAudienceCountV2 {
    public static void main(String[] args) throws Exception {

        DataStream<String> kafkaStream = FlinkUtils.createKafkaStream(args, SimpleStringSchema.class);

        SingleOutputStreamOperator<DataBean> beanStream = kafkaStream.process(new JsonToBeanFunc());

        SingleOutputStreamOperator<DataBean> liveDataStream = beanStream.filter(new FilterFunction<DataBean>() {
            @Override
            public boolean filter(DataBean bean) throws Exception {
                return bean.getEventId().startsWith("live");
            }
        });

        // Keep only the enter/leave events of a live room
        SingleOutputStreamOperator<DataBean> enterStream = liveDataStream.filter(new FilterFunction<DataBean>() {
            @Override
            public boolean filter(DataBean value) throws Exception {
                return "liveEnter".equals(value.getEventId()) || "liveLeave".equals(value.getEventId());
            }
        });
        // Key by anchor ID (i.e. by live room) to compute, per anchor,
        // the cumulative audience and the number of viewers currently online
        KeyedStream<DataBean, String> keyedEnterStream = enterStream.keyBy(bean -> bean.getProperties().get("anchor_id").toString());

        SingleOutputStreamOperator<Tuple4<String, Integer, Integer, Integer>> res = keyedEnterStream.process(new AnchorDistinctTotalAudienceFunc());

        res.print();

        FlinkUtils.env.execute();

    }

}

AnchorDistinctTotalAudienceFunc

package cn._51doit.udf;

import cn._51doit.pojo.DataBean;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter;
import org.apache.flink.shaded.guava18.com.google.common.hash.Funnels;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;


public class AnchorDistinctTotalAudienceFunc extends KeyedProcessFunction<String, DataBean, Tuple4<String, Integer, Integer, Integer>> {

    private transient ValueState<Integer> uvState;
    private transient ValueState<Integer> pvState;
    private transient ValueState<BloomFilter<String>> bloomFilterState;
    private transient ValueState<Integer> onLineUserState;

    @Override
    public void open(Configuration parameters) throws Exception {

        // Configure the state TTL
        StateTtlConfig stateTtlConfig = StateTtlConfig
                .newBuilder(Time.hours(6))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .neverReturnExpired()
                .build();

        ValueStateDescriptor<Integer> uvStateDescriptor = new ValueStateDescriptor<>("uv-state", Integer.class);
        uvStateDescriptor.enableTimeToLive(stateTtlConfig);

        ValueStateDescriptor<Integer> pvStateDescriptor = new ValueStateDescriptor<>("pv-state", Integer.class);
        pvStateDescriptor.enableTimeToLive(stateTtlConfig);
        ValueStateDescriptor<BloomFilter<String>> bloomFilterStateDescriptor =
                new ValueStateDescriptor<>("bloom-filter-state", TypeInformation.of(new TypeHint<BloomFilter<String>>() {
                }));
        bloomFilterStateDescriptor.enableTimeToLive(stateTtlConfig);

        // Each state needs its own name; reusing "uv-state" here would make it share storage with uvState
        ValueStateDescriptor<Integer> onLineUserStateDescriptor = new ValueStateDescriptor<>("online-user-state", Integer.class);
        onLineUserStateDescriptor.enableTimeToLive(stateTtlConfig);

        uvState = getRuntimeContext().getState(uvStateDescriptor);
        pvState = getRuntimeContext().getState(pvStateDescriptor);
        bloomFilterState = getRuntimeContext().getState(bloomFilterStateDescriptor);
        onLineUserState = getRuntimeContext().getState(onLineUserStateDescriptor);

    }

    @Override
    public void processElement(DataBean bean, Context ctx, Collector<Tuple4<String, Integer, Integer, Integer>> out) throws Exception {

        Integer onLineUserCount = onLineUserState.value();
        String deviceId = bean.getDeviceId();
        Integer uv = uvState.value();
        Integer pv = pvState.value();
        BloomFilter<String> bloomFilter = bloomFilterState.value();
        String eventId = bean.getEventId();
        if (onLineUserCount == null) {
            onLineUserCount = 0;
        }
        if ("liveEnter".equals(eventId)) {
            if (bloomFilter == null) {
                bloomFilter = BloomFilter.create(Funnels.unencodedCharsFunnel(), 1000000);
                pv = 0;
                uv = 0;
            }
            if (!bloomFilter.mightContain(deviceId)) {
                bloomFilter.put(deviceId);
                uv++;
                bloomFilterState.update(bloomFilter);
                uvState.update(uv);
            }
            pv++;
            pvState.update(pv);

            // Update the number of viewers currently online
            onLineUserCount++;
            onLineUserState.update(onLineUserCount);
        } else {
            onLineUserCount--;
            onLineUserState.update(onLineUserCount);
        }
        out.collect(Tuple4.of(ctx.getCurrentKey(), uv, pv, onLineUserCount));
    }
}

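Approach two:
a) Buffer the data and write it to Redis/MySQL in batches (higher latency, but efficient and light on the database). This is not a simple incremental aggregation, so a window does not fit; use a timer instead.
b) Write the detail data to ClickHouse in the same job. Data of one topic (type) should be handled in a single job where possible: tag the different kinds of output and emit them through side outputs. This saves cluster resources and avoids reading and computing the same data twice.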
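The timer-based batching relies on rounding the current processing time up to the next interval boundary, so that all records of a key that arrive within one interval register the same timer timestamp (Flink keeps only one timer per key and timestamp). A minimal plain-Java sketch of that arithmetic, with the hypothetical names `TimerAlignment` and `nextFireTime`:

```java
// Sketch of the timer alignment used for batched output; plain Java, no Flink dependency.
class TimerAlignment {

    // Returns the next multiple of intervalMillis strictly after nowMillis (for nowMillis >= 0).
    static long nextFireTime(long nowMillis, long intervalMillis) {
        return nowMillis - nowMillis % intervalMillis + intervalMillis;
    }

    public static void main(String[] args) {
        long interval = 10_000L;
        // Two records arriving inside the same 10-second bucket share one fire time.
        System.out.println(nextFireTime(12_345L, interval));
        System.out.println(nextFireTime(19_999L, interval));
        // A record arriving exactly on a boundary is scheduled for the following boundary.
        System.out.println(nextFireTime(20_000L, interval));
    }
}
```

With a 10-second interval, each anchor therefore emits at most one aggregated result every 10 seconds, however many enter/leave events arrive in between.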

package cn._51doit.jobs;

import cn._51doit.kafka.MyKafkaStringDeserializationSchema;
import cn._51doit.pojo.DataBean;
import cn._51doit.udf.AnchorDistinctTotalAudienceFunc;
import cn._51doit.udf.JsonToBeanFunc;
import cn._51doit.udf.JsonToBeanFuncV2;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

import java.util.Optional;

public class LiveAudienceCountV2 {
    public static void main(String[] args) throws Exception {

        DataStream<Tuple2<String, String>> kafkaStream = FlinkUtils.createKafkaStreamV2(args, MyKafkaStringDeserializationSchema.class);

        SingleOutputStreamOperator<DataBean> beanStream = kafkaStream.process(new JsonToBeanFuncV2());

        SingleOutputStreamOperator<DataBean> liveDataStream = beanStream.filter(new FilterFunction<DataBean>() {
            @Override
            public boolean filter(DataBean bean) throws Exception {
                return bean.getEventId().startsWith("live");
            }
        });

        // Keep only the enter/leave events of a live room
        SingleOutputStreamOperator<DataBean> enterStream = liveDataStream.filter(new FilterFunction<DataBean>() {
            @Override
            public boolean filter(DataBean value) throws Exception {
                return "liveEnter".equals(value.getEventId()) || "liveLeave".equals(value.getEventId());
            }
        });
        // Key by anchor ID (i.e. by live room) to compute, per anchor,
        // the cumulative audience and the number of viewers currently online
        KeyedStream<DataBean, String> keyedEnterStream = enterStream.keyBy(bean -> bean.getProperties().get("anchor_id").toString());

        AnchorDistinctTotalAudienceFunc anchorFunc = new AnchorDistinctTotalAudienceFunc();

        // Main stream (detail records): written to ClickHouse
        SingleOutputStreamOperator<DataBean> mainStream = keyedEnterStream.process(anchorFunc);

        // Side output (aggregated data): written to Redis
        DataStream<Tuple4<String, Integer, Integer, Integer>> aggDataStream = mainStream.getSideOutput(anchorFunc.getAggTag());


        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPassword("123456").setDatabase(5).build();

        aggDataStream.addSink(new RedisSink<>(conf, new AudienceCountMapper()));

        mainStream.addSink(JdbcSink.sink(
                "insert into tb_anchor_audience_count (id, anchorId, deviceId, eventId, province, os, channel, deviceType, eventTime, date, hour) values (?,?,?,?,?,?,?,?,?,?,?)",
                (ps, bean) -> {
                    ps.setString(1, bean.getId());
                    ps.setString(2, bean.getProperties().get("anchor_id").toString());
                    ps.setString(3, bean.getDeviceId());
                    ps.setString(4, bean.getEventId());
                    ps.setString(5, bean.getProvince());
                    ps.setString(6, bean.getOsName());
                    ps.setString(7, bean.getReleaseChannel());
                    ps.setString(8, bean.getDeviceType());
                    ps.setLong(9, bean.getTimestamp());
                    ps.setString(10, bean.getDate());
                    ps.setString(11, bean.getHour());
                },
                JdbcExecutionOptions.builder().withBatchSize(FlinkUtils.parameterTool.getInt("clickhouse.batch.size"))
                        .withBatchIntervalMs(FlinkUtils.parameterTool.getInt("clickhouse.batch.interval"))
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(FlinkUtils.parameterTool.getRequired("clickhouse.url"))
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .build()));



        FlinkUtils.env.execute();

    }


    public static class AudienceCountMapper implements RedisMapper<Tuple4<String, Integer, Integer, Integer>> {


        // Redis hash layout: audience-count -> {anchorId_date: "uv,pv,online"}
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "audience-count");
        }

        @Override
        public String getKeyFromData(Tuple4<String, Integer, Integer, Integer> tp) {
            return tp.f0; // anchor ID + "_" + date, e.g. aid_20210321
        }

        @Override
        public String getValueFromData(Tuple4<String, Integer, Integer, Integer> tp) {
            return tp.f1 + "," + tp.f2 + "," + tp.f3;
        }
    }


}

AnchorDistinctTotalAudienceFunc

package cn._51doit.udf;

import cn._51doit.pojo.DataBean;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter;
import org.apache.flink.shaded.guava18.com.google.common.hash.Funnels;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;


public class AnchorDistinctTotalAudienceFunc extends KeyedProcessFunction<String, DataBean, DataBean> {

    private transient ValueState<Integer> uvState;
    private transient ValueState<Integer> pvState;
    private transient ValueState<BloomFilter<String>> bloomFilterState;
    private transient ValueState<Integer> onLineUserState;

    private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd-HH");

    private OutputTag<Tuple4<String, Integer, Integer, Integer>> aggTag = new OutputTag<Tuple4<String, Integer, Integer, Integer>>("agg-tag"){};

    @Override
    public void open(Configuration parameters) throws Exception {

        // Configure the state TTL
        StateTtlConfig stateTtlConfig = StateTtlConfig
                .newBuilder(Time.hours(6))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .neverReturnExpired()
                .build();

        ValueStateDescriptor<Integer> uvStateDescriptor = new ValueStateDescriptor<>("uv-state", Integer.class);
        uvStateDescriptor.enableTimeToLive(stateTtlConfig);

        ValueStateDescriptor<Integer> pvStateDescriptor = new ValueStateDescriptor<>("pv-state", Integer.class);
        pvStateDescriptor.enableTimeToLive(stateTtlConfig);
        ValueStateDescriptor<BloomFilter<String>> bloomFilterStateDescriptor =
                new ValueStateDescriptor<>("bloom-filter-state", TypeInformation.of(new TypeHint<BloomFilter<String>>() {
                }));
        bloomFilterStateDescriptor.enableTimeToLive(stateTtlConfig);

        // Each state needs its own name; reusing "uv-state" here would make it share storage with uvState
        ValueStateDescriptor<Integer> onLineUserStateDescriptor = new ValueStateDescriptor<>("online-user-state", Integer.class);
        onLineUserStateDescriptor.enableTimeToLive(stateTtlConfig);

        uvState = getRuntimeContext().getState(uvStateDescriptor);
        pvState = getRuntimeContext().getState(pvStateDescriptor);
        bloomFilterState = getRuntimeContext().getState(bloomFilterStateDescriptor);
        onLineUserState = getRuntimeContext().getState(onLineUserStateDescriptor);

    }


    @Override
    public void processElement(DataBean bean, Context ctx, Collector<DataBean> out) throws Exception {

        Integer onLineUserCount = onLineUserState.value();
        String deviceId = bean.getDeviceId();
        Integer uv = uvState.value();
        Integer pv = pvState.value();
        BloomFilter<String> bloomFilter = bloomFilterState.value();
        String eventId = bean.getEventId();

        // Register a timer aligned to the next 10-second boundary
        long currentProcessingTime = ctx.timerService().currentProcessingTime();
        long fireTime = currentProcessingTime - currentProcessingTime % 10000 + 10000;
        ctx.timerService().registerProcessingTimeTimer(fireTime);

        if (onLineUserCount == null) {
            onLineUserCount = 0;
        }
        if ("liveEnter".equals(eventId)) {
            if (bloomFilter == null) {
                bloomFilter = BloomFilter.create(Funnels.unencodedCharsFunnel(), 1000000);
                pv = 0;
                uv = 0;
            }
            if (!bloomFilter.mightContain(deviceId)) {
                bloomFilter.put(deviceId);
                uv++;
                bloomFilterState.update(bloomFilter);
                uvState.update(uv);
            }
            pv++;
            pvState.update(pv);

            // Update the number of viewers currently online
            onLineUserCount++;
            onLineUserState.update(onLineUserCount);
        } else {
            onLineUserCount--;
            onLineUserState.update(onLineUserCount);
        }

        String format = dateFormat.format(bean.getTimestamp());
        String[] fields = format.split("-");
        bean.setDate(fields[0]);
        bean.setHour(fields[1]);
        // Emit the detail record on the main stream
        out.collect(bean);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<DataBean> out) throws Exception {

        String date = dateFormat.format(timestamp).split("-")[0];

        //out.collect(Tuple4.of(ctx.getCurrentKey(), uvState.value(), pvState.value(), onLineUserState.value()));

        ctx.output(aggTag, Tuple4.of(ctx.getCurrentKey() + "_" + date, uvState.value(), pvState.value(), onLineUserState.value()));

    }


    public OutputTag<Tuple4<String, Integer, Integer, Integer>> getAggTag() {
        return aggTag;
    }
}

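2. Live-stream popularity score

A viewer who stays in a live room for at least 1 minute earns the room one popularity point. If the same device ID re-enters that room repeatedly within 30 minutes, it still counts only once.

Implementation ideas:
Split the stream into sliding windows by EventTime
Use a process function and register timers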
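The rule itself can be stated as a small plain-Java function before it is wired into Flink. This is a sketch under one reading of the rule, matching the timer-based job in this section: a visit counts if it lasts at least 1 minute and either is the first visit or starts more than 30 minutes after the previous leave. `PopularityRule` and `popularity` are hypothetical names, and the input is assumed to be the time-ordered visits of one device in one live room.

```java
// Sketch of the popularity rule for one (anchor, device) pair; plain Java, no Flink dependency.
class PopularityRule {

    static final long MIN_STAY_MILLIS = 60_000L;            // at least 1 minute in the room
    static final long REPEAT_WINDOW_MILLIS = 30 * 60_000L;  // re-entries within 30 minutes do not count again

    // sessions: {enterMillis, leaveMillis} pairs, sorted by enter time (an assumption of this sketch)
    static int popularity(long[][] sessions) {
        int score = 0;
        Long previousLeave = null;
        for (long[] session : sessions) {
            long enter = session[0];
            long leave = session[1];
            boolean stayedLongEnough = leave - enter >= MIN_STAY_MILLIS;
            boolean outsideWindow = previousLeave == null || enter - previousLeave > REPEAT_WINDOW_MILLIS;
            if (stayedLongEnough && outsideWindow) {
                score++;
            }
            previousLeave = leave;
        }
        return score;
    }

    public static void main(String[] args) {
        long[][] sessions = {
                {0L, 120_000L},            // first visit, stays 2 minutes
                {180_000L, 300_000L},      // comes back 1 minute after leaving
                {2_160_000L, 2_280_000L},  // comes back 31 minutes after leaving
                {4_680_000L, 4_710_000L}   // stays only 30 seconds
        };
        System.out.println(popularity(sessions));
    }
}
```

In the streaming version the list of sessions is never materialized: the enter and leave times live in keyed state, and a timer set one minute after each enter event plays the role of the `stayedLongEnough` check.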

package cn._51doit.jobs;

import cn._51doit.pojo.DataBean;
import cn._51doit.udf.JsonToBeanFunc;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class LiveAudienceCount {
    public static void main(String[] args) throws Exception {

        DataStream<String> kafkaStream = FlinkUtils.createKafkaStream(args, SimpleStringSchema.class);

        SingleOutputStreamOperator<DataBean> beanStream = kafkaStream.process(new JsonToBeanFunc());

        KeyedStream<DataBean, Tuple2<String, String>> keyed = beanStream.keyBy(new KeySelector<DataBean, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> getKey(DataBean bean) throws Exception {
                String deviceId = bean.getDeviceId();
                String anchor_id = bean.getProperties().get("anchor_id").toString();
                return Tuple2.of(anchor_id, deviceId);
            }
        });

        keyed.process(new KeyedProcessFunction<Tuple2<String, String>, DataBean, Tuple2<String, Integer>>() {

            private transient ValueState<Long> inState; // time of the latest enter event

            private transient ValueState<Long> outState; // time of the latest leave event

            @Override
            public void open(Configuration parameters) throws Exception {

                ValueStateDescriptor<Long> inStateDescriptor = new ValueStateDescriptor<>("in-state", Long.class);
                inState = getRuntimeContext().getState(inStateDescriptor);
                ValueStateDescriptor<Long> outStateDescriptor = new ValueStateDescriptor<>("out-state", Long.class);
                outState = getRuntimeContext().getState(outStateDescriptor);
            }

            @Override
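            public void processElement(DataBean bean, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {

                Long timestamp = bean.getTimestamp();
                String eventId = bean.getEventId();
                if ("liveEnter".equals(eventId)) {
                    inState.update(timestamp);
                    // Register a timer that fires just after the viewer has stayed 1 minute.
                    // Note: this is a processing-time timer set from the event timestamp,
                    // so it assumes events arrive close to real time.
                    ctx.timerService().registerProcessingTimeTimer(timestamp + 60000 + 1);
                } else if ("liveLeave".equals(eventId)) {
                    Long inTime = inState.value();
                    outState.update(timestamp);
                    // Guard against a leave event that arrives without a matching enter
                    if (inTime != null && timestamp - inTime < 60000) {
                        // The viewer left within 1 minute: cancel the timer
                        ctx.timerService().deleteProcessingTimeTimer(inTime + 60000 + 1);
                    }

                }

            }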

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {

                Long outTime = outState.value();

                if (outTime == null) {
                    out.collect(Tuple2.of(ctx.getCurrentKey().f0, 1));
                } else {
                    Long inTime = inState.value();
                    // Count again only if more than 30 minutes have passed since the last leave
                    if (inTime - outTime > 30 * 60000) {
                        out.collect(Tuple2.of(ctx.getCurrentKey().f0, 1));
                    }
                }
                //14:00 -> 14:29
                //15:00
            }
        }).print();

        FlinkUtils.env.execute();

    }

}

3. Storing the viewer statistics

Create the table:

CREATE TABLE tb_anchor_audience_count
(
    `id` String comment 'Unique record ID',
    `anchorId` String comment 'Anchor (streamer) ID',
    `deviceId` String comment 'User (device) ID',
    `eventId` String comment 'Event ID',
    `os` String comment 'Operating system name',
    `province` String comment 'Province',
    `channel` String comment 'Download channel',
    `deviceType` String comment 'Device type',
    `eventTime` DateTime64 comment 'Timestamp carried in the record',
    `date` String comment 'eventTime formatted as yyyyMMdd',
    `hour` String comment 'eventTime formatted as HH',
    `processTime` DateTime DEFAULT now()
)
ENGINE = ReplacingMergeTree(processTime)
PARTITION BY (date, hour)
ORDER BY id;

The code that writes to this table is in approach two above.

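III. Requirement analysis: gift rewards

MySQL also holds a gift table (a dimension table) that the stream has to be joined with. Common ways to join a dimension table:

a) Query the database once for every record (slow, low throughput)
b) Use async I/O (relatively fast, but consumes more resources)
c) Broadcast state (fastest; suits small dimension tables, and the dimension data is allowed to change)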
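Leaving Flink's broadcast state API aside, the idea behind option c) reduces to an in-memory copy of the dimension table that is updated by change events and read by every fact record. The sketch below is plain Java for illustration only; `GiftDimensionJoin`, its methods, and the `quantity` parameter are hypothetical and not taken from the original job.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java picture of a broadcast-style dimension join; not Flink code.
class GiftDimensionJoin {

    // In-memory copy of the gift table: gift ID -> points per gift (hypothetical schema).
    private final Map<String, Integer> pointsByGiftId = new HashMap<>();

    // Called for every change of the dimension table; a null value means the gift was deleted.
    void onDimensionChange(String giftId, Integer points) {
        if (points == null) {
            pointsByGiftId.remove(giftId);
        } else {
            pointsByGiftId.put(giftId, points);
        }
    }

    // Called for every reward event: looks the gift up locally, without a database round trip.
    // Returns -1 when the gift is not (yet) known.
    int rewardPoints(String giftId, int quantity) {
        Integer points = pointsByGiftId.get(giftId);
        return points == null ? -1 : points * quantity;
    }

    public static void main(String[] args) {
        GiftDimensionJoin join = new GiftDimensionJoin();
        join.onDimensionChange("rocket", 500);
        System.out.println(join.rewardPoints("rocket", 2));
        join.onDimensionChange("rocket", 600); // the dimension data changed
        System.out.println(join.rewardPoints("rocket", 2));
    }
}
```

The lookup is fast because it never leaves the process, and the whole table must fit in memory on every parallel task, which is why this option only suits small dimension tables.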

1. Gift points (Douyin coins) per anchor (live room)

When reposting, please credit the source: www.mshxw.com
Article URL: https://www.mshxw.com/it/781915.html