栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

KafkaStreamer简单使用

KafkaStreamer简单使用

方便和我一样的萌新,省去翻文档的时间
下面是官方文档的示例

KafkaStreamer kafkaStreamer = new KafkaStreamer<>();
IgniteDataStreamer stmr = ignite.dataStreamer("myCache"));

// allow overwriting cache data
stmr.allowOverwrite(true);

kafkaStreamer.setIgnite(ignite);//ignite客户端配置参考官方文档
kafkaStreamer.setStreamer(stmr);

// set the topic
kafkaStreamer.setTopic(someKafkaTopic);//见下

// set the number of threads to process Kafka streams
kafkaStreamer.setThreads(4);

// set Kafka consumer configurations
kafkaStreamer.setConsumerConfig(kafkaConsumerConfig);//参考kafka官方文档配置,和Consumer配置一样的

// set extractor
kafkaStreamer.setSingleTupleExtractor(strExtractor);//见下

kafkaStreamer.start();

...

// stop on shutdown
kafkaStreamer.stop();

strm.close();

对示例里函数用法的补充
  • kafkaStreamer.setTopic(someKafkaTopic);

    kafkaStreamer.setTopic(Arrays.asList("topic1","topic2"));

  • kafkaStreamer.setSingleTupleExtractor(strExtractor);

    kafkaStreamer.setSingleTupleExtractor(new StreamSingleTupleExtractor() {
    
        @Override
        public Map.Entry extract(ConsumerRecord record) {
            
            //写自己的数据转换        
            String key=.....
            Int val=...
    
            return new IgniteBiTuple<>(key, val);
        }
    });
    
    
    • 如果一次返回多组数据,使用StreamMultipleTupleExtractor

    • 参考:
      ZeroMqStringSingleTupleExtractor.java
      KafkaIgniteStreamerSelfTest.java

  • stmr.flush()

    接收到的数据不会自动加入cache,此函数在不关闭stmr的情况下加入数据到cache。如果使用stmr.close(),会在关闭之前加入剩余的数据

相关参考

数据流处理 | Apache Ignite技术服务 (ignite-service.cn)

Kafka流处理器 | Apache Ignite技术服务 (ignite-service.cn)

Interface IgniteDataStreamer

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/585217.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号