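Either change the input topic "data_in" to a single partition, or use a GlobalKTable, which reads data from all partitions of the topic and can then be joined with the stream. That way, your application instances no longer need to be in different consumer groups.
The code would look like this: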
    private GlobalKTable<String, theDataList> globalStream() {
        // KStream of records from the data_in topic, using String and theDataSerde deserializers
        KStream<String, Data> trashStream = getBuilder().stream("data_in",
                Consumed.with(Serdes.String(), SerDes.theDataSerde));

        // By sending to another topic you're forcing a repartition on that topic
        trashStream.to("new_data_in", Produced.with(Serdes.String(), SerDes.theDataSerde));

        KStream<String, Data> newTrashStream = getBuilder().stream("new_data_in",
                Consumed.with(Serdes.String(), SerDes.theDataSerde));

        // Group the records by key; KGroupedStream is the intermediate representation needed to aggregate
        KGroupedStream<String, Data> KGS = newTrashStream.groupByKey();

        Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized =
                Materialized.as("agg-stream-store");
        materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);

        // aggregate() returns a KTable, so convert it to a stream before writing it to a topic
        KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
            if (!value.getValideData()) {
                aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
            } else {
                aggregate.getList().add(value);
            }
            return aggregate;
        }, materialized)
                .toStream()
                .to("agg_data_in", Produced.with(Serdes.String(), SerDes.theDataDataListSerde));

        // Read the aggregated topic back as a GlobalKTable
        return getBuilder().globalTable("agg_data_in",
                Consumed.with(Serdes.String(), SerDes.theDataDataListSerde));
    }

Edit: I edited the code above to force a repartition through a topic named "new_data_in".
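
If the aggregator lambda is hard to follow, here is a minimal plain-Java sketch of the same rule, without any Kafka dependency. The `Data` class and the `apply` method below are hypothetical stand-ins for the real value type and the lambda body, so treat this as an illustration of the logic rather than the actual classes: a valid record is appended to the list, and an invalid record removes every entry whose timestamp is less than or equal to its own.

```java
import java.util.ArrayList;
import java.util.List;

class AggregatorSketch {

    // Hypothetical stand-in for the Data value type used in the topology.
    static final class Data {
        final long timestamp;
        final boolean valid;

        Data(long timestamp, boolean valid) {
            this.timestamp = timestamp;
            this.valid = valid;
        }
    }

    // Same rule as the aggregator lambda: an invalid record purges all
    // entries up to and including its timestamp, a valid record is appended.
    static List<Data> apply(List<Data> aggregate, Data value) {
        if (!value.valid) {
            aggregate.removeIf(t -> t.timestamp <= value.timestamp);
        } else {
            aggregate.add(value);
        }
        return aggregate;
    }

    public static void main(String[] args) {
        List<Data> aggregate = new ArrayList<>();
        apply(aggregate, new Data(10L, true));
        apply(aggregate, new Data(20L, true));
        apply(aggregate, new Data(30L, true));
        // Invalid record at timestamp 20 removes the entries at 10 and 20.
        apply(aggregate, new Data(20L, false));

        System.out.println(aggregate.size());           // prints 1
        System.out.println(aggregate.get(0).timestamp); // prints 30
    }
}
```

Note that the invalid record itself is never stored in the list; it only acts as a cleanup marker.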



