栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

Kafka流:从应用程序的每个实例的所有分区中读取

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Kafka流:从应用程序的每个实例的所有分区中读取

将输入主题“ data_in”的分区数更改为1个分区,或使用a

GlobalKtable
从主题中所有分区获取数据,然后可以将其与流一起加入。这样一来,您的应用实例将不再需要位于不同的使用者组中。

该代码将如下所示:

private GlobalKTable<String, theDataList> globalStream() {   // KStream of records from data-in topic using String and theDataSerde deserializers  KStream<String, Data> trashStream = getBuilder().stream("data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));  thrashStream.to("new_data_in"); // by sending to an other topic you're forcing a repartition on that topic  KStream<String, Data> newTrashStream = getBuilder().stream("new_data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));  // Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)  KGroupedStream<String, Data> KGS = newTrashStream.groupByKey();  Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");  materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);// Return a KTable  KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {      if (!value.getValideData())          aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());      else        aggregate.getList().add(value);      return aggregate;  }, materialized)  .to("agg_data_in");  return getBuilder().globalTable("agg_data_in");}

编辑:我编辑了上面的代码,以强制对名为“ new_data_in”的主题进行重新分区。



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/515685.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号