栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Kafka Stream

Kafka Stream

                                                                        **Kafka Stream**
  定义:  Kafka Stream是一个无限的,不断更新的数据集。流是一个有序的,可重放(反复的使用),不可变的容错序列,数据记录的格式是键值对(key-value)。
 实现过程:  通过编写一个或多个的计算逻辑的处理器拓扑来实现的.
 两个主要的处理器:    
  源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题  生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。
 Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的          Kafka主题。
处理器处理过程:由源处理器(source processor)将生产者发来的topic以流的形式进行传入,然后中间经过流计算,让sink处理器将流计算的结果发送给相应的消息订阅者.
 存储方式,即流的传参类型:  
 KStream数据流:即是一段顺序的,可以无限长,不断更新的数据集。每一次操作都是向其中插入(insert)新数据。
 KTable传统数据库:存储了大量状态(state)的表格。KTable负责抽象的,就是表状数据。每一次操作,都是更新插入.

使用:
一. 首先引入kafka-streams的依赖
二. 在配置文件中添加kafka的相关配置
三. KafkaStreamConfig的相关配置
1. new 一个map集合将StreamsConfig的bootstrap_servers_config作为键,当前的kafka的地址作为值进行绑定.当前的kafka地址, 可以写进配置文件中,方便管理.
2.以StreamsConfig.application_id_config为键,以当前的服务名为键进行绑定,同样从配置文件中获取.
3.然后将当前的消息转换器serdes也设置进map中
4.还可以设置一下消息副本数量,对其进行一些优化
5.最后new 一个kafkaStreamConfigration将这个map设置进去,然后类上加上enableKafkaStream注解
四. 编写一个监听接口,里边写三个方法,第一个接口是传入处理的topic,第二个是最终传给的消费者主题,最重要的就是流式处理的方法,后边会实现这三个方法进行具体的设置与编写.
五. 编写一个kafkaListenerFactory类,让KafkaStreamListener扫描和实例化成KafkaStreamProcessor.doAction的返回类,完成监听器实际注册的过程.
六. 编写kafkaStreamProcessor 自动处理的包装类,在处理器中构建流构建器,streamBuilder,将流构建器与刚写的监听的方法进行传入,通过泛型类型自动注册对应类型的流处理器对象,就是将刚传入与传出流的topic和流式处理的具体方法进行自动注册.
七.重写监听接口,前两个就将消息生产者与消费者的topic返回就行,重写流式处理接口,比如生产者传入一些以逗号字符串,流式计算中就可以去统计每个单词的个数.处理完之后传给消费者主题.

@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix=“kafka”)
public class KafkaStreamConfig {
private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
private String hosts;
private String group;

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
    Map props = new HashMap<>();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
    props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
    props.put(StreamsConfig.RETRIES_CONFIG, 10);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    // 消息副本数量
    props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
    props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 5_000);
    props.put(StreamsConfig.SEND_BUFFER_CONFIG, 3*MAX_MESSAGE_SIZE);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, Topology.AutoOffsetReset.EARLIEST.name().toLowerCase());
    return new KafkaStreamsConfiguration(props);
}

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/751830.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号