栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

JMS(RabbitMQ)的进阶三:Topic(routerkey匹配模式)处理

JMS(RabbitMQ)的进阶三:Topic(routerkey匹配模式)处理

   发送方routerkey在bind routerkey的时候传入的是表达式,包含通配符(包含*/#),发送的时候则需要明确指出routerkey(不能含*/#); 接收方则需要包含通配符(*/#)。

1、定义常量
 public final static String SMS_EXCHANGE_TOPIC ="sms-exchange-topic";
 public final static String SMS_QUEUE_TOPIC ="sms-queue-topic";
 public final static String SMS_KEY_TOPIC ="sms.#";
2、发送方 2.a 发送方的初始化
//绑定topic(routerkey匹配)
    if (!jmsDeclare.isQueueExists(SMSConstants.SMS_QUEUE_TOPIC)){
      jmsDeclare.initQueue(SMSConstants.SMS_QUEUE_TOPIC);
      jmsDeclare.bindQueue(SMSConstants.SMS_QUEUE_TOPIC,SMSConstants.SMS_EXCHANGE_TOPIC,SMSConstants.SMS_KEY_TOPIC,null);
    }
2.b 发送方操作
@Test
  public void testBroadcast(){
    receiveSMSService.setOnMessage((s,sms)->{
      System.out.println(s+" 接收:"+sms.toString());
    });
    SMSBean sms=SMSUtil.genSMS();
    sms.setSubject("这是发布的消息,欢迎订阅");
    sendSMSService.broadcastSMS(sms);
    sms.setSubject("订阅测试2");
	//routerkey只要是sms.开关就行  	  
	amqpTemplate.convertAndSend(SMSConstants.SMS_EXCHANGE_TOPIC,"sms.abcdef",sms);
    SMSUtil.sleep(2000);
  }
3、接收方
//订阅接收 通过topic交换机发出的匹配SMS_KEY_TOPIC键值的都能接收
  @RabbitListener(bindings = @QueueBinding(value=@Queue(SMSConstants.SMS_QUEUE_TOPIC),exchange = @Exchange(value = SMSConstants.SMS_EXCHANGE_TOPIC,
          type = ExchangeTypes.TOPIC),key = SMSConstants.SMS_KEY_TOPIC))
  @RabbitHandler
  protected void onBroadcastReceive(Message message, SMSBean sms, Channel channel) throws IOException {
    log.info("=========onBroadcastReceive已接收===========");
    if (consumer!=null) {
      consumer.accept(SMSConstants.SMS_KEY_TOPIC+"-"+SMSConstants.SMS_QUEUE_TOPIC, sms);
    }
  }

测试结果:

2022-01-10 11:42:59.513  INFO 5180 --- [ntContainer#4-1] c.f.j.r.service.ReceiveSMSServiceImp     : =========onBroadcastReceive已接收===========
sms.#-sms-queue-topic 接收:SMSBean(subject=这是发布的消息,欢迎订阅, content=this is a sms message, from=sender1, to=sendto1, timestamp=Mon Jan 10 11:42:59 CST 2022)
2022-01-10 11:43:00.596  INFO 5180 --- [ntContainer#4-1] c.f.j.r.service.ReceiveSMSServiceImp     : =========onBroadcastReceive已接收===========
sms.#-sms-queue-topic 接收:SMSBean(subject=订阅测试2, content=this is a sms message, from=sender1, to=sendto1, timestamp=Mon Jan 10 11:42:59 CST 2022)

代码链接,觉得有用的话请点个Star: https://gitee.com/tigera15/jms-samples

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/701660.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号