栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ【Fanout Exchange交换机模式】【Headers Exchange交换机模式】实现

RabbitMQ【Fanout Exchange交换机模式】【Headers Exchange交换机模式】实现

RabbitMQ【Fanout Exchange交换机模式】实现

Fanout Exchange交换机模式类似于广播模式,相比于Topic模式来讲,没有了匹配符号,因此发送消息也就少了很多的匹配机制。

1.Springboot集成RabbitMQ

   
		org.springframework.boot  
		spring-boot-starter-amqp 
 

配置依赖:

#rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
#u6D88u8D39u8005u6570u91CF消费者数量
spring.rabbitmq.listener.simple.concurrency= 10
spring.rabbitmq.listener.simple.max-concurrency= 10
#u6D88u8D39u8005u6BCFu6B21u4ECEu961Fu5217u83B7u53D6u7684u6D88u606Fu6570u91CF
spring.rabbitmq.listener.simple.prefetch= 1
#u6D88u8D39u8005u81EAu52A8u542Fu52A8
spring.rabbitmq.listener.simple.auto-startup=true
#u6D88u8D39u5931u8D25uFF0Cu81EAu52A8u91CDu65B0u5165u961F
spring.rabbitmq.listener.simple.default-requeue-rejected= true
#u542Fu7528u53D1u9001u91CDu8BD5重置
spring.rabbitmq.template.retry.enabled=true 
spring.rabbitmq.template.retry.initial-interval=1000 
spring.rabbitmq.template.retry.max-attempts=3
spring.rabbitmq.template.retry.max-interval=10000
spring.rabbitmq.template.retry.multiplier=1.0

2.创建RabbitMQ的相关配置package

MQConfig.class(RabbitMQ配置类)
MQSender.class(RabbitMQ发送者类)
MQReceiver.class(RabbitMQ消息接收者类)

3.MQConfig.class配置类

@Configuration
public class MQConfig {
	//声明两个消息队列标识
	public static final String TOPTIC_QUEUE1="topicqueue1";
	public static final String TOPIC_QUEUE2="topicqueue2";
	//创建一个Fanout交换机标识
	public static final String FANOUT_EXCHANGE="fanoutExchage";
	
	//创建两个消息队列
	@Bean
	public Queue topicQueue1(){
		return new Queue(TOPTIC_QUEUE1,true);
	}
	@Bean
	public Queue topicQueue2(){
		return new Queue(TOPIC_QUEUE2,true);
	}

	//创建一个Fanout交换机(广播交换机)
	@Bean
	public FanoutExchange fanoutExchange(){
		return new FanoutExchange(FANOUT_EXCHANGE);
	}
	
	//将两个消息队列进行与广播交换机进行绑定
	//to(fanoutExchange()后面不需要携带匹配符
	@Bean
	public Binding fanoutBinding1(){
		return BindingBuilder.bind(topicQueue1()).to(fanoutExchange());
	}
	@Bean
	public Binding fanoutBinding2(){
		return BindingBuilder.bind(topicQueue2()).to(fanoutExchange());
	}
}

4.创建消息发送者信息类MQSender.class

@Service
public class MQSender {
	//通过log.oinfo进行控制台日志标记的打印
	private static Logger log = LoggerFactory.getLogger(MQSender.class);

	//注入Rabbitmq的amqp引擎
	@Autowired
	AmqpTemplate amqpTemplate;

		//fanout模式下的交换方法
	public void sendFanout(String message){
		log.info("fanout模式下MQSender发送出去的信息:"+message);
		//发送两条消息
		amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE,"",message+"1");
		amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE,"",message+"2");
	}
}

5.创建MQReceiver.class消息接收者信息类

@Service
public class MQReceiver {
	//log进行消息打印操作
	private static Logger log = LoggerFactory.getLogger(MQReceiver.class);
	//实现对两个消息队列进行监听
	@RabbitListener(queues=MQConfig.TOPTIC_QUEUE1)
	public void receiverTopinc1(String message){
		log.info("topicQueue1队列接受的信息:"+message);
	}
	
	@RabbitListener(queues=MQConfig.TOPIC_QUEUE2)
	public void receiverTopic2(String message){
		log.info("topicQueue2队列接受的信息:"+message);
	}
}

6.编写测试Controller

@Controller
@RequestMapping("/demo")
public class SampleController {
	//注入消息发送者实体类
	@Autowired
	MQSender mqSender;
	
	
    @RequestMapping("/mqFanout")
    @ResponseBody
    public String home() {
    	//发送fanout广播模式的信息
    	mqSender.sendFanout("hello RabbitMQ---Fanout模式");
        return "success";
    }

}

7.测试

浏览器输入:

http://localhost:8080/demo/mqFanout

得到的控制台信息:

.MQSender: fanout模式下MQSender发送出去的信息:hello RabbitMQ---Fanout模式
rabbitmq.MQReceiver    : topicQueue1队列接受的信息:hello RabbitMQ---Fanout模式1
rabbitmq.MQReceiver    : topicQueue2队列接受的信息:hello RabbitMQ---Fanout模式2
rabbitmq.MQReceiver    : topicQueue2队列接受的信息:hello RabbitMQ---Fanout模式1
rabbitmq.MQReceiver    : topicQueue1队列接受的信息:hello RabbitMQ---Fanout模式2

结合上述的控制台反馈结果可以得到:

Fanout模式下,多条信息的发送,能够实现与该Fanout交换机所绑定的消息队列的消息的传输,能够保证所绑定的消息队列都可以接收到发送者的发送信息。

RabbitMQ【Headers Exchange交换机模式】实现

1.Springboot集成RabbitMQ实现
内容参考前一篇,RabbitMQ直接交换机和Topic交换机模式实现

2.MQConfig.class配置类

//Configuration代表配置类
@Configuration
public class MQConfig {
	//声明一个header队列标识
	public static final String HEADERS_QUEUE1="headersqueue1";
	//声明一个headerExchange交换机
	public static final String HEADERS_EXCHANGE="HeadersExchange";

	//创建一个headerQueue的消息队列
	@Bean
	public Queue headersQueue1(){
		return new Queue(HEADERS_QUEUE1,true);
	}
	//创建HeadersExchange交换机
	@Bean
	public HeadersExchange headersExchange(){
		return new HeadersExchange(HEADERS_EXCHANGE);
	}

	//headers交换机绑定headerqueue1消息队列
	@Bean
	public Binding headerBinding(){
	//不同于之前的三种交换机模式,header模式要以Map的格式进行匹配机制
	//whereAll(map).match()进行map内容的匹配
		Map map = new HashMap();
		map.put("header1", "value1");
		map.put("header2", "value2");
		return BindingBuilder.bind(headersQueue1()).to(headersExchange()).whereAll(map).match();
	}
}

3.MQSender.class消息发送类

@Service
public class MQSender {

	private static Logger log = LoggerFactory.getLogger(MQSender.class);
	//redisService实现字符串内容的转化
	@Autowired
	RedisService redisService;
	//amqp协议引擎
	@Autowired
	AmqpTemplate amqpTemplate;
	
		//headersExchange模式下的发送函数
	public void sendHeader(Object message){
		//进行数据内容的转化
		String msg = redisService.beanToString(message);
		log.info("headersExchange模式下MQSender发送的数据:"+msg);
		//第一个参数为字节类型的数据
		MessageProperties messageProperties = new MessageProperties();
		//必须要与之配置的map键值对内容匹配
		messageProperties.setHeader("header1", "value1");
		messageProperties.setHeader("header2", "value2");
		Message obj = new Message(msg.getBytes(), messageProperties);
		
		amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE,"",obj);
	}
	
	//下面是上面的redisService的转化数据类型函数
		public static  String beanToString(T value) {
		if(value == null) {
			return null;
		}
		Class clazz = value.getClass();
		if(clazz == int.class || clazz == Integer.class) {
			 return ""+value;
		}else if(clazz == String.class) {
			 return (String)value;
		}else if(clazz == long.class || clazz == Long.class) {
			return ""+value;
		}else {
			return JSON.toJSONString(value);
		}
	}
}

4.MQReceiver.class消息接收者实现类

@Service
public class MQReceiver {
	private static Logger log = LoggerFactory.getLogger(MQReceiver.class);

	//RabbitListener实现对HEADERS_QUEUE1队列进行监听
	@RabbitListener(queues=MQConfig.HEADERS_QUEUE1)
	public void receiverHeader(byte[] messsage){
		log.info("headerQueue1队列接受的信息"+new String(messsage));
	}
}

5.编写测试请求Controller

@Controller
@RequestMapping("/demo")
public class SampleController {

	//引入相关的Mq发送信息类
	@Autowired
	MQSender mqSender;
	
    @RequestMapping("/mqHeader")
    @ResponseBody
    public String home() {
    	mqSender.sendHeader("hello RabbitMQ---HeadersExchange模式");
        return "success";
    }
}

6.测试

浏览器输入:
http://localhost:8080/demo/mqHeader

控制台相关操作:

.rabbitmq.MQSender : headersExchange模式下MQSender发送的数据:hello RabbitMQ---HeadersExchange模式
.rabbitmq.MQReceiver : headerQueue1队列接受的信息hello RabbitMQ---HeadersExchange模式

因此通过HeadersExchange交换机实现的消息发送需要建立在map内容匹配的情况下进行判断发送。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/741845.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号