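Note: in Spring AMQP's send methods, the message parameter is declared as Object, so an object of any type can be sent as a message; Spring AMQP serializes it to bytes before sending.
Press Ctrl+P (the IDE's parameter-info shortcut) on convertAndSend to confirm that the message parameter is Object in every overload.
Test:
Declare a queue named object.queue in the existing configuration class: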
package com.yy.comsumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {

    // Declare the exchange yy.fanout
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("yy.fanout");
    }

    // Declare the first queue, fanout.queue1
    @Bean
    public Queue fanoutQueue() {
        return new Queue("fanout.queue1");
    }

    // Bind queue 1 to the exchange
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
    }

    // Declare the second queue, fanout.queue2
    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout.queue2");
    }

    // Bind queue 2 to the exchange
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }

    // Declare object.queue, used for the object-message test
    @Bean
    public Queue objectQueue() {
        return new Queue("object.queue");
    }
}
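Restart the application so that the new queue declaration takes effect.
Write a test that sends a Map as the message: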
package com.yy.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue() {
        String queueName = "simple.queue";
        String message = "hello,string amqp!";
        rabbitTemplate.convertAndSend(queueName, message);
    }

    @Test
    public void testSendMessage2WorkQueue() throws InterruptedException {
        String queueName = "simple.queue";
        String message = "hello,message__";
        for (int i = 1; i < 50; i++) {
            rabbitTemplate.convertAndSend(queueName, message + i);
            Thread.sleep(20);
        }
    }

    @Test
    public void testSendFanoutExchange() {
        // Exchange name
        String exchangeName = "yy.fanout";
        // Message
        String message = "Hi ya every one!";
        // Send the message (a fanout exchange ignores the routing key)
        rabbitTemplate.convertAndSend(exchangeName, "", message);
    }

    @Test
    public void testSendDirectExchange() {
        // Exchange name
        String exchangeName = "yy.direct";
        // Message
        String message = "Hi ya blue!";
        // Send the message with routing key "blue"
        rabbitTemplate.convertAndSend(exchangeName, "blue", message);
    }

    @Test
    public void testSendTopicExchange() {
        // Exchange name
        String exchangeName = "yy.topic";
        // Message
        String message = "Hi ya china NO.1!";
        // Send the message with routing key "china.news"
        rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
    }

    @Test
    public void testSendObjectQueue() {
        // Message body: a Map instead of a String
        Map<String, Object> msg = new HashMap<>();
        msg.put("name", "张三");
        msg.put("age", "21");
        // Send the message straight to object.queue
        rabbitTemplate.convertAndSend("object.queue", msg);
    }
}
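Inspect the message on object.queue (for example in the RabbitMQ management console): the body, including the Chinese text, has been serialized with JDK serialization and is not human-readable.
The serialized payload is much longer than the data it carries, and JDK serialization carries security risks when untrusted bytes are deserialized, so it is worth changing.
Spring AMQP converts message objects through org.springframework.amqp.support.converter.MessageConverter. The default implementation is SimpleMessageConverter, which serializes objects such as this Map with the JDK's ObjectOutputStream.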
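To make the length concern concrete, the sketch below serializes the same two-entry map with the JDK's ObjectOutputStream and compares the size with a hand-written JSON string for the same data. It uses only the JDK, no RabbitMQ or Spring. The class name JdkSerializationSize and the JSON literal are assumptions for illustration; they are not byte-for-byte what Spring AMQP or Jackson would put on the wire.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class JdkSerializationSize {

    // Serializes a value with ObjectOutputStream, the JDK mechanism named above.
    static byte[] jdkSerialize(Object value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        Map<String, Object> msg = new HashMap<>();
        msg.put("name", "张三");
        msg.put("age", "21");

        byte[] jdkBytes = jdkSerialize(msg);
        // Hand-written JSON for the same data (an assumption, used only for size comparison).
        byte[] jsonBytes = "{\"name\":\"张三\",\"age\":\"21\"}".getBytes(StandardCharsets.UTF_8);

        System.out.println("JDK serialization: " + jdkBytes.length + " bytes");
        System.out.println("JSON (UTF-8):      " + jsonBytes.length + " bytes");
    }
}
```

A JDK-serialized stream begins with the magic bytes 0xAC 0xED and then carries class metadata for HashMap and String, which is why the body is both longer and unreadable compared with the JSON text.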
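To change this, define a bean of type MessageConverter. JSON serialization is recommended; the steps are as follows:
Add the dependency to the parent project:
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
Declare the MessageConverter in the publisher service.
This relies on Spring Boot auto-configuration: the MessageConverter bean declared here replaces the default converter.
The startup class is itself a configuration class, so the bean can be declared there.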
package com.yy.spring;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class PublisherApplication {

    public static void main(String[] args) {
        SpringApplication.run(PublisherApplication.class);
    }

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
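First purge the messages left in the queue by the earlier test, then restart the publisher and send the message again.
Receiving JSON
- Add the Jackson dependency to the consumer (the parent project already declares it, so nothing needs to be added here)
- Define a MessageConverter in the consumer (the publisher converted the message when sending, so the consumer has to convert it back when receiving)
- Define a consumer that listens on the object.queue queue.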
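The symmetry in the second step can be seen without a broker: bytes produced by one conversion only become an object again through its inverse. The sketch below uses plain JDK serialization as a stand-in for whichever converter is configured; the class and method names (ConverterRoundTrip, toBytes, fromBytes) are hypothetical and are not part of Spring AMQP.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

public class ConverterRoundTrip {

    // Sender side: object -> bytes.
    static byte[] toBytes(Object value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    // Receiver side: bytes -> object, the exact inverse of toBytes.
    static Object fromBytes(byte[] body) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> msg = new HashMap<>();
        msg.put("name", "张三");
        msg.put("age", "21");

        byte[] body = toBytes(msg);          // what the publisher would put on the queue
        Object received = fromBytes(body);   // what the consumer must do to get the Map back

        System.out.println("round trip equal: " + msg.equals(received));
    }
}
```

The same reasoning applies to JSON: a publisher that writes JSON needs a consumer that reads JSON, which is why both applications declare the same kind of converter bean.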
Define the MessageConverter in the consumer:
package com.yy.comsumer;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class);
    }

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
Define the consumer that listens on the object.queue queue:
package com.yy.comsumer.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

// Registered as a bean so that Spring can find the listener methods
@Component
public class SpringRabbitListener {

    // object.queue is already declared in FanoutConfig, so only the queue name is given here
    @RabbitListener(queues = "object.queue")
    public void listenObjectQueue(Map<String, Object> msg) {
        System.err.println("Received message from object.queue: " + msg);
    }
}
After the consumer is restarted, the received message is printed.



