一、Maven依赖(生产者消费者一致) pom.xml:本案例有1个生产者(provide),2个消费者(consumer-1,consume-2)
4.0.0 org.springframework.boot spring-boot-starter-parent 2.3.1.RELEASE com.example consumer 0.0.1-SNAPSHOT consumer Demo project for Spring Boot 8 Hoxton.SR5 org.springframework.boot spring-boot-starter-web org.projectlombok lombok true org.springframework.boot spring-boot-starter-test test org.springframework.cloud spring-cloud-stream-binder-kafka org.springframework.cloud spring-cloud-dependencies ${spring-cloud.version} pom import org.springframework.boot spring-boot-maven-plugin org.projectlombok lombok
注意:springboot和springcloud的版本不要太高,特别是springboot版本最好与本文案例一致,否则会出现启动失败情况
二、配置文件(application.yml)-【目前仅启动,下面会增加分组和分区配置】 1. 生产者(provide)server:
port: 7888
spring:
application:
name: producer
cloud:
stream:
kafka:
binder:
brokers: localhost:9092 #Kafka的消息中间件服务器
zk-nodes: localhost:2181 #Zookeeper的节点,如果集群,后面加,号分隔
auto-create-topics: true #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。
bindings:
stream-demo: #这里可以任意写,消费者应与之一致
destination: custom-message-topic #这里可以任意写,消费者应与之一致,消息发往的目的地
content-type: application/json #消息发送的格式,接收端不用指定格式,但是发送端要; 文本则为 text/plain
2. 消费者1(consume-1)
server:
port: 7890
spring:
application:
name: consumer_1
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
zk-nodes: localhost:2181
auto-create-topics: true
bindings:
stream-demo:
destination: custom-message-topic
content-type: application/json
3. 消费者2(consume-2)
server:
port: 7889
spring:
application:
name: consumer_1
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
zk-nodes: localhost:2181
auto-create-topics: true
bindings:
stream-demo:
destination: custom-message-topic
content-type: application/json
三、测试代码
1.生产者
StreamClient(必须要有):
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface StreamClient {
String STREAM_DEMO = "stream-demo";
@Output(StreamClient.STREAM_DEMO)
MessageChannel streamDateOut();
}
TestController(测试发送):
import com.example.demo.stream.StreamClient;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class TestController {
@Resource
private StreamClient streamClient;
@GetMapping("/produce")
public String produce() {
for (int i = 0; i < 100; i++) {
streamClient.streamDateOut().send(MessageBuilder.withPayload("aaaaaa" + i).build());
}
return "aaa";
}
}
2.消费者(消费者1和2代码一致)
StreamClient(必须要有,注意与生产者有区别):
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface StreamClient {
String STREAM_DEMO = "stream-demo";
@Input(StreamClient.STREAM_DEMO)
SubscribableChannel streamDateInput(); //SubscribableChannel与生产者处写的不一样,此处为接收信息
}
ReceiveData(接收信息):
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
@Slf4j
@EnableBinding(StreamClient.class)
public class ReceiveData {
@StreamListener(StreamClient.STREAM_DEMO)
public void consume(String message) {
log.info("接收消息:{}", message);
}
}
四、运行测试至此,基本收发已经配置完毕,接下来就是测试
1、调用接口http://localhost:7888/produce
2、后台接收到数据
五、分组(解决重复消费问题) 1、概念:多运行几遍,是否发现有问题:两个消费者都对消息进行了消费,那这样不就会产生重复消费吗?
组内只有1个实例消费。如果不设置group,则stream会自动为每个实例创建匿名且独立的group——于是每个实例都会参与消费。若不在一个消费组,则都会消费消息。
2、应用场景:3、小结:订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况,这时我们就可以使用Stream中的消息分组来解决。
注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。
不同组是可以全面消费的(重复消费),同一组内会发送竞争关系,只有其中一个可以消费。
若group相同,则组内只会有一个实例消费;若不同,则每个实例都消费同一个topic内容
4、实践测试生产者:不需要配置消费者1:
server:
port: 7890
kafka:
group: lyq
spring:
application:
name: consumer_1
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
zk-nodes: localhost:2181
auto-create-topics: true
bindings:
stream-demo:
destination: custom-message-topic
content-type: application/json
group: ${kafka.group}
消费者2:
server:
port: 7889
kafka:
group: lyq
spring:
application:
name: consumer_1
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
zk-nodes: localhost:2181
auto-create-topics: true
bindings:
stream-demo:
destination: custom-message-topic4
content-type: application/json
group: ${kafka.group}
六、结果
略.(自行测试)
多执行几次,会出现问题:重复消费是没有了,但是每一次都是同一个消费者消费
原因:启动消费者时是否注意到:
消费者1:
消费者2:
因为kafka默认其分区数量为1,而每个分区从属的消费实例最多仅能1个【具体关于kafka的分区自行百度】
生产者:
server:
port: 7888
kafka:
group: lyq
spring:
application:
name: producer
cloud:
stream:
kafka:
binder:
brokers: localhost:9092 #Kafka的消息中间件服务器
zk-nodes: localhost:2181 #Zookeeper的节点,如果集群,后面加,号分隔
auto-create-topics: true #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。
auto-add-partitions: true # 当partition-count设置的值超过原来设置的值,true=自动创建分区
bindings:
stream-demo: #这里用stream给我们提供的默认output,后面会讲到自定义output
destination: custom-message-topic4 #消息发往的目的地
content-type: application/json #消息发送的格式,接收端不用指定格式,但是发送端要; 文本则为 text/plain
producer:
# 分区的数量(默认为1)
partition-count: 3
消费者不需配置
八、配置参数启动运行即可,结果懒得演示,累了
Spring Cloud Stream配置参数详解



