
Deduplication in a Flink Job with RoaringBitmap


Background

In day-to-day development, many scenarios call for real-time deduplication. There are plenty of ways to implement it, each with its own trade-offs; the key is choosing the one that fits your business scenario. Here we focus on real-time deduplication with RoaringBitmap. Let's start by sketching a simple business scenario.

Requirement: compute each account's cumulative spend in real time and use that total to trigger follow-up actions; the result must be accurate.
Constraints:

  • The data source is binlog records in Kafka
  • Account id and order id are both bigint; the amount is int
  • The collector delivers binlogs to Kafka with at-least-once semantics
  • Results are written to a KV store
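Since the source is binlog data in Kafka, it helps to fix a concrete message shape up front. A simulated record on the topic might look like the following; the exact envelope varies by collector (Canal, Maxwell, and Debezium all differ), so treat this shape and these field names as assumptions for the rest of the article:

```json
{
  "database": "shop",
  "table": "orders",
  "columns": [
    {"name": "account_id", "value": "10001"},
    {"name": "order_id",   "value": "900001"},
    {"name": "price",      "value": "199"}
  ]
}
```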
Process Overview

With the above in hand, we know what to build and what to watch out for. The overall flow is roughly:

  • Read data from Kafka and parse each record.
  • The requirement is a per-account total, so key the stream by account id.
  • The collector may resend data, so check whether each order has already been counted to keep the result accurate.
  • Adding each new purchase requires the historical total, so the running sum must be kept in state.
  • Write the result to external storage.
With the rough flow settled, the next question is implementation, and in particular how to tell whether an order id has already been processed. The number of accounts and orders keeps growing and the cardinality is huge, so holding order ids in a List or Set for lookups would need a lot of memory. The requirement demands exact results, which rules out a Bloom filter: by design it can only answer "definitely not present" or "possibly present", so false positives are possible. Checking against an external store would require transactional guarantees. So what is space-efficient, gives exact answers, and needs no extra transactions? After this analysis, RoaringBitmap looks like a good fit: it satisfies all of the above, and its Roaring64Bitmap variant extends support to 64-bit integers. With the approach settled, let's write some code!
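Before wiring it into Flink, the dedup idea itself can be sketched in a few lines: an exact membership test over 64-bit order ids, so duplicates never get added to the total. The ids and prices here are made-up sample data.

```java
import org.roaringbitmap.longlong.Roaring64Bitmap;

public class DedupSketch {
    public static void main(String[] args) {
        // Bitmap of order ids we have already counted; exact, no false positives
        Roaring64Bitmap seen = new Roaring64Bitmap();
        long[] orderIds = {1001L, 1002L, 1001L, 1003L, 1002L};
        long[] prices   = {10L,   20L,   10L,   30L,   20L};
        long total = 0;
        for (int i = 0; i < orderIds.length; i++) {
            if (!seen.contains(orderIds[i])) { // exact check: duplicates are skipped
                seen.addLong(orderIds[i]);
                total += prices[i];
            }
        }
        System.out.println(seen.getLongCardinality()); // 3 distinct orders
        System.out.println(total);                     // 60, duplicates ignored
    }
}
```

Internally RoaringBitmap stores ids in compressed containers, so even tens of millions of ids take far less memory than a `HashSet<Long>`, while `contains` stays exact.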

Implementation

Dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.roaringbitmap</groupId>
    <artifactId>RoaringBitmap</artifactId>
    <version>0.9.21</version>
</dependency>
Code
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fyb.bigdata.demo.bean.BinlogColumn;
import com.fyb.bigdata.demo.bean.BinlogTable;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Properties;


public class RoaringBitmapTest {

    private static final Logger LOG = LoggerFactory.getLogger(RoaringBitmapTest.class);

    public static void main(String[] args) throws Exception {

        //Creates an execution environment that represents the context in which the program is currently executed
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //start a checkpoint every 1000 ms
        env.enableCheckpointing(1000);
        //set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        //configuration kafka information
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "111.111.111.111:9092");//comma separated list of Kafka brokers
        properties.put("group.id", "RoaringBitmapTest");//consumer group

        //creates a new Kafka streaming source consumer
        DataStream<String> source = env.addSource(
                new FlinkKafkaConsumer<>("simulate-binlog", new SimpleStringSchema(), properties));

        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.days(2))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        //create StateDescriptors (Roaring64Bitmap has no built-in Flink serializer, so it falls back to Kryo)
        ValueStateDescriptor<Roaring64Bitmap> bitmapDescriptor = new ValueStateDescriptor<>(
                "Roaring64Bitmap",
                TypeInformation.of(new TypeHint<Roaring64Bitmap>() {
                }));
        ValueStateDescriptor<Integer> priceDescriptor = new ValueStateDescriptor<>(
                "AccountPrice",
                Integer.class
        );
        //set state time-to-live
        bitmapDescriptor.enableTimeToLive(ttlConfig);
        priceDescriptor.enableTimeToLive(ttlConfig);

        //parsing binlog data, group by user id
        ObjectMapper mapper = new ObjectMapper();
        source.map(new MapFunction<String, Tuple2<Long, BinlogTable>>() {
            @Override
            public Tuple2<Long, BinlogTable> map(String value) throws Exception {
                BinlogTable binlog = mapper.readValue(value, BinlogTable.class);
                List<BinlogColumn> columns = binlog.getColumns();
                Long uid = Long.valueOf(columns.get(0).getValue());
                return new Tuple2<>(uid, binlog);
            }
        })
                .keyBy(0)
                .process(new ProcessFunction<Tuple2<Long, BinlogTable>, Tuple2<Long, Integer>>() {

                    private transient ValueState<Roaring64Bitmap> bitmapState;
                    private transient ValueState<Integer> priceState;

                    @Override
                    public void open(Configuration parameters) {
                        bitmapState = getRuntimeContext().getState(bitmapDescriptor);
                        priceState = getRuntimeContext().getState(priceDescriptor);
                    }

                    @Override
                    public void processElement(Tuple2<Long, BinlogTable> value, Context ctx, Collector<Tuple2<Long, Integer>> out) throws Exception {
                        
                        Roaring64Bitmap bitmap = bitmapState.value();
                        if (bitmap == null) {
                            bitmap = new Roaring64Bitmap();
                        }
                        Integer price = priceState.value();
                        if (price == null) {
                            price = 0;
                        }
                        BinlogTable binlog = value.f1;
                        List<BinlogColumn> columns = binlog.getColumns();
                        Long orderId = Long.valueOf(columns.get(1).getValue());
                        Integer orderPrice = Integer.valueOf(columns.get(2).getValue());
                        if (!bitmap.contains(orderId)) {
                            bitmap.addLong(orderId);
                            //accumulation price
                            price += orderPrice;
                            //update state
                            priceState.update(price);
                            bitmapState.update(bitmap);
                        }
                        out.collect(new Tuple2<>(value.f0, price));
                    }
                })
                .print();//simplified: print instead of writing to the KV store

        //Triggers the program execution
        env.execute();
    }
}

 

For brevity, the binlog parsing above addresses columns by hard-coded index; in real use, parse according to your own binlog model. Feeding in a few simulated records confirms that both the deduplication and the running sum produce correct results.
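The `BinlogTable` and `BinlogColumn` beans imported from `com.fyb.bigdata.demo.bean` are not shown in the original. A minimal sketch consistent with how the job indexes columns (account id at index 0, order id at 1, price at 2) might look like this; the field names and class shapes are assumptions, and the real beans depend on your collector's message format:

```java
import java.util.List;

// Hypothetical POJO for one column of a binlog row
class BinlogColumn {
    private String name;
    private String value;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getValue() { return value; }
    public void setValue(String value) { this.value = value; }
}

// Hypothetical POJO for one binlog row event
class BinlogTable {
    private String database;
    private String table;
    private List<BinlogColumn> columns;

    public String getDatabase() { return database; }
    public void setDatabase(String database) { this.database = database; }
    public String getTable() { return table; }
    public void setTable(String table) { this.table = table; }
    public List<BinlogColumn> getColumns() { return columns; }
    public void setColumns(List<BinlogColumn> columns) { this.columns = columns; }
}
```

Plain getter/setter POJOs like these can be deserialized by Jackson's `ObjectMapper` and are also recognized by Flink's POJO serializer.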

Please credit www.mshxw.com when reposting this article.
Copyright (c) 2021-2022 MSHXW.COM
