- 下载与安装 Kafka
- 8.4.1 SpringCloud Sleuth 整合 Zipkin 实现分布式链路跟踪、收集
Maven 依赖
4.0.0 com.edcode.ecommerce edcode-study-scacommerce 1.0-SNAPSHOT jar org.springframework.boot spring-boot-starter-parent 2.3.1.RELEASE edcode-study-scacommerce Edcode SpringBoot org.springframework.boot spring-boot-starter-web org.projectlombok lombok true org.springframework.boot spring-boot-starter-actuator org.springframework.boot spring-boot-starter-test test org.springframework.cloud spring-cloud-context 2.2.6.RELEASE org.springframework.kafka spring-kafka 2.5.0.RELEASE org.apache.commons commons-lang3 3.11 ${artifactId} org.springframework.boot spring-boot-maven-plugin repackage
resources 配置文件 bootstrap.yml
spring:
profiles:
active: dev
application:
name: edcode-study-scacommerce
# 暴露端点
management:
endpoints:
web:
exposure:
include: '*'
endpoint:
health:
show-details: always
resources 配置文件 application-dev.yml
server:
port: 8001
servlet:
context-path: /edcode-study-scacommerce-dev
spring:
# SpringBoot 集成 Kafka 的配置, 最低配置只需要配置 spring.kafka.bootstrap-servers
kafka:
bootstrap-servers: 192.168.3.250:9092
# consumer:
# 如果 Consumer 没有指定 group-id, 则使用配置文件中配置的; 如果配置文件中也没有定义, 则由框架随机生成
# group-id: imooc-study-ecommerce
# auto-offset-reset: latest
# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# producer:
# key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value-serializer: org.apache.kafka.common.serialization.StringSerializer
通过代码自定义 Kafka 配置
package com.edcode.ecommerce.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory producerFactory() {
Map configs = new HashMap<>(16);
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configs);
}
@Bean
public KafkaTemplate kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConsumerFactory consumerFactory() {
Map props = new HashMap<>(16);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
// 并发数就是一个消费者实例起几个线程
factory.setConcurrency(3);
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
13.2.1.1 kafka 生产者如果复杂配置使用代码更直观,如果简单配置使用 yaml 更方便
package com.edcode.ecommerce.kafka;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate kafkaTemplate;
public void sendMessage(String key, String value, String topic) {
if (StringUtils.isBlank(value) || StringUtils.isBlank(topic)) {
throw new IllegalArgumentException("value 或 topic为null或空");
}
ListenableFuture> future = StringUtils.isBlank(key) ?
kafkaTemplate.send(topic, value) : kafkaTemplate.send(topic, key, value);
// 异步回调的方式获取通知
future.addCallback(
success -> {
assert null != success && null != success.getRecordmetadata();
// 发送到 kafka 的 topic
String _topic = success.getRecordmetadata().topic();
// 消息发送到的分区
int partition = success.getRecordmetadata().partition();
// 消息在分区内的 offset
long offset = success.getRecordmetadata().offset();
log.info("发送 kafka 信息成功: [{}], [{}], [{}]",
_topic, partition, offset);
}, failure -> {
log.error("发送 kafka 信息失败: [{}], [{}], [{}]",
key, value, topic);
}
);
// 同步等待的方式获取通知
try {
// SendResult sendResult = future.get();
SendResult sendResult = future.get(5, TimeUnit.SECONDS);
// 发送到 kafka 的 topic
String _topic = sendResult.getRecordmetadata().topic();
// 消息发送到的分区
int partition = sendResult.getRecordmetadata().partition();
// 消息在分区内的 offset
long offset = sendResult.getRecordmetadata().offset();
log.info("发送 kafka 信息成功: [{}], [{}], [{}]",
_topic, partition, offset);
} catch (Exception ex) {
log.error("发送 kafka 信息失败: [{}], [{}], [{}]",
key, value, topic);
}
}
}
13.3.1.1 kafka 消费者
通过 Kafka 传递的消息对象VO
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageVo {
private Integer id;
private String projectName;
}
Kafka 消费者
package com.edcode.ecommerce.kafka;
import com.edcode.ecommerce.vo.MessageVo;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.RequiredArgsConstructor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaConsumer {
private final ObjectMapper mapper;
@KafkaListener(topics = { "edcode-springboot" }, groupId = "edcode-springboot-kafka")
public void listener01(ConsumerRecord consumerRecord) throws JsonProcessingException {
String key = consumerRecord.key();
String value = consumerRecord.value();
MessageVo kafkaMessage = mapper.readValue(value, MessageVo.class);
log.info("监听器01 消费 kafka 信息: [{}], [{}]", key, mapper.writevalueAsString(kafkaMessage));
}
@KafkaListener(topics = { "edcode-springboot" }, groupId = "edcode-springboot-kafka-1")
public void listener02(ConsumerRecord, ?> consumerRecord) throws JsonProcessingException {
String key = (String) consumerRecord.key();
Optional> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
MessageVo messageVo = mapper.readValue(message.toString(), MessageVo.class);
log.info("监听器02 消费 kafka 信息: [{}], [{}]", key, mapper.writevalueAsString(messageVo));
}
}
}
SpringBoot 集成 kafka 发送消息
package com.edcode.ecommerce.controller;
import com.edcode.ecommerce.kafka.KafkaProducer;
import com.edcode.ecommerce.vo.MessageVo;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
@RequestMapping("/kafka")
@RequiredArgsConstructor
public class KafkaController {
private final ObjectMapper mapper;
private final KafkaProducer kafkaProducer;
@GetMapping("/send-message")
public void sendMessage(@RequestParam(required = false) String key,
@RequestParam String topic) throws Exception {
MessageVo message = new MessageVo(
1,
"EdCode-Study-ScaCommerce"
);
kafkaProducer.sendMessage(key, mapper.writevalueAsString(message), topic);
}
}
kafka-controller.http
### kafka-send-message GET http://127.0.0.1:8001/edcode-study-scacommerce-dev/kafka/send-message?key=edcode&topic=edcode-springboot Content-Type: application/json ### kafka-send-message GET http://127.0.0.1:8001/edcode-study-scacommerce-dev/kafka/send-message?topic=edcode-springboot Content-Type: application/json ###
日志打印
2021-12-14 11:52:46.908 INFO 20556 --- [nio-8001-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2021-12-14 11:52:46.908 INFO 20556 --- [nio-8001-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2021-12-14 11:52:46.908 INFO 20556 --- [nio-8001-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1639453966907
2021-12-14 11:52:46.934 INFO 20556 --- [ad | producer-1] org.apache.kafka.clients.metadata : [Producer clientId=producer-1] Cluster ID: gliuLj3bTNSg2ht1Cn32Dg
2021-12-14 11:52:46.973 INFO 20556 --- [ad | producer-1] c.edcode.ecommerce.kafka.KafkaProducer : 发送 kafka 信息成功: [edcode-springboot], [0], [1]
2021-12-14 11:52:46.973 INFO 20556 --- [nio-8001-exec-2] c.edcode.ecommerce.kafka.KafkaProducer : 发送 kafka 信息成功: [edcode-springboot], [0], [1]
2021-12-14 11:52:47.005 INFO 20556 --- [ntainer#0-0-C-1] c.edcode.ecommerce.kafka.KafkaConsumer : 监听器02 消费 kafka 信息: [edcode], [{"id":1,"projectName":"EdCode-Study-ScaCommerce"}]
2021-12-14 11:52:47.005 INFO 20556 --- [ntainer#1-0-C-1] c.edcode.ecommerce.kafka.KafkaConsumer : 监听器01 消费 kafka 信息: [edcode], [{"id":1,"projectName":"EdCode-Study-ScaCommerce"}]
13.4.1 SpringBoot集成RocketMQ构建消息驱动微服务
- 下载与安装RocketMQ
- 下载 RocketMQ:https://dlcdn.apache.org/rocketmq/4.9.2/rocketmq-all-4.9.2-source-release.zip
- RocketMQ 快速开始:https://rocketmq.apache.org/docs/quick-start/
- 下载以 bin-release 结尾的 zip 包解压即可完成安装
构建二进制
unzip rocketmq-all-4.9.2-source-release.zip cd rocketmq-all-4.9.2/ mvn -Prelease-all -DskipTests clean install -U # cd distribution/target/rocketmq-4.9.2/rocketmq-4.9.2 ln -s /opt/rocketmq-all-4.9.2/distribution/target/rocketmq-4.9.2/rocketmq-4.9.2 /opt/rocketmq-4.9.2 cd rocketmq-4.9.2/
启动 NameServer
nohup sh bin/mqnamesrv >/dev/null 2>&1 &
runbroker.sh 和 runserver.sh 可以 jvm 参数,默认内存比较大,个人测试可以适当调正
启动 Broker
nohup sh bin/mqbroker -n 192.168.3.250:9876 >/dev/null 2>&1 &
测试发送
export NAMESRV_ADDR=192.168.3.250:9876 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
关闭
[root@zk1 rocketmq-4.9.2]# sh bin/mqshutdown broker The mqbroker(10613) is running... Send shutdown request to mqbroker(10613) OK [root@zk1 rocketmq-4.9.2]# sh bin/mqshutdown namesrv The mqnamesrv(10271) is running... Send shutdown request to mqnamesrv(10271) OK13.4.1.2 通过 RocketMQ 生产者
package com.edcode.ecommerce.rocketmq;
import com.alibaba.fastjson.JSON;
import com.edcode.ecommerce.vo.MessageVo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class RocketMQProducer {
private static final String TOPIC = "edcode-study-rocketmq";
private final RocketMQTemplate rocketMQTemplate;
public void sendMessageWithValue(String value) {
// 随机选择一个 Topic 的 Message Queue 发送消息
SendResult sendResult = rocketMQTemplate.syncSend(TOPIC, value);
log.info("sendMessageWithValue result: [{}]", JSON.toJSONString(sendResult));
SendResult sendResultOrderly = rocketMQTemplate.syncSendOrderly(TOPIC, value, "Eddie");
log.info("sendMessageWithValue orderly result: [{}]", JSON.toJSONString(sendResultOrderly));
}
public void sendMessageWithKey(String key, String value) {
Message message = MessageBuilder.withPayload(value).setHeader(RocketMQHeaders.KEYS, key).build();
// 异步发送消息, 并设定回调
rocketMQTemplate.asyncSend(TOPIC, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("sendMessageWithKey success result: [{}]", JSON.toJSONString(sendResult));
}
@Override
public void onException(Throwable e) {
log.error("sendMessageWithKey failure: [{}]", e.getMessage(), e);
}
});
}
public void sendMessageWithTag(String tag, String value) {
MessageVo qinyiMessage = JSON.parseObject(value, MessageVo.class);
SendResult sendResult = rocketMQTemplate.syncSend(String.format("%s:%s", TOPIC, tag), qinyiMessage);
log.info("sendMessageWithTag result: [{}]", JSON.toJSONString(sendResult));
}
public void sendMessageWithAll(String key, String tag, String value) {
Message message = MessageBuilder.withPayload(value).setHeader(RocketMQHeaders.KEYS, key).build();
SendResult sendResult = rocketMQTemplate.syncSend(String.format("%s:%s", TOPIC, tag), message);
log.info("sendMessageWithAll result: [{}]", JSON.toJSONString(sendResult));
}
}
13.4.1.3 RocketMQ 消费者方式汇总
使用同步的方式发送消息, 不指定 key 和 tag
@Slf4j
@Component
@RocketMQMessageListener(
topic = "edcode-study-rocketmq",
consumerGroup = "edcode-springboot-rocketmq-string"
)
public class RocketMQConsumerString implements RocketMQListener {
@Override
public void onMessage(String message) {
MessageVo rocketMessage = JSON.parseObject(message, MessageVo.class);
log.info("consume message in RocketMQConsumerString: [{}]",
JSON.toJSONString(rocketMessage));
}
}
指定了消费带有 tag 的消息
@Slf4j
@Component
@RocketMQMessageListener(
topic = "edcode-study-rocketmq",
consumerGroup = "edcode-springboot-rocketmq-tag-string",
selectorexpression = "edcode" // 根据 tag 过滤
)
public class RocketMQConsumerTagString implements RocketMQListener {
@Override
public void onMessage(String message) {
MessageVo rocketMessage = JSON.parseObject(message, MessageVo.class);
log.info("consume message in RocketMQConsumerTagString: [{}]",
JSON.toJSONString(rocketMessage));
}
}
指定消费带有 tag 的消息, 且消费的是 Java Pojo
@Slf4j
@Component
@RocketMQMessageListener(
topic = "edcode-study-rocketmq",
consumerGroup = "edcode-springboot-rocketmq-tag-object",
selectorexpression = "edcode" // 根据 tag 做过滤
)
public class RocketMQConsumerObject implements RocketMQListener {
@Override
public void onMessage(MessageVo message) {
log.info("consume message in RocketMQConsumerObject: [{}]",
JSON.toJSONString(message));
// so something
}
}
扩展 MessageExt, 可以获取 Keys 之类字段
@Slf4j
@Component
@RocketMQMessageListener(
topic = "edcode-study-rocketmq",
consumerGroup = "edcode-springboot-rocketmq-message-ext"
)
public class RocketMQConsumerMessageExt implements RocketMQListener {
@Override
public void onMessage(MessageExt message) {
String value = new String(message.getBody());
log.info("consume message in RocketMQConsumerMessageExt: [{}], [{}]",
message.getKeys(), value);
log.info("MessageExt: [{}]", JSON.toJSONString(message)); // 会慢一些
}
}
13.6.1.1 RocketMQ Producer Api 发送消息
@Slf4j
@RestController
@RequestMapping("/rocket-mq")
@RequiredArgsConstructor
public class RocketMQController {
private static final MessageVo RocketMQMessage = new MessageVo(1, "Edcode-Study-RocketMQ-In-SpringBoot");
private final RocketMQProducer rocketMQProducer;
@GetMapping("/message-with-value")
public void sendMessageWithValue() {
rocketMQProducer.sendMessageWithValue(JSON.toJSONString(RocketMQMessage));
}
@GetMapping("/message-with-key")
public void sendMessageWithKey() {
rocketMQProducer.sendMessageWithKey("Edcode", JSON.toJSONString(RocketMQMessage));
}
@GetMapping("/message-with-tag")
public void sendMessageWithTag() {
rocketMQProducer.sendMessageWithTag("edcode", JSON.toJSONString(RocketMQMessage));
}
@GetMapping("/message-with-all")
public void sendMessageWithAll() {
rocketMQProducer.sendMessageWithAll("Edcode", "edcode", JSON.toJSONString(RocketMQMessage));
}
}
13.7.1.1 rocket-mq-controller.http
### message-with-value GET http://127.0.0.1:8001/edcode-study-scacommerce-dev/rocket-mq/message-with-value Content-Type: application/json ### message-with-key GET http://127.0.0.1:8001/edcode-study-scacommerce-dev/rocket-mq/message-with-key Content-Type: application/json ### message-with-tag GET http://127.0.0.1:8001/edcode-study-scacommerce-dev/rocket-mq/message-with-tag Content-Type: application/json ### message-with-all GET http://127.0.0.1:8001/edcode-study-scacommerce-dev/rocket-mq/message-with-all Content-Type: application/json ###13.8.1 SpringCloud Stream 消息驱动组件概览
- 为什么要有 SpringCloud Stream
- 如果没有 SpringCloud Stream ,那么我们怎么玩消息驱动?
- SpringCloud Stream 应用模型
- SpringCloud Stream 中的核心概念
- 负责与中间件交互的抽象绑定器:Binder
- 发送消息与接收消息的应用通信信道:Input、Output
- SpringCloud Stream 中的核心概念
- SpringCloud Stream 应用模型
- 经典的 SpringCloud Stream 发布-订阅模型
- Topic 可以认为就是 Kafka 中的 Topic 概念
- Producer 通过 Input 信道发布消息到 Topic 上
- Consumer 通过 Output 信道消费 Topic 上的消息
- 经典的 SpringCloud Stream 发布-订阅模型
Maven 依赖
sca-commerce com.edcode.commerce 1.0-SNAPSHOT 4.0.0 sca-commerce-stream-client 1.0-SNAPSHOT jar sca-commerce-stream-client Stream Client com.alibaba.cloud spring-cloud-starter-alibaba-nacos-discovery com.edcode.commerce sca-commerce-mvc-config 1.0-SNAPSHOT org.springframework.cloud spring-cloud-starter-zipkin org.springframework.kafka spring-kafka 2.5.0.RELEASE org.springframework.cloud spring-cloud-stream org.springframework.cloud spring-cloud-stream-binder-kafka ${artifactId} org.springframework.boot spring-boot-maven-plugin repackage
bootstrap.yml
server:
port: 8006
servlet:
context-path: /scacommerce-stream-client
spring:
application:
name: sca-commerce-stream-service
cloud:
nacos:
# 服务注册发现
discovery:
enabled: true # 如果不想使用 Nacos 进行服务注册和发现, 设置为 false 即可
#server-addr: ${NACOS_ADDR:127.0.0.1}:8848
server-addr: ${NACOS_ADDR_1:127.0.0.1}:8848,${NACOS_ADDR_2:127.0.0.1}:8858,${NACOS_ADDR_3:127.0.0.1}:8868 # Nacos 服务器地址
namespace: ${NAMESPACE_ID:1adcfdd8-5763-4768-9a15-9c7157988950}
metadata:
management:
context-path: ${server.servlet.context-path}/actuator
stream:
# SpringCloud Stream + RocketMQ
# rocketmq:
# binder:
# name-server: ${ROCKETMQ_SERVER:127.0.0.1}:${ROCKETMQ_PORT:9876}
kafka:
binder:
brokers: ${KAFKA_SERVER:127.0.0.1}:${KAFKA_PORT:9092}
auto-create-topics: true # 如果设置为false, 就不会自动创建Topic, 你在使用之前需要手动创建好, 生产环境建议 false
bindings:
# 默认发送方
output: # 这里用 Stream 给我们提供的默认 output 信道
destination: scacommerce-stream-client-default # 消息发往的目的地, Kafka 中就是 Topic
content-type: text/plain # 消息发送的格式, 接收端不用指定格式, 但是发送端要
# 默认接收方 (按道理是不同的项目工程 input 与 output)
input: # 这里用 Stream 给我们提供的默认 input 信道
destination: scacommerce-stream-client-default
# Edcode 发送方
edcodeOutput:
destination: scacommerce-stream-client-edcode
content-type: text/plain
# Edcode 接收方
edcodeInput:
destination: scacommerce-stream-client-edcode
kafka:
bootstrap-servers: ${KAFKA_SERVER:127.0.0.1}:${KAFKA_PORT:9092}
producer:
retries: 3
consumer:
auto-offset-reset: latest
zipkin:
sender:
type: ${ZIPKIN_KAFKA_SENDER:web} # 默认是 web
base-url: http://${ZIPKIN_URL:localhost}:${ZIPKIN_PORT:9411}/
sleuth:
sampler:
# RateLimitingSampler 抽样策略,设置了限速采样,spring.sleuth.sampler.probability 属性值无效
rate: 100 # 每秒间隔接受的 trace 量
# Probability 抽样策略
probability: 1.0 # 采样比例,1.0 表示 100%, 默认:0.1
redis:
database: 0
host: ${REDIS_HOST:localhost}
port: ${REDIS_PORT:6379}
timeout: 5000
# 暴露端点
management:
endpoints:
web:
exposure:
include: '*'
endpoint:
health:
show-details: always
创建 SpringBoot 启动类
@EnableDiscoveryClient
@SpringBootApplication
public class StreamClientApplication {
public static void main(String[] args) {
SpringApplication.run(StreamClientApplication.class, args);
}
}
消息传递对象: SpringCloud Stream + Kafka/RocketMQ
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageVo {
private Integer id;
private String projectName;
private String org;
private String author;
private String version;
public static MessageVo defaultMessage() {
return new MessageVo(
1,
"sca-commerce-stream-client",
"blog.eddilee.cn",
"Eddie",
"1.0"
);
}
}
使用默认的通信信道实现消息的发送
package com.edcode.commerce.stream;
import com.alibaba.fastjson.JSON;
import com.edcode.commerce.vo.MessageVo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
@Slf4j
@EnableBinding(Source.class)
@RequiredArgsConstructor
public class DefaultSendService {
private final Source source;
public void sendMessage(MessageVo message) {
String jsonString = JSON.toJSONString(message);
log.info("在 DefaultSendService 中发送消息: [{}]", jsonString);
// Spring Messaging, 统一消息的编程模型, 是 Stream 组件的重要组成部分之一
source.output().send(MessageBuilder.withPayload(jsonString).build());
}
}
使用默认的信道实现消息的接收
package com.edcode.commerce.stream;
import com.alibaba.fastjson.JSON;
import com.edcode.commerce.vo.MessageVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
@Slf4j
@EnableBinding(Sink.class)
public class DefaultReceiveService {
@StreamListener(Sink.INPUT)
public void receiveMessage(Object payload) {
log.info("在 DefaultReceiveService 消费消息中启动 ");
MessageVo edcodeMessage = JSON.parseObject(payload.toString(), MessageVo.class);
// 消费消息
log.info("在 DefaultReceiveService 中使用消息成功: [{}]", JSON.toJSONString(edcodeMessage));
}
}
13.10.1 自定义Stream消息通信信道实现定制分发
13.10.1.1 构建消息驱动
@Slf4j
@RestController
@RequestMapping("/message")
@RequiredArgsConstructor
public class MessageController {
private final DefaultSendService defaultSendService;
private final EdcodeSendService edcodeSendService;
@GetMapping("/default")
public void defaultSend() {
defaultSendService.sendMessage(MessageVo.defaultMessage());
}
@GetMapping("/edcode")
public void qinyiSend() {
edcodeSendService.sendMessage(MessageVo.defaultMessage());
}
}
13.10.1.2 message.http
### 发送默认信道消息 GET http://127.0.0.1:8006/scacommerce-stream-client/message/default Content-Type: application/json ### 发送自定义信道消息 GET http://127.0.0.1:8006/scacommerce-stream-client/message/edcode Content-Type: application/json ###13.11.1 SpringCloud Stream消息分组和消费分区的配置与说明 13.11.1.1 SpringCloud Stream消息分组
- SpringCloud Stream消息分组模型
- 应用的不同实例放在一个消费者组中,每一条消息只会被一个实例消费
- 消费者组的思想是通过多实例扩展服务吞吐量,且不会造成消息的重覆消费
- SpringCloud Stream消息分区
- 消费分区的作用就是为了确保距又共同特性标识的数据由同一个消费者实例进行处理
本章完整配置文件 bootstrap.yml
server:
port: 8006
servlet:
context-path: /scacommerce-stream-client
spring:
application:
name: sca-commerce-stream-service
cloud:
nacos:
# 服务注册发现
discovery:
enabled: true # 如果不想使用 Nacos 进行服务注册和发现, 设置为 false 即可
#server-addr: ${NACOS_ADDR:127.0.0.1}:8848
server-addr: ${NACOS_ADDR_1:127.0.0.1}:8848,${NACOS_ADDR_2:127.0.0.1}:8858,${NACOS_ADDR_3:127.0.0.1}:8868 # Nacos 服务器地址
namespace: ${NAMESPACE_ID:1adcfdd8-5763-4768-9a15-9c7157988950}
metadata:
management:
context-path: ${server.servlet.context-path}/actuator
stream:
# SpringCloud Stream + RocketMQ
# rocketmq:
# binder:
# name-server: ${ROCKETMQ_SERVER:127.0.0.1}:${ROCKETMQ_PORT:9876}
# 开启 stream 分区支持
instanceCount: 1 # 消费者的总数
instanceIndex: 0 # 当前消费者的索引
kafka:
binder:
brokers: ${KAFKA_SERVER:127.0.0.1}:${KAFKA_PORT:9092}
auto-create-topics: true # 如果设置为false, 就不会自动创建Topic, 你在使用之前需要手动创建好, 生产环境建议 false
bindings:
# 默认发送方
output: # 这里用 Stream 给我们提供的默认 output 信道
destination: scacommerce-stream-client-default # 消息发往的目的地, Kafka 中就是 Topic
content-type: text/plain # 消息发送的格式, 接收端不用指定格式, 但是发送端要
# 消息分区
producer:
# partitionKeyexpression: payload.author # 分区关键字, payload 指的是发送的对象, author 是对象中的属性
partitionCount: 1 # 分区大小
# 使用自定义的分区策略, 注释掉 partitionKeyexpression
partitionKeyExtractorName: edcodePartitionKeyExtractorStrategy # com.edcode.commerce.partition.EdcodePartitionKeyExtractorStrategy
partitionSelectorName: edcodePartitionSelectorStrategy # com.edcode.commerce.partition.EdcodePartitionSelectorStrategy
# 默认接收方 (按道理是不同的项目工程 input 与 output)
input: # 这里用 Stream 给我们提供的默认 input 信道
destination: scacommerce-stream-client-default
# 消息分组
group: sca-commerce-edcode-default
# 消费者开启分区支持
consumer:
partitioned: true
# Edcode 发送方
edcodeOutput:
destination: scacommerce-stream-client-edcode
content-type: text/plain
# Edcode 接收方
edcodeInput:
destination: scacommerce-stream-client-edcode
# 消息分组
group: sca-commerce-edcode-edcode
kafka:
bootstrap-servers: ${KAFKA_SERVER:127.0.0.1}:${KAFKA_PORT:9092}
producer:
retries: 3
consumer:
auto-offset-reset: latest
zipkin:
sender:
type: ${ZIPKIN_KAFKA_SENDER:web} # 默认是 web
base-url: http://${ZIPKIN_URL:localhost}:${ZIPKIN_PORT:9411}/
sleuth:
sampler:
# RateLimitingSampler 抽样策略,设置了限速采样,spring.sleuth.sampler.probability 属性值无效
rate: 100 # 每秒间隔接受的 trace 量
# Probability 抽样策略
probability: 1.0 # 采样比例,1.0 表示 100%, 默认:0.1
redis:
database: 0
host: ${REDIS_HOST:localhost}
port: ${REDIS_PORT:6379}
timeout: 5000
# 暴露端点
management:
endpoints:
web:
exposure:
include: '*'
endpoint:
health:
show-details: always
分区部分
自定义从 Message 中提取 partition key 的策略
@Slf4j
@Component
public class EdcodePartitionKeyExtractorStrategy implements PartitionKeyExtractorStrategy {
@Override
public Object extractKey(Message> message) {
MessageVo messageVo = JSON.parseObject(message.getPayload().toString(), MessageVo.class);
// 自定义提取 key
String key = messageVo.getProjectName();
log.info("SpringCloud Stream EdCode Partition Key: [{}]", key);
return key;
}
}
决定 message 发送到哪个分区的策略
@Slf4j
@Component
public class EdcodePartitionSelectorStrategy implements PartitionSelectorStrategy {
@Override
public int selectPartition(Object key, int partitionCount) {
int partition = key.toString().hashCode() % partitionCount;
log.info("SpringCloud Stream EdCode Selector info: [{}], [{}], [{}]", key.toString(), partitionCount, partition);
return partition;
}
}



