第四章 `RabbitMQ`与`Spring`整合
4.1-`AMQP`核心组件4.2-`SprigAMQP`管理组件`RabbitAdmin`应用4.3-`SprigAMQP-RabbitMQ`声明式配置使用4.4-`SpringAMQP`消息模板组件-`RabbitTemplate`4.5-`SpringAMQP`简单消息监听容器-`SimpleMessageListenerContainer`4.6-`SpringAMQP`消息监听适配器-`MessageListenerAdapter`4.7-`SpringAMPQ`消息转换器-`MessageConverter`4.8-`RabbitMQ`与`SpringBoot2.0`整合
生产端消费端 4.9-`RabbitMQ`与`SpringCloud Stream`整合
4.1-AMQP核心组件RabbitAdmin //管控组件SpringAMQP声明 //声明交换机、绑定、队列等操作,使用@Bean注解,直接加入IOCRabbitTemplate //消息模板SimpleMessageListenerContainer //简单消息监听容器,可以在消费者端进行详细配置MessageListenerAdapter //消息监听适配器,适用于添加完监听器之后MessageConverter // 消息转换器,比如消息序列号和反序列化 4.2-SprigAMQP管理组件RabbitAdmin应用
作用:
RabbitAdmin类可以很好的操作RabbitMQ,比如增加删除绑定交换机、队列,在Spring中直接进行注入即可使用
//ConnectionFactory 必须是 org.springframework.amqp.rabbit.connection.ConnectionFactory;
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
注意:autoStartup必须要设置为true,否则Spring容器不会加载RabbitAdmin类
RabbitAdmin底层实现就是从Spring容器中获取Exchange、Bingding、RoutingKey、Queue的@Bean声明,然后使用RabbitTemplate的execute方法执行对应的声明、修改、删除等一系列RabbitMq基础操作
代码演示
引入依赖
com.rabbitmq amqp-client RELEASE org.springframework.boot spring-boot-starter-amqp 2.3.2.RELEASE
配置Bean
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
//引入的是spring.amqp下的connectionFactory,而不是rabbit.client包下的
@Configuration
@ComponentScan({"com.cy.amqp"})
public class RabbitMQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("127.0.0.1:5672");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
编写测试类
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
@Autowired
private RabbitAdmin rabbitAdmin;
@Test
public void testRabbitAdmin(){
rabbitAdmin.declareExchange(new DirectExchange("test.direct", false,false ));
rabbitAdmin.declareQueue(new Queue("test.direct.queue",false ));
rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, "test.direct", "direct", null));
//BindingBuilder 链式操作,但在操作中,如果没有事先声明创建交换机和队列,会报错提示找不到
rabbitAdmin.declareBinding(
BindingBuilder
.bind(new Queue("test.direct.queue", false))
.to(new DirectExchange("test.direct", false, false))
.with("direct")
);
rabbitAdmin.purgeQueue("test.direct.queue");
}
}
RabbitAdmin源码
实现了InitializingBean接口,表明在Bean配置加载完后再加载RabbitAdmin配置。找到afterPropertiesSet()方法中最要的initialize()初始化方法。可以看到Exchange、Queue、Binding都是从Spring容器中获取三种类型,加载到上方定义的contextExchanges、contextQueues、contextBindings三种容器中。 后续的源码中,也可以看出通过筛选Spring容器中RabbitMQ的信息之后,再去建立RabbitMQ服务器的连接。主要通过Spring以@Bean的方式,将配置加载到Spring容器之后,再从容器中获取相关信息,再去建立连接。
在Rabbit基础API里面声明一个Exchange、声明一个绑定、一个队列
// 用的是 amqp-client 依赖; channel.exchangeDeclare() channel.queueDeclare() channel.queueBind()
使用SpringAMQP去声明,就需要使用SpringAMQP的如下模式,即声明@Bean方式
// 用的是spring-boot-starter-amqp 依赖;
@Bean
public TopicExchange exchange001() {
return new TopicExchange("topic001", true, false);
}
@Bean
public Queue queue001() {
return new Queue("queue001", true); //队列持久
}
@Bean
public Binding binding001() {
return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
}
@Bean
public TopicExchange exchange002() {
return new TopicExchange("topic002", true, false);
}
@Bean
public Queue queue002() {
return new Queue("queue002", true); //队列持久
}
@Bean
public Binding binding002() {
return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*");
}
@Bean
public Queue queue003() {
return new Queue("queue003", true); //队列持久
}
@Bean
public Binding binding003() {
//同一个Exchange绑定了2个队列
return BindingBuilder.bind(queue003()).to(exchange001()).with("mq.*");
}
4.4-SpringAMQP消息模板组件-RabbitTemplate
消息模板,是与SpringAMQP整合的时候进行发送消息的关键类
该类提供了丰富的发送消息方法,包括可靠性投递消息方法、回调监听消息接口/confirm/iCallback,返回值确认接口ReturnCallback等。同样的我们需要进行注入到Spring容器中,然后直接使用
在与Spring整合时,需要实例化,但是在和Springboot整合时,在配置文件中添加配置即可
代码演示
RabbitMQConfig-配置参数
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
测试类
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMsg(){
//创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("desc", "消息描述。。。");
messageProperties.getHeaders().put("type", "自定义消息类型");
//消息体
Message message = new Message("hello rabbit".getBytes(), messageProperties);
//转换并发送
rabbitTemplate.convertAndSend("dir01", "spring.amqp", message, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
message.getMessageProperties().getHeaders().put("attr", "额外新增的属性");
return message;
}
});
}
简单写法
@Test
public void sendmsg2(){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("mq消息".getBytes(), messageProperties);
rabbitTemplate.send("topic", "spring.abc", message);
rabbitTemplate.convertAndSend("topic1", "spring.a", "hello,1");
}
4.5-SpringAMQP简单消息监听容器-SimpleMessageListenerContainer
这个类非常强大,我们可以对它进行很多设置,对于消费者的配置项,这个类可以满足
监听队列(多个队列)、自动启动、自动声明功能
设置事务特性、事务管理器、事务属性、事务容器(并发)、是否开启事务、回滚消息等
设置消费者数量、最小最大数量、批量消费
设置消息确认和自动确认模式、是否重回队列、异常捕捉handler函数
设置消费者标签生成策略、是否独占模式、消费者属性等
设置具体的监听器、消息转换器等
SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量大小、接收消息的模式等
很多基于RabbitMQ的自制后端管控台在进行动态设置的时候,也是根据这一特性去实现的
主要使用方法:
messageListenerAdapter 消息监听适配器
defaultListenerMethod 默认监听方法
Delegate 委托对象,实际真实的委托人,进行处理消息
queueOrTagToMethodName 队列与方法进行绑定
代码演示
RabbitMQConfig-配置参数
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(connectionFactory);
listenerContainer.setConcurrentConsumers(1); // 设置当前消费者为1
listenerContainer.setMaxConcurrentConsumers(5);// 设置最大消费者5
listenerContainer.setQueues(queue001(),queue002(),queue003());// 设置监听的队列,此处使用了4.3 声明式配置中定义的队列
listenerContainer.setDefaultRequeueRejected(false);// 设置是否重回队列
listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);//设置接收方式,AUTO-自动接收,MANUAL-手动接收,NULL-不接收
listenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() { //设置消费者标签策略
@Override
public String createConsumerTag(String s) {
return s+"_"+ UUID.randomUUID().toString();
}
});
listenerContainer.setMessageListener(new MessageListener() { // 监听消息
@Override
public void onMessage(Message message) {
String msg = new String(message.getBody());
System.out.println("消息:"+msg);
}
});
return listenerContainer;
}
4.6-SpringAMQP消息监听适配器-MessageListenerAdapter
RabbitMQconfig - 配置参数
//原始方法,可以直接new MessageListener
listenerContainer.setMessageListener(new MessageListener() { // 监听消息
@Override
public void onMessage(Message message) {
String msg = new String(message.getBody());
System.out.println("消息:"+msg);
}
});
// 1.1 默认是有自己的方法名字的:handleMessage
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
listenerContainer.setMessageListener(adapter);
// 1.2 可以自己指定一个方法的名字: consumeMessage
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
listenerContainer.setMessageListener(adapter);
// 2 也可以添加一个转换器: 从字节数组转换为String
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
adapter.setMessageConverter(new MessageConverter() { // 消息转换器
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
// java对象,转换成message对象
return new Message(o.toString().getBytes(),messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
// message对象,转换为java对象
String contentType = message.getMessageProperties().getContentType();
if (null != contentType && contentType.contains("text")) {
return new String(message.getBody());
}
return message.getBody();
}
});
listenerContainer.setMessageListener(adapter);
return listenerContainer;
MessageDelegate 消息代理
public class MessageDelegate {
public void handleMessage(byte[] messageBody) {
System.out.println("默认方法, 消息内容:" + new String(messageBody));
}
public void consumeMessage(byte[] messageBody) {
System.out.println("字节数组方法, 消息内容:" + new String(messageBody));
}
public void consumeMessage(String messageBody) {
System.err.println("字符串方法, 消息内容:" + messageBody);
}
public void method1(String messageBody) {
System.err.println("method1 收到消息内容:" + new String(messageBody));
}
public void method2(String messageBody) {
System.err.println("method2 收到消息内容:" + new String(messageBody));
}
public void consumeMessage(Map messageBody) {
System.err.println("map方法, 消息内容:" + messageBody);
}
public void consumeMessage(Order order) {
System.err.println("order对象, 消息内容, id: " + order.getId() +
", name: " + order.getName() +
", content: "+ order.getContent());
}
public void consumeMessage(Packaged pack) {
System.err.println("package对象, 消息内容, id: " + pack.getId() +
", name: " + pack.getName() +
", content: "+ pack.getDescription());
}
public void consumeMessage(File file) {
System.err.println("文件对象 方法, 消息内容:" + file.getName());
}
}
单元测试
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class RabbitAdminTest {
@Autowired
private RabbitAdmin rabbitAdmin ;
@Autowired
private RabbitTemplate rabbitTemplate;
// 测试对应 RabbitMQConfig-1.1和1.2
@Test
public void testAdapter1() {
MessageProperties messageProperties = new MessageProperties();
Message message = new Message("hello rabbitmq-1".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("topic001", "spring.a", message);
}
// 测试对应 RabbitMQConfig-2 也可以添加一个转换器: 从字节数组转换为String
@Test
public void testAdapter2() {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("hello rabbitmq-2".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("topic001", "spring.b", message);
}
// 测试对应 RabbitMQConfig-3 可以将队列名称,和我们自定义的方法名进行绑定
@Test
public void testAdapter3() {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message1 = new Message("hello rabbitmq-3.1".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("topic001", "spring.c", message1);
Message message2 = new Message("hello rabbitmq-3.2".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("topic002", "rabbit.a", message2);
}
}
4.7-SpringAMPQ消息转换器-MessageConverter
发送消息时,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要使用MessageConverter自定义转换器,一般都需要实现接口重写两个方法
toMessage java对象转换为MessagefromMessage Message对象转换为java对象 除了转换string类型数据,还有以下
json转换器 Jackson2JsonMessageConverter 可以进行java对象的转换功能DefaultJackson2JavaTypeMapper映射器 可以进行java对象的映射关系自定义二进制转换器,比如图片、PDF、PPT、流媒体
RabbitMqConfig 配置参数
// 1.1 支持json格式的转换器
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
adapter.setMessageConverter(jackson2JsonMessageConverter);
listenerContainer.setMessageListener(adapter);
// 1.2 支持java对象转换 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
//添加包信任,否则报错:The class 'com.lc.study.spring.OrderEntity' is not in the trusted packages
javaTypeMapper.setTrustedPackages("*");
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
listenerContainer.setMessageListener(adapter);
// 1.3 支持java多对象映射转换 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
//添加包信任,否则报错:The class 'com.lc.study.spring.OrderEntity' is not in the trusted packages
javaTypeMapper.setTrustedPackages("*");
HashMap> classHashMap = new HashMap<>();
classHashMap.put("order",com.lc.study.spring.OrderEntity.class);
classHashMap.put("package",com.lc.study.spring.PackageEntity.class);
javaTypeMapper.setIdClassMapping(classHashMap);
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
listenerContainer.setMessageListener(adapter);
// 1.4 text/image/pdf 转换器
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
//全局转换器,大的
ContentTypeDelegatingMessageConverter converter = new ContentTypeDelegatingMessageConverter();
//文本转换器
MessageConverter testConverter = new MessageConverter() {
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
return new Message(o.toString().getBytes(),messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String contentType = message.getMessageProperties().getContentType();
if (null != contentType && contentType.contains("text")) {
return new String(message.getBody());
}
return message.getBody();
}
};
converter.addDelegate("text",testConverter);
converter.addDelegate("html/text",testConverter);
converter.addDelegate("xml/text",testConverter);
converter.addDelegate("text/plain",testConverter);
// json转换器
Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();
converter.addDelegate("json",jsonMessageConverter);
converter.addDelegate("application/json",jsonMessageConverter);
// 图片转换器
MessageConverter imageConverter = new MessageConverter() {
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException("图片转换异常");
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
Object _extName = message.getMessageProperties().getHeaders().get("extName");
String extName = _extName == null ?"png":_extName.toString();
System.out.println(extName);
byte[] body = message.getBody();
String fileName = UUID.randomUUID().toString();
String path = "C:/Users/24669/Downloads/workspace/" + fileName +"."+extName;
File file = new File(path);
try {
Files.copy(new ByteArrayInputStream(body),file.toPath());
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
};
converter.addDelegate("image/png",imageConverter);
converter.addDelegate("image",imageConverter);
// pdf 转换器
MessageConverter pdfConverter = new MessageConverter() {
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException("pdf转换异常");
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
byte[] body = message.getBody();
String fileName = UUID.randomUUID().toString();
String path = "C:/Users/24669/Downloads/workspace/" + fileName +".pdf";
File file = new File(path);
try {
Files.copy(new ByteArrayInputStream(body),file.toPath());
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
};
converter.addDelegate("application/pdf",pdfConverter);
adapter.setMessageConverter(converter);
listenerContainer.setMessageListener(adapter);
return listenerContainer;
MessageDelegate 消息代理 同上一章
单元测试
//测试对应 1.1 支持json格式的转换器
@Test
public void testJsonMessage() throws JsonProcessingException {
OrderEntity order = new OrderEntity();
order.setId("0001");
order.setName("订单消息");
order.setContent("小笼包+豆浆+茶叶蛋");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writevalueAsString(order);
System.out.println("json消息体:"+json);
MessageProperties properties = new MessageProperties();
//这里注意一定要修改contentType为 application/json
properties.setContentType("application/json");
Message message = new Message(json.getBytes(), properties);
rabbitTemplate.convertAndSend("topic001","spring.json",message);
}
//测试对应 1.2 支持java对象格式的转换器
@Test
public void testJavaEntityMessage() throws JsonProcessingException {
OrderEntity order = new OrderEntity();
order.setId("0002");
order.setName("订单消息");
order.setContent("小笼包+豆浆+茶叶蛋");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writevalueAsString(order);
System.out.println("java对象:"+json);
MessageProperties properties = new MessageProperties();
//这里注意一定要修改contentType为 application/json
properties.setContentType("application/json");
properties.getHeaders().put("__TypeId__","com.lc.study.spring.OrderEntity");
Message message = new Message(json.getBytes(), properties);
rabbitTemplate.convertAndSend("topic001","spring.entity",message);
}
//测试对应 1.3 支持java多对象映射的转换器
@Test
public void testJavaEntityMapperMessage() throws JsonProcessingException {
OrderEntity order = new OrderEntity();
order.setId("0003");
order.setName("订单消息");
order.setContent("小笼包+豆浆+茶叶蛋");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writevalueAsString(order);
System.out.println("java对象:"+json);
MessageProperties properties = new MessageProperties();
//这里注意一定要修改contentType为 application/json
properties.setContentType("application/json");
properties.getHeaders().put("__TypeId__","order");
Message message = new Message(json.getBytes(), properties);
rabbitTemplate.convertAndSend("topic001","spring.entity",message);
PackageEntity pack = new PackageEntity();
pack.setId("0001");
pack.setName("快递包裹信息");
pack.setDescription("你的快递已送达");
String json2 = mapper.writevalueAsString(pack);
System.out.println("json2对象:"+json2);
MessageProperties properties2 = new MessageProperties();
//这里注意一定要修改contentType为 application/json
properties2.setContentType("application/json");
properties2.getHeaders().put("__TypeId__","package");
Message message2 = new Message(json2.getBytes(), properties2);
rabbitTemplate.convertAndSend("topic001","spring.entity2",message2);
}
//测试对应 1.4 text/image/pdf 的转换器
@Test
public void testTextJsonImagePdfMessage() throws Exception {
byte[] body1 = Files.readAllBytes(Paths.get("C:/Users/24669/Downloads", "picture.png"));
MessageProperties properties1 = new MessageProperties();
properties1.setContentType("image/png");
properties1.getHeaders().put("extName","png");
Message message1 = new Message(body1, properties1);
rabbitTemplate.convertAndSend("topic001","spring.png",message1);
byte[] body2 = Files.readAllBytes(Paths.get("C:/Users/24669/Downloads", "test.pdf"));
MessageProperties properties2 = new MessageProperties();
properties2.setContentType("application/pdf");
Message message2 = new Message(body2, properties2);
rabbitTemplate.convertAndSend("topic001","spring.pdf",message2);
}
4.8-RabbitMQ与SpringBoot2.0整合
生产端
publisher-/confirm/is 实现一个监听器,用于监听Broker端给我们返回的确认请求 RabbitTemplate./confirm/iCallback
publisher-returns 保证消息对于Broker端可达,如果出现路由键不可达的情况,则使用监听器对不可达消息进行后续处理,保证消息的路由成功RabbitTemplate.ReturnCallback
在发送消息的时候,对template进行配置mandatory = true 保证监听有效
当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会将消息返回给生产者;当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。
// pom文件spring-boot-starter-parent org.springframework.boot 2.3.4.RELEASE org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-test
// 配置application.yml
server:
port: 5555
spring:
rabbitmq:
addresses: 192.168.2.58:5672
username: guest
password: guest
virtual-host: /
publisher-/confirm/i-type: correlated
publisher-returns: true
template:
mandatory: true
// 声明式创建元数据(交换机、队列、绑定)
@Configuration
@ComponentScan(value = {"com.lc.producer.*"})
public class ProducerConfig {
//声明交换机和队列,以及绑定
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("springboot.exchange1", true, false);
}
@Bean
public Queue queue1() {
//如果你想创建一个只有自己可见的队列,即不允许其它用户访问,RabbitMQ允许你将一个Queue声明成为排他性的(Exclusive Queue)。
//1-只对首次声明它的连接(Connection)可见;2-会在其连接断开的时候自动删除。
// String name, boolean durable, boolean exclusive, boolean autoDelete
return new Queue("spring.boot.queue1",true,false,false);
}
@Bean
public Binding binding1() {
return BindingBuilder.bind(queue1()).to(topicExchange()).with("springboot.#");
}
}
// 自定义消费发送类,重写rabbitTemplate
@Component
public class RabbitSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RabbitConfirmCallback rabbit/confirm/iCallback;
@Autowired
private RabbitReturnCallback rabbitReturnCallback;
public void sendMessage(Object message, MessageProperties properties) {
Message msg = new Message(message.toString().getBytes(), properties);
rabbitTemplate.setMandatory(true);
//消息从 producer 到 rabbitmq broker有一个 confirmCallback 确认模式。
//消息从 exchange 到 queue 投递失败有一个 returnCallback 退回模式。
rabbitTemplate.set/confirm/iCallback(rabbit/confirm/iCallback);
rabbitTemplate.setReturnCallback(rabbitReturnCallback);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 第一次测试,使用正确的routingkey;第二次测试,使用错误的routingkey
rabbitTemplate.convertAndSend("springboot.exchange1", "springboot.a", msg, correlationData);
}
}
// 消息/confirm/iCallBack类,确认模式
@Component
public class Rabbit/confirm/iCallback implements RabbitTemplate./confirm/iCallback {
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
System.err.println("correlationdata:" + correlationData);
System.err.println("ack:" + ack);
if (!ack) {
System.err.println("异常处理..."+cause);
} else {
System.err.println("消息已送达");
}
}
}
// 消息 returnCallBack类,退回模式
@Component
public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.err.println("消息未达的原因:" + s);
}
}
// 测试类
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMqApplication.class)
public class ProducerTest {
@Autowired
private RabbitSender rabbitSender;
private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@Test
public void testSender() throws Exception {
MessageProperties properties = new MessageProperties();
properties.getHeaders().put("key","1231231");
properties.getHeaders().put("send_time",simpleDateFormat.format(new Date()));
rabbitSender.sendMessage("springBoot+rabbitmq",properties);
}
}
消费端
手动签收数据
消费端使用@RabbitMQListener注解
server:
port: 5556
spring:
rabbitmq:
addresses: 192.168.2.58:5672
username: guest
password: guest
virtual-host: /
listener:
simple:
acknowledge-mode: manual # 手动签收消息
@Component
public class RabbitReceiver {
// 使用注解进行时间监听绑定,也可以用作创建元数据
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "springboot.queue1",durable = "true"),
exchange = @Exchange(value = "springboot.exchange1",durable = "true",type = "topic",ignoreDeclarationExceptions = "true"),
key = "springboot.#"
))
@RabbitHandler
public void onMessage(Message message, Channel channel) throws IOException {
System.out.println("消费端:"+ new String((byte[]) message.getPayload()));
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
// 手动ACK
channel.basicAck(deliveryTag,false);
}
}
4.9-RabbitMQ与SpringCloud Stream整合
支持消息的生产与发送,可以是不同的消息中间件(Rabbitmq---->Spring Cloud Stream---->Kafka)SpringCloud Stream整合消息中间件,会有输入端和输出端,中间绑定消息中间件可以是Rabbitmq,Kafka,相当于重写了出、入接口@Output 输出注解,用户定义发送消息接口@Input 输入注解,用于定义消费接口@StreamListener 用于定义监听方法注解缺点:不能100%实现消息的可靠性投递,会存在少量消息丢失问题。因为要兼容Kafka



