While working through a demo recently, I hit a big pitfall using Kafka Streams in Spring Boot.
Because the example was coupled across several modules, it took me about four hours to strip it down to the core configuration and dependencies.
The configuration and code are attached below; adapt them to your own needs.
POM.XML
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.4</version>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>kafka-demo2</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-demo2</name>
    <description>kafka-demo2</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>connect-json</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Bootstrap class
package com.example.kafkademo2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaDemo2Application {
    public static void main(String[] args) {
        SpringApplication.run(KafkaDemo2Application.class, args);
    }
}
application.yml
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 10.147.17.93:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:
  hosts: 10.147.17.93:9092
  group: ${spring.application.name}
Consumer
package com.example.kafkademo2.sample;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConsumerQuickStart {
    public static void main(String[] args) {
        // 1. Kafka configuration
        Properties prop = new Properties();
        // Broker address
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.147.17.93:9092");
        // Key and value deserializers
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
        // Commit offsets manually
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        // 3. Subscribe to the topic
        consumer.subscribe(Collections.singletonList("itcast-topic-out"));
        // 4. Poll for messages; commit offsets asynchronously in the loop
        //    and synchronously one last time on shutdown
        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.key());
                    System.out.println(consumerRecord.value());
                }
                // Asynchronous commit
                consumer.commitAsync();
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Error while consuming: " + e);
        } finally {
            // Synchronous commit before exiting
            consumer.commitSync();
        }
    }
}
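If you need finer control than the blanket commitAsync()/commitSync() calls above, the same client API also lets you commit an explicit offset per partition. Below is a minimal sketch of that variant (my own addition, not part of the original demo); it reuses the consumer configured above, trades throughput for precision by committing after every record, and needs two extra imports.

// Extra imports required for this sketch:
// import org.apache.kafka.common.TopicPartition;
// import org.apache.kafka.clients.consumer.OffsetAndMetadata;
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.key() + " -> " + record.value());
        // Commit the position after this record, for exactly this partition
        consumer.commitSync(Collections.singletonMap(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1)));
    }
}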
Producer
package com.example.kafkademo2.sample;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ProducerQuickStart {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. Kafka connection configuration
        Properties prop = new Properties();
        // Broker address
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.147.17.93:9092");
        // Key and value serializers
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // acks: message acknowledgement level
        prop.put(ProducerConfig.ACKS_CONFIG, "all");
        // Number of retries
        prop.put(ProducerConfig.RETRIES_CONFIG, 10);
        // Compression
        prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
        // 3. Send messages
        for (int i = 0; i < 5; i++) {
            ProducerRecord<String, String> kvProducerRecord = new ProducerRecord<>("itcast-topic-input", "hello kafka");
            producer.send(kvProducerRecord);
        }
        // 4. Close the producer. This is required: sends are buffered asynchronously,
        //    and close() flushes the buffer so the messages are actually delivered.
        producer.close();
    }
}
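The send() calls above are fire-and-forget. If you want confirmation that a record actually reached the broker, the client returns a Future you can block on, and also accepts a callback. A minimal sketch (my own addition) using the same producer as above; it needs one extra import, org.apache.kafka.clients.producer.RecordMetadata.

// Synchronous send: block until the broker acknowledges the record
RecordMetadata meta = producer.send(
        new ProducerRecord<>("itcast-topic-input", "key-1", "hello kafka")).get();
System.out.println("written to " + meta.topic() + "-" + meta.partition() + "@" + meta.offset());

// Asynchronous send with a callback invoked once the result is known
producer.send(new ProducerRecord<>("itcast-topic-input", "key-2", "hello kafka"),
        (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.println("acked at offset " + metadata.offset());
            }
        });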
Configuration class (bound to the custom kafka prefix in application.yml)
package com.example.kafkademo2.config;
import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import java.util.HashMap;
import java.util.Map;
@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix = "kafka")
public class KafkaStreamConfig {
    private static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;
    private String hosts;
    private String group;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup() + "_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup() + "_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}
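The MAX_MESSAGE_SIZE constant is declared but never used above. If you need the streams application to send records larger than the default, one way to wire it in (an assumption on my part, not part of the original demo) is through the producer-prefixed property; this requires importing org.apache.kafka.clients.producer.ProducerConfig.

// Hypothetical extra line inside defaultKafkaStreamsConfig():
// raise the max request size of the producer that Kafka Streams uses internally
props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), MAX_MESSAGE_SIZE);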
Aggregation class (Kafka Streams word count)
package com.example.kafkademo2.stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
import java.util.Arrays;
@Configuration
@Slf4j
public class KafkaStreamHelloListener {
    @Bean
    public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
        // Create the KStream and specify which topic it reads from
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                    @Override
                    public Iterable<String> apply(String value) {
                        return Arrays.asList(value.split(" "));
                    }
                })
                // Group by the value (the word itself)
                .groupBy((key, value) -> value)
                // Aggregate over 10-second time windows
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                // Count occurrences of each word
                .count()
                .toStream()
                // Convert the windowed key and the count to plain strings
                .map((key, value) -> {
                    System.out.println("key:" + key + ",value:" + value);
                    return new KeyValue<>(key.key().toString(), value.toString());
                })
                // Write the result to the output topic
                .to("itcast-topic-out");
        return stream;
    }
}
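To check the word-count topology without a running broker, Kafka ships TopologyTestDriver in the kafka-streams-test-utils artifact (not in the pom above, so treat it as an assumed extra test dependency). Below is a minimal sketch that feeds one line into itcast-topic-input and prints what comes out of itcast-topic-out; the class name is my own.

package com.example.kafkademo2.stream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import java.util.Properties;

public class WordCountTopologySketch {
    public static void main(String[] args) {
        // Build the same topology as KafkaStreamHelloListener
        StreamsBuilder builder = new StreamsBuilder();
        new KafkaStreamHelloListener().kStream(builder);

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the test driver
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Disable record caching so every count update is forwarded downstream immediately
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "itcast-topic-input", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                    "itcast-topic-out", new StringDeserializer(), new StringDeserializer());
            in.pipeInput("k1", "hello kafka hello");
            // Expect hello -> 1, kafka -> 1, hello -> 2 within the current 10-second window
            out.readKeyValuesToList().forEach(kv -> System.out.println(kv.key + " -> " + kv.value));
        }
    }
}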



