微服务架构 基础(七) 持续更新…
消息驱动继续前面的基础六工程进行扩展
什么是消息驱动?
Spring Cloud Stream消息驱动可以降低开发人员对消息中间件的使用复杂度,让系统开发人员更多尽力专注与核心业务逻辑的开发。Spring Cloud Stream基于SpringBoot实现,自动配置化的功能可以帮助我们快速上手学习,利用消息驱动,可以平滑地切换多种不同的消息中间件(如RabbitMQ、Kafka),屏蔽底层消息中间件的差异,降低学习成本,统一消息的编程模型。
什么是Spring Cloud Stream?
官方定义的Spring Cloud Stream是一个构建消息驱动服务的微服务框架。
应用程序通过Inputs或者通过outputs来与Spring Cloud Stream中的builder对象交互。
我们只要通过配置来binding(绑定),而Spring Cloud Stream 的binder对象则负责与消息中间件交互。所以,我们只需要掌握如何与Spring Cloud Stream交互就可以方便地使用消息驱动中间件。通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream为一些供应商的消息中间件提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
- Middleware:消息中间件,目前只支持RabbitMQ和Kafka
- Binder:Binder是应用与消息中间件的封装,目前实现了RabbitMQ和Kafka的Binder,通过Binder可以很方便地连接中间件,可以动态地改变消息类型(对应与RabbitMQ地exchange以及Kafka的topic),这些都可以通过配置文件来实现
- @Input:注解标识为输入通道,通过该输入通道接收到的消息进入应用程序
- @Output:注解标识为输出通道,发布的的消息通过该通道离开应用程序
- @StreamListener:监听队列,用于消费者的队列的消息接收
- @EnableBinding:指启用信道channel和Binder绑定
测试学习需要,新建三个子模块,分别为rabbitmq-stream-provider-9901消息生产者、rabbitmq-stream-consumer-9902消息消费者以及rabbitmq-stream-consumer-9903消息消费者,当前工程结构如下:
消息生产者引入主要依赖:
org.springframework.cloud spring-cloud-starter-netflix-eureka-client org.springframework.cloud spring-cloud-starter-stream-rabbit org.springframework.boot spring-boot-starter-actuator org.springframework.boot spring-boot-starter-web org.projectlombok lombok org.springframework.boot spring-boot-starter-test test org.junit.vintage junit-vintage-engine
添加application.yml文件:
server:
port: 9901
spring:
application:
name: rabbitmq-stream-provider
cloud:
stream:
binders: # 配置绑定的消息中间件服务信息
defaultRabbit: # 定义实例对象名称
type: rabbit # 消息中间组件的类型
environment: # 设置RabbitMQ相关的环境设置
spring:
rabbitmq:
host: 192.168.50.248
port: 5672
username: root
password: root
virtual-host: /
bindings: # 服务的整合处理
output: # 定义该模块为生产者模块
destination: Exchange # 定义交换器名称
content-type: application/json # 设置消息类型,文本可以设置为text/plain
binder:
- ${spring.cloud.stream.binders.defaultRabbit} # 绑定实例对象
rabbitmq:
host: 192.168.50.248
port: 5672
username: root
password: root
virtual-host: /
eureka:
client:
register-with-eureka: true
fetch-registry: true
service-url:
defaultZone: http://eureka7001.cn:7001/eureka,http://eureka7002.cn:7002/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳时间为2秒
lease-expiration-duration-in-seconds: 20 # 生存时间设置为20秒,默认为30秒
业务层:
消息发送接口
package cn.wu.service;
public interface IMessageProvider {
public void send(String message);
}
消息接口实现类
package cn.wu.service.impl;
import cn.wu.service.IMessageProvider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Slf4j
@Service("messageProviderBean")
@EnableBinding(Source.class) // 这里的Source是上图消息生产者的中的Source,即定义消息推送推送管道
public class MessageProvider implements IMessageProvider {
@Resource
// 这里有坑:一定需要是output名称的MessageChannel对象,因为IoC有三个同类型的MessageChannel,否则无法确认注入对象
private MessageChannel output; // 消息发送管道
@Override
public void send(String message) {
output.send(MessageBuilder.withPayload(message).build());
log.info("构建Message对象完成… ");
}
}
控制层:
package cn.wu.controller;
import cn.wu.service.IMessageProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageController {
private IMessageProvider messageProvider;
@Autowired
@Qualifier("messageProviderBean")
public void setMessageProvider(IMessageProvider messageProvider) {
this.messageProvider = messageProvider;
}
@GetMapping("/send/{message}")
public String sendMessage(@PathVariable("message") String message){
try{
messageProvider.send(message);
return "发送成功!";
}catch(Exception e){
return "发送失败!";
}
}
}
RabbitMQ连接情况:
消息生产者测试:
消息消费者两个消息消费者依赖配置文件同上
添加application.yml文件:
server:
port: 9902/9903
spring:
application:
name: rabbitmq-stream-provider
cloud:
stream:
binders: # 配置绑定的消息中间件服务信息
defaultRabbit: # 定义实例对象名称
type: rabbit # 消息中间组件的类型
environment: # 设置RabbitMQ相关的环境设置
spring:
rabbitmq:
host: 192.168.50.248
port: 5672
username: root
password: root
virtual-host: /
bindings: # 服务的整合处理
input: # 定义该模块为生产者模块
destination: Exchange # 定义交换器名称
content-type: application/json # 设置消息类型,文本可以设置为text/plain
binder:
- ${spring.cloud.stream.binders.defaultRabbit} # 绑定实例对象
eureka:
client:
register-with-eureka: true
fetch-registry: true
service-url:
defaultZone: http://eureka7001.cn:7001/eureka,http://eureka7002.cn:7002/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳时间为2秒
lease-expiration-duration-in-seconds: 20 # 生存时间设置为20秒,默认为30秒
添加控制层:
package cn.wu.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@EnableBinding(Sink.class) // 这里是上面图中的Sink
public class ConsumerController {
@Value("${server.port}")
private String port;
@StreamListener(Sink.INPUT)
public void receive(Message message){
log.info("消息接收者II 已经接收到的消息为: " + message.getPayload()+",当前端口为: "+port);
}
}
测试结果:
消息生产者通过一次URL请求,两个消息接收者均接收到消息
从以上的结果可以发现一个问题,就是存在重复消费,即生产者每生产一个消息,消费者都会进行消费,这种结果可能会造成数据错误
至于出现以上情况的原因如下:
由于同时出现了两个队列(group),生产者每次生产的消息都放入这两个队列中,因此当消费者接收消息时,都能接收到,也即是说,只有处于同一队列中的消费者才具有竞争关系…
设置为不同分组很简单,只需要添加group配置信息即可
bindings: # 服务的整合处理
input: # 定义该模块为生产者模块
group: consumerB # 组名(实际为队列名称)
重启,如下结果:
此时,如果想要竞争关系,都设置为相同组即可:
bindings: # 服务的整合处理
input: # 定义该模块为生产者模块
group: consumer # 组名(实际为队列名称)
分布式请求链路跟踪group属性除了解决重复消费的问题,还解决了持久化的问题,即当消息生产者生产了消息后,但是消费者侧由于某些原因关机,此时对于具有分组属性的消费者重新启动依然能接收到关机前消息生产者生产的消息,但是对于没有分组属性的消费者,则不能接收到生产者生产的消息而造成消息丢失。
在微服务框架中,一个由客户端发起的请求在后端系统中会经过多个不同的服务节点调用来协同产生最后的请求结果,每一个前段请求都会形成一条复杂的分布式服务调用链路,链路中的任何一环出现高延时或错误都会引起整个请求最后的失败…
zipkin搭建Spring Cloud Sleuth提供了一套完整的服务跟踪的解决方案
什么是zipkin?
Zipkin是一个分布式跟踪系统。它有助于收集用于解决微服务架构中的延迟问题所需的时序数据。以解决微服务框架中的延迟问题,包括数据的收集,存储,查找和展现。
这里可以下载相应的jar包
然后编译jar包启动…
浏览器打开相应网址:
服务提供者(micro-service-8001)为了方便测试学习,对以前的两个子模块(服务提供者micro-service-8001和未使用OpenFeigin的服务消费者main-service-80)进行修改
再次添加依赖:
org.springframework.cloud spring-cloud-starter-zipkin
此时可以发现zipkin中已经包含了sleuth依赖
添加application.yml文件的配置信息:
spring:
zipkin:
base-url: http://localhost:9411 # zipkin管理界面
sleuth:
sampler:
probability: 1 # 采样率介于0到1之间,1表示全部采样
修改控制层:
@GetMapping("/test/zipkin")
public String testZipkin(){
return "测试zipkin… ";
}
服务消费者(main-service-80)
同样地引入对应的依赖:
org.springframework.cloud spring-cloud-starter-zipkin
同样地添加配置信息:
spring:
zipkin:
base-url: http://localhost:9411 # zipkin管理界面
sleuth:
sampler:
probability: 1 # 采样率介于0到1之间,1表示全部采样
修改控制层:
@GetMapping("/test/zipkin")
public String testZipkin(){
return restTemplate.getForObject(URL1+"/test/zipkin",String.class);
}
依次启动两个Eureka服务端,服务提供者(micro-service-8001),服务消费者(main-service-80),测试结果为:
点击SHOW,可以查看更多详细信息:



