**Kafka Stream** 定义: Kafka Stream是一个无限的,不断更新的数据集。流是一个有序的,可重放(反复的使用),不可变的容错序列,数据记录的格式是键值对(key-value)。 实现过程: 通过编写一个或多个的计算逻辑的处理器拓扑来实现的. 两个主要的处理器: 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题 生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。 Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的 Kafka主题。 处理器处理过程:由源处理器(source processor)将生产者发来的topic以流的形式进行传入,然后中间经过流计算,让sink处理器将流计算的结果发送给相应的消息订阅者. 存储方式,即流的传参类型: KStream数据流:即是一段顺序的,可以无限长,不断更新的数据集。每一次操作都是向其中插入(insert)新数据。 KTable传统数据库:存储了大量状态(state)的表格。KTable负责抽象的,就是表状数据。每一次操作,都是更新插入.
使用:
一. 首先引入kafka-streams的依赖
二. 在配置文件中添加kafka的相关配置
三. KafkaStreamConfig的相关配置
1. new 一个map集合将StreamsConfig的bootstrap_servers_config作为键,当前的kafka的地址作为值进行绑定.当前的kafka地址, 可以写进配置文件中,方便管理.
2.以StreamsConfig.application_id_config为键,以当前的服务名为键进行绑定,同样从配置文件中获取.
3.然后将当前的消息转换器serdes也设置进map中
4.还可以设置一下消息副本数量,对其进行一些优化
5.最后new 一个kafkaStreamConfigration将这个map设置进去,然后类上加上enableKafkaStream注解
四. 编写一个监听接口,里边写三个方法,第一个接口是传入处理的topic,第二个是最终传给的消费者主题,最重要的就是流式处理的方法,后边会实现这三个方法进行具体的设置与编写.
五. 编写一个kafkaListenerFactory类,让KafkaStreamListener扫描和实例化成KafkaStreamProcessor.doAction的返回类,完成监听器实际注册的过程.
六. 编写kafkaStreamProcessor 自动处理的包装类,在处理器中构建流构建器,streamBuilder,将流构建器与刚写的监听的方法进行传入,通过泛型类型自动注册对应类型的流处理器对象,就是将刚传入与传出流的topic和流式处理的具体方法进行自动注册.
七.重写监听接口,前两个就将消息生产者与消费者的topic返回就行,重写流式处理接口,比如生产者传入一些以逗号字符串,流式计算中就可以去统计每个单词的个数.处理完之后传给消费者主题.
@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix=“kafka”)
public class KafkaStreamConfig {
private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
private String hosts;
private String group;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
Map props = new HashMap<>();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
props.put(StreamsConfig.RETRIES_CONFIG, 10);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 消息副本数量
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 5_000);
props.put(StreamsConfig.SEND_BUFFER_CONFIG, 3*MAX_MESSAGE_SIZE);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, Topology.AutoOffsetReset.EARLIEST.name().toLowerCase());
return new KafkaStreamsConfiguration(props);
}
}



