发送方routerkey在bind routerkey的时候传入的是表达式,包含通配符(包含*/#),发送的时候则需要明确指出routerkey(不能含*/#); 接收方则需要包含通配符(*/#)。
1、定义常量public final static String SMS_EXCHANGE_TOPIC ="sms-exchange-topic"; public final static String SMS_QUEUE_TOPIC ="sms-queue-topic"; public final static String SMS_KEY_TOPIC ="sms.#";2、发送方 2.a 发送方的初始化
//绑定topic(routerkey匹配)
if (!jmsDeclare.isQueueExists(SMSConstants.SMS_QUEUE_TOPIC)){
jmsDeclare.initQueue(SMSConstants.SMS_QUEUE_TOPIC);
jmsDeclare.bindQueue(SMSConstants.SMS_QUEUE_TOPIC,SMSConstants.SMS_EXCHANGE_TOPIC,SMSConstants.SMS_KEY_TOPIC,null);
}
2.b 发送方操作
@Test
public void testBroadcast(){
receiveSMSService.setOnMessage((s,sms)->{
System.out.println(s+" 接收:"+sms.toString());
});
SMSBean sms=SMSUtil.genSMS();
sms.setSubject("这是发布的消息,欢迎订阅");
sendSMSService.broadcastSMS(sms);
sms.setSubject("订阅测试2");
//routerkey只要是sms.开关就行
amqpTemplate.convertAndSend(SMSConstants.SMS_EXCHANGE_TOPIC,"sms.abcdef",sms);
SMSUtil.sleep(2000);
}
3、接收方
//订阅接收 通过topic交换机发出的匹配SMS_KEY_TOPIC键值的都能接收
@RabbitListener(bindings = @QueueBinding(value=@Queue(SMSConstants.SMS_QUEUE_TOPIC),exchange = @Exchange(value = SMSConstants.SMS_EXCHANGE_TOPIC,
type = ExchangeTypes.TOPIC),key = SMSConstants.SMS_KEY_TOPIC))
@RabbitHandler
protected void onBroadcastReceive(Message message, SMSBean sms, Channel channel) throws IOException {
log.info("=========onBroadcastReceive已接收===========");
if (consumer!=null) {
consumer.accept(SMSConstants.SMS_KEY_TOPIC+"-"+SMSConstants.SMS_QUEUE_TOPIC, sms);
}
}
测试结果:
2022-01-10 11:42:59.513 INFO 5180 --- [ntContainer#4-1] c.f.j.r.service.ReceiveSMSServiceImp : =========onBroadcastReceive已接收=========== sms.#-sms-queue-topic 接收:SMSBean(subject=这是发布的消息,欢迎订阅, content=this is a sms message, from=sender1, to=sendto1, timestamp=Mon Jan 10 11:42:59 CST 2022) 2022-01-10 11:43:00.596 INFO 5180 --- [ntContainer#4-1] c.f.j.r.service.ReceiveSMSServiceImp : =========onBroadcastReceive已接收=========== sms.#-sms-queue-topic 接收:SMSBean(subject=订阅测试2, content=this is a sms message, from=sender1, to=sendto1, timestamp=Mon Jan 10 11:42:59 CST 2022)
代码链接,觉得有用的话请点个Star: https://gitee.com/tigera15/jms-samples



