Fanout Exchange交换机模式类似于广播模式,相比于Topic模式来讲,没有了匹配符号,因此发送消息也就少了很多的匹配机制。
1.Springboot集成RabbitMQ
org.springframework.boot spring-boot-starter-amqp
配置依赖:
#rabbitmq spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ #u6D88u8D39u8005u6570u91CF消费者数量 spring.rabbitmq.listener.simple.concurrency= 10 spring.rabbitmq.listener.simple.max-concurrency= 10 #u6D88u8D39u8005u6BCFu6B21u4ECEu961Fu5217u83B7u53D6u7684u6D88u606Fu6570u91CF spring.rabbitmq.listener.simple.prefetch= 1 #u6D88u8D39u8005u81EAu52A8u542Fu52A8 spring.rabbitmq.listener.simple.auto-startup=true #u6D88u8D39u5931u8D25uFF0Cu81EAu52A8u91CDu65B0u5165u961F spring.rabbitmq.listener.simple.default-requeue-rejected= true #u542Fu7528u53D1u9001u91CDu8BD5重置 spring.rabbitmq.template.retry.enabled=true spring.rabbitmq.template.retry.initial-interval=1000 spring.rabbitmq.template.retry.max-attempts=3 spring.rabbitmq.template.retry.max-interval=10000 spring.rabbitmq.template.retry.multiplier=1.0
2.创建RabbitMQ的相关配置package
MQConfig.class(RabbitMQ配置类) MQSender.class(RabbitMQ发送者类) MQReceiver.class(RabbitMQ消息接收者类)
3.MQConfig.class配置类
@Configuration
public class MQConfig {
//声明两个消息队列标识
public static final String TOPTIC_QUEUE1="topicqueue1";
public static final String TOPIC_QUEUE2="topicqueue2";
//创建一个Fanout交换机标识
public static final String FANOUT_EXCHANGE="fanoutExchage";
//创建两个消息队列
@Bean
public Queue topicQueue1(){
return new Queue(TOPTIC_QUEUE1,true);
}
@Bean
public Queue topicQueue2(){
return new Queue(TOPIC_QUEUE2,true);
}
//创建一个Fanout交换机(广播交换机)
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(FANOUT_EXCHANGE);
}
//将两个消息队列进行与广播交换机进行绑定
//to(fanoutExchange()后面不需要携带匹配符
@Bean
public Binding fanoutBinding1(){
return BindingBuilder.bind(topicQueue1()).to(fanoutExchange());
}
@Bean
public Binding fanoutBinding2(){
return BindingBuilder.bind(topicQueue2()).to(fanoutExchange());
}
}
4.创建消息发送者信息类MQSender.class
@Service
public class MQSender {
//通过log.oinfo进行控制台日志标记的打印
private static Logger log = LoggerFactory.getLogger(MQSender.class);
//注入Rabbitmq的amqp引擎
@Autowired
AmqpTemplate amqpTemplate;
//fanout模式下的交换方法
public void sendFanout(String message){
log.info("fanout模式下MQSender发送出去的信息:"+message);
//发送两条消息
amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE,"",message+"1");
amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE,"",message+"2");
}
}
5.创建MQReceiver.class消息接收者信息类
@Service
public class MQReceiver {
//log进行消息打印操作
private static Logger log = LoggerFactory.getLogger(MQReceiver.class);
//实现对两个消息队列进行监听
@RabbitListener(queues=MQConfig.TOPTIC_QUEUE1)
public void receiverTopinc1(String message){
log.info("topicQueue1队列接受的信息:"+message);
}
@RabbitListener(queues=MQConfig.TOPIC_QUEUE2)
public void receiverTopic2(String message){
log.info("topicQueue2队列接受的信息:"+message);
}
}
6.编写测试Controller
@Controller
@RequestMapping("/demo")
public class SampleController {
//注入消息发送者实体类
@Autowired
MQSender mqSender;
@RequestMapping("/mqFanout")
@ResponseBody
public String home() {
//发送fanout广播模式的信息
mqSender.sendFanout("hello RabbitMQ---Fanout模式");
return "success";
}
}
7.测试
浏览器输入:
http://localhost:8080/demo/mqFanout
得到的控制台信息:
.MQSender: fanout模式下MQSender发送出去的信息:hello RabbitMQ---Fanout模式 rabbitmq.MQReceiver : topicQueue1队列接受的信息:hello RabbitMQ---Fanout模式1 rabbitmq.MQReceiver : topicQueue2队列接受的信息:hello RabbitMQ---Fanout模式2 rabbitmq.MQReceiver : topicQueue2队列接受的信息:hello RabbitMQ---Fanout模式1 rabbitmq.MQReceiver : topicQueue1队列接受的信息:hello RabbitMQ---Fanout模式2
结合上述的控制台反馈结果可以得到:
RabbitMQ【Headers Exchange交换机模式】实现Fanout模式下,多条信息的发送,能够实现与该Fanout交换机所绑定的消息队列的消息的传输,能够保证所绑定的消息队列都可以接收到发送者的发送信息。
1.Springboot集成RabbitMQ实现
内容参考前一篇,RabbitMQ直接交换机和Topic交换机模式实现
2.MQConfig.class配置类
//Configuration代表配置类
@Configuration
public class MQConfig {
//声明一个header队列标识
public static final String HEADERS_QUEUE1="headersqueue1";
//声明一个headerExchange交换机
public static final String HEADERS_EXCHANGE="HeadersExchange";
//创建一个headerQueue的消息队列
@Bean
public Queue headersQueue1(){
return new Queue(HEADERS_QUEUE1,true);
}
//创建HeadersExchange交换机
@Bean
public HeadersExchange headersExchange(){
return new HeadersExchange(HEADERS_EXCHANGE);
}
//headers交换机绑定headerqueue1消息队列
@Bean
public Binding headerBinding(){
//不同于之前的三种交换机模式,header模式要以Map的格式进行匹配机制
//whereAll(map).match()进行map内容的匹配
Map map = new HashMap();
map.put("header1", "value1");
map.put("header2", "value2");
return BindingBuilder.bind(headersQueue1()).to(headersExchange()).whereAll(map).match();
}
}
3.MQSender.class消息发送类
@Service
public class MQSender {
private static Logger log = LoggerFactory.getLogger(MQSender.class);
//redisService实现字符串内容的转化
@Autowired
RedisService redisService;
//amqp协议引擎
@Autowired
AmqpTemplate amqpTemplate;
//headersExchange模式下的发送函数
public void sendHeader(Object message){
//进行数据内容的转化
String msg = redisService.beanToString(message);
log.info("headersExchange模式下MQSender发送的数据:"+msg);
//第一个参数为字节类型的数据
MessageProperties messageProperties = new MessageProperties();
//必须要与之配置的map键值对内容匹配
messageProperties.setHeader("header1", "value1");
messageProperties.setHeader("header2", "value2");
Message obj = new Message(msg.getBytes(), messageProperties);
amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE,"",obj);
}
//下面是上面的redisService的转化数据类型函数
public static String beanToString(T value) {
if(value == null) {
return null;
}
Class> clazz = value.getClass();
if(clazz == int.class || clazz == Integer.class) {
return ""+value;
}else if(clazz == String.class) {
return (String)value;
}else if(clazz == long.class || clazz == Long.class) {
return ""+value;
}else {
return JSON.toJSONString(value);
}
}
}
4.MQReceiver.class消息接收者实现类
@Service
public class MQReceiver {
private static Logger log = LoggerFactory.getLogger(MQReceiver.class);
//RabbitListener实现对HEADERS_QUEUE1队列进行监听
@RabbitListener(queues=MQConfig.HEADERS_QUEUE1)
public void receiverHeader(byte[] messsage){
log.info("headerQueue1队列接受的信息"+new String(messsage));
}
}
5.编写测试请求Controller
@Controller
@RequestMapping("/demo")
public class SampleController {
//引入相关的Mq发送信息类
@Autowired
MQSender mqSender;
@RequestMapping("/mqHeader")
@ResponseBody
public String home() {
mqSender.sendHeader("hello RabbitMQ---HeadersExchange模式");
return "success";
}
}
6.测试
浏览器输入:
http://localhost:8080/demo/mqHeader
控制台相关操作:
.rabbitmq.MQSender : headersExchange模式下MQSender发送的数据:hello RabbitMQ---HeadersExchange模式 .rabbitmq.MQReceiver : headerQueue1队列接受的信息hello RabbitMQ---HeadersExchange模式
因此通过HeadersExchange交换机实现的消息发送需要建立在map内容匹配的情况下进行判断发送。



