In day-to-day development, there are plenty of scenarios that call for real-time deduplication. There are many ways to implement it, each with its own trade-offs; the key is picking the one that fits your business scenario. This post focuses on implementing real-time dedup with RoaringBitmap. Let's start by modeling a simple business scenario.
Requirement: compute each account's cumulative consumption amount in real time and trigger follow-up actions based on that amount; the result must be accurate.
Constraints:
- The data source is binlog records in Kafka
- Account id and order id are both bigint; the amount is int
- The collector delivers binlog records to Kafka with at-least-once semantics
- Results are stored in a KV store
From the above we know what to build and which pitfalls to watch for. The overall flow is roughly as follows:
- Consume records from Kafka and parse them.
- Since we aggregate consumption per account, group (key) the stream by account id.
- Because the collector may resend data, check whether each order has already been counted, so the result stays accurate.
- To add each new purchase to the historical total, keep the accumulated amount in state.
- Write the computed result to external storage.
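Stripped of the Flink and Kafka specifics, the steps above can be sketched with plain Java collections. This is only an illustration of the dedup-and-accumulate logic (the class and method names are made up for the sketch, and a HashSet of order ids stands in for the bitmap discussed below):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Minimal sketch of the per-account dedup-and-accumulate flow.
// One "seen order ids" set and one running total per account; a duplicate
// delivery of the same order must not change the total.
public class DedupSketch {
    private final Map<Long, Set<Long>> seenOrders = new HashMap<>();
    private final Map<Long, Integer> totals = new HashMap<>();

    // Returns the account's cumulative amount after applying this record.
    public int apply(long accountId, long orderId, int amount) {
        Set<Long> seen = seenOrders.computeIfAbsent(accountId, k -> new HashSet<>());
        if (seen.add(orderId)) {            // first time this order is seen
            totals.merge(accountId, amount, Integer::sum);
        }
        return totals.getOrDefault(accountId, 0);
    }

    public static void main(String[] args) {
        DedupSketch s = new DedupSketch();
        System.out.println(s.apply(1L, 100L, 30)); // 30
        System.out.println(s.apply(1L, 101L, 20)); // 50
        System.out.println(s.apply(1L, 100L, 30)); // duplicate: still 50
    }
}
```

The catch, as discussed next, is that the `HashSet` in this sketch grows without bound, which is exactly what the bitmap approach fixes.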
With the flow sorted out, the next question is implementation, and in particular how to tell whether an order id has already been processed. Account and order counts keep growing and the cardinality is huge, so keeping order ids in a List or Set would require a lot of memory. The requirement demands exact results, and a Bloom filter can produce false positives: by design it can only answer "definitely not present" or "possibly present". Checking against external storage would require transactional guarantees. So what saves space, gives exact answers, and needs no extra transaction support? After weighing the options, RoaringBitmap looks like a good fit: it satisfies all of the above, and its Roaring64Bitmap variant extends support to 64-bit integers. With the approach settled, let's write some code!
Code
Maven dependencies:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>0.9.21</version>
</dependency>
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fyb.bigdata.demo.bean.BinlogColumn;
import com.fyb.bigdata.demo.bean.BinlogTable;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Properties;
public class RoaringBitmapTest {
private static final Logger LOG = LoggerFactory.getLogger(RoaringBitmapTest.class);
public static void main(String[] args) throws Exception {
//Creates an execution environment that represents the context in which the program is currently executed
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
//set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//configure Kafka consumer properties
Properties properties = new Properties();
properties.put("bootstrap.servers", "111.111.111.111:9092");//comma separated list of Kafka brokers
properties.put("group.id", "RoaringBitmapTest");//consumer group
//creates a new Kafka streaming source consumer
DataStream<String> source = env.addSource(
new FlinkKafkaConsumer<>("simulate-binlog", new SimpleStringSchema(), properties));
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(2))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
//create StateDescriptor
ValueStateDescriptor<Roaring64Bitmap> bitmapDescriptor = new ValueStateDescriptor<>(
"Roaring64Bitmap",
TypeInformation.of(new TypeHint<Roaring64Bitmap>() {
}));
ValueStateDescriptor<Integer> priceDescriptor = new ValueStateDescriptor<>(
"AccountPrice",
Integer.class
);
//set state time-to-live
bitmapDescriptor.enableTimeToLive(ttlConfig);
priceDescriptor.enableTimeToLive(ttlConfig);
//parsing binlog data, group by user id
ObjectMapper mapper = new ObjectMapper();
source.map(new MapFunction<String, Tuple2<Long, BinlogTable>>() {
@Override
public Tuple2<Long, BinlogTable> map(String value) throws Exception {
BinlogTable binlog = mapper.readValue(value, BinlogTable.class);
List<BinlogColumn> columns = binlog.getColumns();
Long uid = Long.valueOf(columns.get(0).getValue());
return new Tuple2<>(uid, binlog);
}
})
.keyBy(0)
.process(new ProcessFunction<Tuple2<Long, BinlogTable>, Object>() {
private transient ValueState<Roaring64Bitmap> bitmapState;
private transient ValueState<Integer> priceState;
@Override
public void open(Configuration parameters) {
bitmapState = getRuntimeContext().getState(bitmapDescriptor);
priceState = getRuntimeContext().getState(priceDescriptor);
}
@Override
public void processElement(Tuple2<Long, BinlogTable> value, Context ctx, Collector<Object> out) throws Exception {
BinlogTable binlog = value.f1;
List<BinlogColumn> columns = binlog.getColumns();
//column indexes are hard-coded for this demo; adapt to your binlog schema
long orderId = Long.parseLong(columns.get(1).getValue());
int amount = Integer.parseInt(columns.get(2).getValue());
Roaring64Bitmap bitmap = bitmapState.value();
if (bitmap == null) {
bitmap = new Roaring64Bitmap();
}
//the order was already counted: drop the duplicate
if (bitmap.contains(orderId)) {
return;
}
bitmap.addLong(orderId);
bitmapState.update(bitmap);
Integer total = priceState.value();
total = (total == null ? 0 : total) + amount;
priceState.update(total);
//emit (accountId, cumulative amount); sink this to the KV store downstream
out.collect(new Tuple2<>(value.f0, total));
}
}).print();
env.execute("RoaringBitmapTest");
}
}
To keep things simple, the binlog parsing above just reads columns by hard-coded index; in practice, parse according to your own binlog schema. Feeding in a few simulated records to exercise the dedup and accumulation logic produces the correct result.
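As a quick sanity check of the dedup semantics the job relies on: adding an id that is already present in a Roaring64Bitmap is a no-op, so duplicate deliveries cannot inflate the count. A minimal standalone demo (requires the RoaringBitmap dependency from the pom above):

```java
import org.roaringbitmap.longlong.Roaring64Bitmap;

// Re-adding an existing id leaves the bitmap unchanged, which is exactly
// the idempotence property needed for exact dedup under at-least-once delivery.
public class BitmapDedupDemo {
    public static void main(String[] args) {
        Roaring64Bitmap seen = new Roaring64Bitmap();
        seen.addLong(10_000_000_001L); // 64-bit order ids are supported
        seen.addLong(10_000_000_001L); // duplicate delivery: no effect
        seen.addLong(10_000_000_002L);
        System.out.println(seen.contains(10_000_000_001L)); // true
        System.out.println(seen.getLongCardinality());      // 2
    }
}
```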



