栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink(kafka连接器)

flink(kafka连接器)

用法

KafkaSource source = KafkaSource.builder()
        .setBootstrapServers(bootstrapServers)//输入kafka的服务器地址
          .setTopics("kafka-boot","kafka-boot-s")//底层是一个数组可以订阅一个及以上的主题
        .setTopics("kafka-boot")
        .setGroupId("kafka_group_2")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueonlyDeserializer(new SimpleStringSchema())
        .build();

主题分区订阅

Kafka源码提供了三种方式

  • 主题列表:订阅所有列表中的主题。
    KafkaSource.builder().setTopics("topic-a", "topic-b")

  • 主题模式:订阅正则表达式能匹配到的主题。
    KafkaSource.builder().setTopicPattern("topic.*")

  • 分区集:订阅提供的分区集中的分区。
    final HashSet partitionSet = new HashSet<>(Arrays.asList(
            new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"
            new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
    KafkaSource.builder().setPartitions(partitionSet)

    反序列化程序 

    解析 Kafka 消息需要反序列化器。Deserializer(反序列化模式)可以通过 配置setDeserializer(KafkaRecordDeserializationSchema),其中 KafkaRecordDeserializationSchema定义了如何反序列化 Kafka ConsumerRecord。

    import org.apache.kafka.common.serialization.StringDeserializer;
    KafkaSource.builder().setDeserializer(KafkaRecordDeserializationSchema.valueonly(StringSerializer.class));

    起始偏移量

    Kafka 源可以通过指定从不同的偏移量开始消费消息 OffsetsInitializer。

    KafkaSource.builder()
             // Start from committed offset of the consuming group, without reset strategy
             .setStartingOffsets(OffsetsInitializer.committedOffsets())
              // Start from committed offset, also use EARLIEST as reset strategy if committed offset doesn't exist
              .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
              // 从大于等于此时间戳开始的偏移量开始
              .setStartingOffsets(OffsetsInitializer.timestamp(1592323200L))
              // 从最早的偏移量开始
              .setStartingOffsets(OffsetsInitializer.earliest())
              // 从最新的偏移量开始
              .setStartingOffsets(OffsetsInitializer.latest())

  • 转载请注明:文章转载自 www.mshxw.com
    本文地址:https://www.mshxw.com/it/746238.html
    我们一直用心在做
    关于我们 文章归档 网站地图 联系我们

    版权所有 (c)2021-2022 MSHXW.COM

    ICP备案号:晋ICP备2021003244-6号