Usage
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(bootstrapServers) // address of the Kafka brokers
    .setTopics("kafka-boot", "kafka-boot-s") // varargs: subscribe to one or more topics
    .setGroupId("kafka_group_2")
    .setStartingOffsets(OffsetsInitializer.latest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();
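Topic-partition subscription
The Kafka source offers three ways to subscribe to topics and partitions: a topic list, a topic pattern (regular expression), or an explicit set of partitions.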
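// Topic list: subscribe to every partition of the listed topics
KafkaSource.builder().setTopics("topic-a", "topic-b");
// Topic pattern: subscribe to every topic whose name matches the regular expression
KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*"));
// Partition set: subscribe only to the listed partitions
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
    new TopicPartition("topic-a", 0),   // Partition 0 of topic "topic-a"
    new TopicPartition("topic-b", 5))); // Partition 5 of topic "topic-b"
KafkaSource.builder().setPartitions(partitionSet);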
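To see which topic names a pattern such as "topic.*" would select, the matching can be tried out with plain java.util.regex before wiring it into a job. This is a minimal sketch that assumes the source requires the whole topic name to match the pattern; the class name TopicPatternDemo, the method matchingTopics, and the sample topic names are hypothetical and not part of Flink.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class TopicPatternDemo {
    // Returns the topic names that fully match the regex, in input order.
    // Assumption: full-match semantics (Matcher.matches), not substring search.
    static List<String> matchingTopics(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical topic names, for illustration only
        List<String> topics =
                Arrays.asList("topic-a", "topic-b", "topics", "my-topic-a", "topic");
        // prints [topic-a, topic-b, topics, topic]
        System.out.println(matchingTopics("topic.*", topics));
    }
}
```

Note that "topic.*" also selects the bare name "topic" and names such as "topics", so a stricter pattern like "topic-.*" may be closer to what is intended.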
Deserializer
A deserializer is required to parse Kafka messages. It is configured through setDeserializer(KafkaRecordDeserializationSchema), where KafkaRecordDeserializationSchema defines how to deserialize a Kafka ConsumerRecord.
import org.apache.kafka.common.serialization.StringDeserializer;

// Deserialize only the record value, as a String
KafkaSource.<String>builder().setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
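Conceptually, a value-only String deserializer ignores the record key and decodes the value bytes as text. The following is a rough stand-alone sketch of that idea using only the JDK; it is not Flink's or Kafka's implementation, and the class ValueOnlyDemo and the method deserializeValueOnly are hypothetical names. It assumes UTF-8 encoding and returns null for a null value.

```java
import java.nio.charset.StandardCharsets;

class ValueOnlyDemo {
    // Decodes only the value bytes; the key is deliberately ignored.
    // Assumption: values are UTF-8 encoded text; a null value yields null.
    static String deserializeValueOnly(byte[] key, byte[] value) {
        if (value == null) {
            return null;
        }
        return new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] key = "user-1".getBytes(StandardCharsets.UTF_8);
        byte[] value = "hello kafka".getBytes(StandardCharsets.UTF_8);
        // prints hello kafka
        System.out.println(deserializeValueOnly(key, value));
    }
}
```

If the key, headers, or record metadata are also needed downstream, a full KafkaRecordDeserializationSchema implementation would be the place to read them from the ConsumerRecord.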
Starting offsets
The Kafka source can start consuming from different offsets by specifying an OffsetsInitializer.
KafkaSource.builder()
// Start from committed offset of the consuming group, without reset strategy
.setStartingOffsets(OffsetsInitializer.committedOffsets())
// Start from committed offset, also use EARLIEST as reset strategy if committed offset doesn't exist
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
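// Start from the first record whose timestamp is greater than or equal to the given timestamp (epoch milliseconds)
.setStartingOffsets(OffsetsInitializer.timestamp(1592323200L))
// Start from the earliest offset
.setStartingOffsets(OffsetsInitializer.earliest())
// Start from the latest offset
.setStartingOffsets(OffsetsInitializer.latest());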
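Because the timestamp initializer expects epoch milliseconds, a value such as 1592323200 (which looks like epoch seconds for 2020-06-16 16:00:00 UTC) would be read as a moment in January 1970. One way to avoid this is to derive the value from a date. The sketch below uses only java.time; the class StartTimestampDemo and the method toEpochMillis are hypothetical helpers, and the date is assumed to be expressed in UTC.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

class StartTimestampDemo {
    // Converts a date-time, interpreted as UTC, to epoch milliseconds.
    static long toEpochMillis(LocalDateTime utcDateTime) {
        return utcDateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.of(2020, 6, 16, 16, 0, 0);
        // prints 1592323200000
        System.out.println(toEpochMillis(start));
    }
}
```

The resulting long could then be passed to OffsetsInitializer.timestamp(...) in place of a hand-typed constant.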



