
Flink: Handling Out-of-Order Data with ProcessFunction

The Out-of-Order Data Problem

Business data collection cannot always guarantee that records are transmitted in order, and records arriving in the wrong order can cause business-level anomalies.

For example, when merging rows into a wide table, if the most recent update arrives and is processed before an earlier update, the stale update overwrites the newer one and the table ends up with dirty data.

{
	// key is composed of the primary key id, the table name, and the time (month)
	"key": "id_table1_202203",
	// ... other fields omitted
	"eventTime": 1648178741000
}
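
Concretely, suppose two updates for the same key arrive in the wrong order (a hypothetical trace; the status field and its values are made up for illustration):

// arrives first and is written out: the newer update
{ "key": "id_table1_202203", "eventTime": 1648178741000, "status": "closed" }

// arrives second with an older eventTime: writing it would overwrite the
// newer "closed" status with the stale "open" one
{ "key": "id_table1_202203", "eventTime": 1648178700000, "status": "open" }

The ProcessFunction implemented below drops the second record because its eventTime is older than the one already recorded in state.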
Experimental Verification

The setup: Kafka receives the incoming data, Flink performs the merge, and the result is written to Elasticsearch.

POM File

<properties>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
	<java.version>1.8</java.version>
	<flink.version>1.13.2</flink.version>
	<scala.binary.version>2.12</scala.binary.version>
</properties>

<dependencies>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-java</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-core</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-clients_${scala.binary.version}</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>

	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
		<version>${flink.version}</version>
	</dependency>

	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
		<version>${flink.version}</version>
	</dependency>

	<dependency>
		<groupId>com.alibaba</groupId>
		<artifactId>fastjson</artifactId>
		<version>1.2.38</version>
	</dependency>

	<dependency>
		<groupId>log4j</groupId>
		<artifactId>log4j</artifactId>
		<version>1.2.17</version>
	</dependency>
	<dependency>
		<groupId>commons-lang</groupId>
		<artifactId>commons-lang</artifactId>
		<version>2.6</version>
	</dependency>
	<dependency>
		<groupId>org.apache.commons</groupId>
		<artifactId>commons-lang3</artifactId>
		<version>3.3.2</version>
	</dependency>
</dependencies>

Code Implementation

Extend KeyedProcessFunction to build a stateful filter that keeps, per key, the latest update time seen so far. A record whose update time is not newer than the time recorded in state is discarded; a newer record advances the state and is forwarded to the next operator for storage. For example, if records with eventTime 100, 300, and 200 arrive for the same key, 100 and 300 pass through while 200 is dropped.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.elasticsearch.action.update.UpdateRequest;

import com.alibaba.fastjson.JSONObject;

// MyProcessFunction (defined below) is assumed to live in the same package

public class DapDataSync4 {

	public static void main(final String[] args) throws Exception {

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(90000); // checkpoint every 90 seconds

		// Kafka record deserializer: turn each record into a (key, json) tuple
		KafkaRecordDeserializationSchema<Tuple2<String, String>> recordDeserializer = new KafkaRecordDeserializationSchema<Tuple2<String, String>>() {

			@Override
			public void deserialize(final ConsumerRecord<byte[], byte[]> record,
					final Collector<Tuple2<String, String>> out) throws IOException {
				final JSONObject body = JSONObject.parseObject(new String(record.value(), "UTF-8"));

				// use the unique primary key as the state key
				final String keyBuffer = body.getString("key");

				out.collect(new Tuple2<>(keyBuffer, body.toJSONString()));
			}

			// tell Flink the produced data type so its type inference works
			@Override
			public TypeInformation<Tuple2<String, String>> getProducedType() {
				return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO,
						BasicTypeInfo.STRING_TYPE_INFO);
			}

		};

		KafkaSource<Tuple2<String, String>> source = KafkaSource.<Tuple2<String, String>>builder()
				.setBootstrapServers("xx.xx.xx.xx:9092")
				.setTopics("test_topics")
				.setGroupId("dap-group")
				.setStartingOffsets(OffsetsInitializer.earliest())
				// how often (in ms) the Kafka source checks for new partitions
				.setProperty("partition.discovery.interval.ms", "3000")
				.setDeserializer(recordDeserializer)
				// .setValueOnlyDeserializer(new SimpleStringSchema())
				.build();

		// no watermarks: ordering is handled by keyed state, not event time
		DataStreamSource<Tuple2<String, String>> sourceStream = env.fromSource(source, WatermarkStrategy.noWatermarks(),
				"Kafka Source");

		// key by the record's primary key so state is scoped per key
		KeyedStream<Tuple2<String, String>, String> keyedStream = sourceStream
				.keyBy(new KeySelector<Tuple2<String, String>, String>() {
					@Override
					public String getKey(final Tuple2<String, String> value) throws Exception {
						return value.f0;
					}
				});

		// drop out-of-order records, forwarding only strictly newer updates
		SingleOutputStreamOperator<String> processStream = keyedStream.process(new MyProcessFunction());

		// Elasticsearch sink
		List<HttpHost> httpHosts = new ArrayList<>();
		httpHosts.add(new HttpHost("xx.xx.xx.xx", 9200, "http"));
		// use an ElasticsearchSink.Builder to create an ElasticsearchSink
		ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
				new ElasticsearchSinkFunction<String>() {
					private static final long serialVersionUID = 1L;

					public UpdateRequest createUpdateRequest(final String element) {
						final JSONObject body = JSONObject.parseObject(element);

						final String id = body.getString("id");

						// upsert so inserts and updates go through one path
						UpdateRequest updateRequest = new UpdateRequest("new_index", id);

						updateRequest.doc(body); // JSONObject implements Map<String, Object>
						updateRequest.docAsUpsert(true);
						updateRequest.retryOnConflict(1);
						return updateRequest;
					}

					@Override
					public void process(final String element, final RuntimeContext ctx, final RequestIndexer indexer) {
						indexer.add(createUpdateRequest(element));
					}
				});

		// bulk request configuration: flush once 3000 actions, 50 MB, or
		// 500 ms have accumulated, whichever limit is hit first
		esSinkBuilder.setBulkFlushMaxActions(3000);
		esSinkBuilder.setBulkFlushMaxSizeMb(50);
		esSinkBuilder.setBulkFlushInterval(500);

		// provide a RestClientFactory for custom configuration on the
		// internally created REST client
		esSinkBuilder.setRestClientFactory(restClientBuilder -> {
			// restClientBuilder.setDefaultHeaders(...);
			// restClientBuilder.setMaxRetryTimeoutMillis(...);
			// restClientBuilder.setPathPrefix(...);
			// restClientBuilder.setHttpClientConfigCallback(...);
		});

		// finally, build and add the sink to the job's pipeline
		processStream.addSink(esSinkBuilder.build()).name("elasticsearch sink");

		// run the job
		env.execute("kafka to ES test");
	}

}
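
To drive the experiment, out-of-order input can be produced by hand. Below is a minimal sketch using the plain kafka-clients producer; the broker address and topic match the job above, while the class name, the status field, and the two timestamps are made-up test values:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OutOfOrderProducer {

	public static void main(final String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "xx.xx.xx.xx:9092");
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

		try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
			// newer update sent first ...
			producer.send(new ProducerRecord<>("test_topics",
					"{\"key\":\"id_table1_202203\",\"id\":\"1\",\"eventTime\":1648178741000,\"status\":\"closed\"}"));
			// ... older update sent second; the ProcessFunction should drop it
			producer.send(new ProducerRecord<>("test_topics",
					"{\"key\":\"id_table1_202203\",\"id\":\"1\",\"eventTime\":1648178700000,\"status\":\"open\"}"));
		}
	}
}

If the filter works, only the first record reaches Elasticsearch, and the document in new_index keeps the newer "closed" status.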

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import com.alibaba.fastjson.JSONObject;

public class MyProcessFunction extends KeyedProcessFunction<String, Tuple2<String, String>, String> {

	private static final long serialVersionUID = 2209685641989059608L;

	// per-key state holding the latest eventTime seen so far
	private ValueState<Long> state;

	@Override
	public void open(final Configuration parameters) throws Exception {
		state = getRuntimeContext()
				.getState(new ValueStateDescriptor<>("myState", Long.class));
	}

	@Override
	public void processElement(final Tuple2<String, String> value,
			final Context ctx, final Collector<String> out) throws Exception {
		// the latest eventTime recorded in state for this key
		Long eventTime = state.value();

		// the current record's eventTime
		final JSONObject body = JSONObject.parseObject(value.f1);

		long thisEventTime = body.getLongValue("eventTime");

		if (eventTime == null || eventTime < thisEventTime) {
			eventTime = thisEventTime;
			// write the state back
			state.update(eventTime);

			out.collect(value.f1);
		}
		// otherwise the record is no newer than what was already seen: drop it
	}
}
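
One caveat: the ValueState entry for each key is kept forever, so a stream with many distinct keys grows state without bound. If keys stop receiving updates after some time, Flink's state TTL can cap the growth. A minimal sketch of an alternative open() method follows; the 7-day retention window is an assumption, not from the original setup:

// additional imports needed: org.apache.flink.api.common.state.StateTtlConfig,
// org.apache.flink.api.common.time.Time
@Override
public void open(final Configuration parameters) throws Exception {
	StateTtlConfig ttlConfig = StateTtlConfig
			.newBuilder(Time.days(7)) // assumed retention; tune to the business
			// refresh the TTL whenever the state is created or updated
			.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
			// never hand back an expired value, even before cleanup runs
			.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
			.build();

	ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("myState", Long.class);
	descriptor.enableTimeToLive(ttlConfig);
	state = getRuntimeContext().getState(descriptor);
}

Note that once an entry expires, a very late record for that key would pass the filter again, so the retention must comfortably exceed the maximum expected lateness.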
