Experimental Verification of Out-of-Order Event-Time Data
Out-of-order data: business data collection cannot always guarantee that records arrive in the order they were produced, and incorrect ordering can cause downstream anomalies.
For example, when merging records into a wide table, if the most recent update arrives and is processed before an earlier update, the stale record ends up overwriting the fresh one, leaving dirty data in the table. Each message carries its merge key and update time, for example:
{
  // "key" encodes the primary-key id, the table name, and a time
  "key": "id_table1_202203",
  // ... other fields omitted here
  "eventTime": 1648178741000
}
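To see why this corrupts a merge, consider a store that simply applies updates in arrival order. The sketch below (the status field and its values are made up for illustration) shows the newer record arriving first and the stale one overwriting it:

import java.util.HashMap;
import java.util.Map;

public class LastWriteWinsDemo {
    public static void main(final String[] args) {
        // A naive store keeps whatever arrived last, ignoring eventTime.
        Map<String, String> wideTable = new HashMap<>();
        // The newer update (eventTime 1648178741000) happens to arrive first...
        wideTable.put("id_table1_202203", "{\"eventTime\":1648178741000,\"status\":\"paid\"}");
        // ...then the stale update (eventTime 1648178740000) arrives and overwrites it.
        wideTable.put("id_table1_202203", "{\"eventTime\":1648178740000,\"status\":\"pending\"}");
        // Prints the stale row: this is the dirty-data situation described above.
        System.out.println(wideTable.get("id_table1_202203"));
    }
}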
Experimental Verification
Kafka receives the incoming data, Flink performs the merge, and the result is written to Elasticsearch (ES).
POM File Implementation

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <flink.version>1.13.2</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.38</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.3.2</version>
    </dependency>
</dependencies>
Extend KeyedProcessFunction to implement a state-backed filter that remembers, per key, the update time of the last record seen, so that out-of-order arrivals can be filtered out: a record whose update time is older than (or equal to) the time held in state is discarded outright; a newer record updates the state and is forwarded to the next operator for storage. The job class is shown first; the MyProcessFunction implementation follows it below.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.elasticsearch.action.update.UpdateRequest;
import com.alibaba.fastjson.JSONObject;
public class DapDataSync4 {
public static void main(final String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(90000); // checkpoint every 90 seconds
// TODO kafka
KafkaRecordDeserializationSchema<Tuple2<String, String>> recordDeserializer =
        new KafkaRecordDeserializationSchema<Tuple2<String, String>>() {
    // Deserialize each Kafka record; we simply emit a (key, json) tuple
    @Override
    public void deserialize(final ConsumerRecord<byte[], byte[]> record,
            final Collector<Tuple2<String, String>> out) throws IOException {
        final JSONObject body = JSONObject.parseObject(new String(record.value(), "UTF-8"));
        // Use the unique primary key as the state key
        final String keyBuffer = body.getString("key");
        out.collect(new Tuple2<>(keyBuffer, body.toJSONString()));
    }

    // Tell Flink the produced data type so it can do type inference
    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);
    }
};
KafkaSource<Tuple2<String, String>> source = KafkaSource.<Tuple2<String, String>>builder()
        .setBootstrapServers("xx.xx.xx.xx:9092")
        .setTopics("test_topics")
        .setGroupId("dap-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        // Interval (ms) at which the Kafka source discovers new partitions
        .setProperty("partition.discovery.interval.ms", "3000")
        .setDeserializer(recordDeserializer)
        // .setValueOnlyDeserializer(new SimpleStringSchema())
        // .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
        .build();
DataStreamSource<Tuple2<String, String>> sourceStream = env.fromSource(source,
        WatermarkStrategy.noWatermarks(), "Kafka Source");
KeyedStream<Tuple2<String, String>, String> keyedStream = sourceStream
        .keyBy(new KeySelector<Tuple2<String, String>, String>() {
            @Override
            public String getKey(final Tuple2<String, String> value) throws Exception {
                return value.f0;
            }
        });
SingleOutputStreamOperator<String> processStream = keyedStream.process(new MyProcessFunction());
// TODO ES
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("xx.xx.xx.xx", 9200, "http"));
// use an ElasticsearchSink.Builder to create an ElasticsearchSink
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
        new ElasticsearchSinkFunction<String>() {
private static final long serialVersionUID = 1L;
public UpdateRequest createUpdateRequest(final String element) {
    final JSONObject body = JSONObject.parseObject(element);
    final String id = body.getString("id");
    // Build an upsert request so the document is created if it does not exist yet
    UpdateRequest updateRequest = new UpdateRequest("new_index", id);
    updateRequest.doc(body);
    updateRequest.docAsUpsert(true);
    updateRequest.retryOnConflict(1);
    return updateRequest;
}
@Override
public void process(final String element, final RuntimeContext ctx, final RequestIndexer indexer) {
indexer.add(createUpdateRequest(element));
}
});
// configuration for the bulk requests; a bulk request is flushed once
// 3000 actions, 50 MB, or 500 ms accumulate, whichever comes first
esSinkBuilder.setBulkFlushMaxActions(3000);
esSinkBuilder.setBulkFlushMaxSizeMb(50);
esSinkBuilder.setBulkFlushInterval(500);
// provide a RestClientFactory for custom configuration on the
// internally created REST client
esSinkBuilder.setRestClientFactory(restClientBuilder -> {
// restClientBuilder.setDefaultHeaders(...);
// restClientBuilder.setMaxRetryTimeoutMillis(...);
// restClientBuilder.setPathPrefix(...);
// restClientBuilder.setHttpClientConfigCallback(...);
});
// finally, build and add the sink to the job's pipeline
processStream.addSink(esSinkBuilder.build()).name("elasticsearch sink");
// TODO launch the job
env.execute("kafka to ES test");
}
}
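To drive the experiment, two messages with the same key but deliberately out-of-order eventTime values can be published to the topic. Below is a minimal producer sketch under the same assumptions as the job above (the broker address is a placeholder, and the payload fields mirror the sample message):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OutOfOrderProducer {
    public static void main(final String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "xx.xx.xx.xx:9092"); // same broker as the job
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The newer record is sent first...
            producer.send(new ProducerRecord<>("test_topics",
                    "{\"key\":\"id_table1_202203\",\"id\":\"1\",\"eventTime\":1648178741000}"));
            // ...then the stale one; MyProcessFunction should drop it.
            producer.send(new ProducerRecord<>("test_topics",
                    "{\"key\":\"id_table1_202203\",\"id\":\"1\",\"eventTime\":1648178740000}"));
        }
    }
}

The MyProcessFunction referenced by the job is the state-backed filter described above: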
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import com.alibaba.fastjson.JSONObject;

public class MyProcessFunction extends KeyedProcessFunction<String, Tuple2<String, String>, String> {

    private static final long serialVersionUID = 2209685641989059608L;

    // Per-key state holding the newest eventTime seen so far
    private ValueState<Long> state;

    @Override
    public void open(final Configuration parameters) throws Exception {
        state = getRuntimeContext()
                .getState(new ValueStateDescriptor<>("myState", Long.class));
    }

    @Override
    public void processElement(final Tuple2<String, String> value,
            final Context ctx, final Collector<String> out) throws Exception {
        // Read the time held in state (null for the first record of a key)
        Long eventTime = state.value();
        // Read the current record's time
        final JSONObject body = JSONObject.parseObject(value.f1);
        long thisEventTime = body.getLongValue("eventTime");
        if (eventTime == null || eventTime < thisEventTime) {
            eventTime = thisEventTime;
            // write the state back and forward the record;
            // stale records (eventTime not newer) are silently dropped
            state.update(eventTime);
            out.collect(value.f1);
        }
    }
}
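The filter can also be exercised locally, without Kafka or Elasticsearch. The sketch below (element values are made up) feeds two out-of-order tuples for the same key through MyProcessFunction and prints what survives; only the first, newer record should appear:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MyProcessFunctionDemo {
    public static void main(final String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // keep the demo output deterministic
        env.fromElements(
                Tuple2.of("id_table1_202203", "{\"eventTime\":1648178741000,\"v\":\"new\"}"),
                Tuple2.of("id_table1_202203", "{\"eventTime\":1648178740000,\"v\":\"old\"}"))
            .keyBy(new KeySelector<Tuple2<String, String>, String>() {
                @Override
                public String getKey(final Tuple2<String, String> value) {
                    return value.f0;
                }
            })
            .process(new MyProcessFunction())
            .print(); // expected output: only the record with eventTime 1648178741000
        env.execute("MyProcessFunction demo");
    }
}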



