栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SpringBoot与RabbitMQ的整合

SpringBoot与RabbitMQ的整合

文章目录[隐藏]
  • Publish/Subscribe(发布订阅模式)
  • Rounting(路由模式)
  • Topics(通配符模式)
  • 源码下载

Publish/Subscribe(发布订阅模式)

以用户注册成功后,同时发送邮件通知和短信通知场景为例,实现整合

基于API的方式 定制消息发送组件
@RunWith(SpringRunner.class)
@SpringBootTest
public class Chapter08ApplicationTests {
    @Autowired
    private AmqpAdmin amqpAdmin;

    
    @Test
    public void amqpAdmin() {
        // 1、定义fanout类型的交换器
        amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));
        // 2、定义两个默认持久化队列,分别处理email和sms
        amqpAdmin.declareQueue(new Queue("fanout_queue_email"));
        amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));
        // 3、将队列分别与交换器进行绑定
        amqpAdmin.declareBinding(new Binding("fanout_queue_email",Binding.DestinationType.QUEUE,"fanout_exchange","",null));
        amqpAdmin.declareBinding(new Binding("fanout_queue_sms",Binding.DestinationType.QUEUE,"fanout_exchange","",null));
    }
}

执行单元测试,访问RabbitMQ的可视化管理界面:

可以看到,出现了一个名为fanout_exchange的交换机,点击查看:

展示有其具体信息,其中,红框内的内容为与之绑定的两个消息队列。切换至Queues面板,查看定制生成的消息的队列信息

消息发送者发送消息

传递一个实体类封装的消息

@Data
public class User {
    private Integer id;
    private String username;
}

测试类中

@Test
public void psubPublisher() {
    User user=new User();
    user.setId(1);
    user.setUsername("石头");
    rabbitTemplate.convertAndSend("fanout_exchange","",user);
}

此时执行测试方法,会出现异常:

SimpleMessageConverter only supports String,byte[] and Serializable payloads.

此时可以实现序列化,或者定制其他类型的消息转化器:

@Configuration
public class RabbitMQConfig {
    
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

再次执行测试,会出现如下效果:

目前未提供消息消费者,因此消息会暂存至队列

消息消费者接收消息
public class RabbitMQService {
    
    @RabbitListener(queues = "fanout_queue_email")
    public void psubConsumerEmail(Message message) {
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println("邮件业务接收到消息: "+s);

    }
    
    @RabbitListener(queues = "fanout_queue_sms")
    public void psubConsumerSms(Message message) {
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println("短信业务接收到消息: "+s);
    }
}

此时启动整个项目,就会对所监听的队列:“fanout_queue_email”和“fanout_queue_sms”中的消息进行消费,效果即为打印至控制台。

至此,使用api方式进行一条信息的发送、消息中间件的存储,以及消息消费的发布/订阅工作模式的业务案例已经实现

基于配置类的方式
@Configuration
public class RabbitMQConfig {
    
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    
    // 1、定义fanout类型的交换器
    @Bean
    public Exchange fanout_exchange() {
        return ExchangeBuilder.fanoutExchange("fanout_exchange").build();
    }

    // 2、定义两个不同名称的消息队列
    @Bean
    public Queue fanout_queue_email() {
        return new Queue("fanout_queue_email");
    }

    @Bean
    public Queue fanout_queue_sms() {
        return new Queue("fanout_queue_sms");
    }

    // 3、将两个不同名称的消息队列与交换器进行绑定
    @Bean
    public Binding bindingEmail() {
        return BindingBuilder.bind(fanout_queue_email()).to(fanout_exchange()).with("").noargs();
    }

    @Bean
    public Binding bindingSms() {
        return BindingBuilder.bind(fanout_queue_sms()).to(fanout_exchange()).with("").noargs();
    }

}

该步骤完成了消息组建的定制,此外,还需进行消息发送者和消息消费者的编写,上一种方式已经实现,可以直接使用,测试效果完全相同

基于注解的方式
@Service
public class RabbitMQService {
//    
//    @RabbitListener(queues = "fanout_queue_email")
//    public void psubConsumerEmail(Message message) {
//        byte[] body = message.getBody();
//        String s = new String(body);
//        System.out.println("邮件业务接收到消息: "+s);
//
//    }
//    
//    @RabbitListener(queues = "fanout_queue_sms")
//    public void psubConsumerSms(Message message) {
//        byte[] body = message.getBody();
//        String s = new String(body);
//        System.out.println("短信业务接收到消息: "+s);
//    }

    
    @RabbitListener(bindings =@QueueBinding(value =@Queue("fanout_queue_email"), exchange =@Exchange(value = "fanout_exchange",type = "fanout")))
    public void psubConsumerEmailAno(User user) {
        System.out.println("邮件业务接收到消息: "+user);
    }
    
    @RabbitListener(bindings =@QueueBinding(value =@Queue("fanout_queue_sms"),exchange =@Exchange(value = "fanout_exchange",type = "fanout")))
    public void psubConsumerSmsAno(User user) {
        System.out.println("短信业务接收到消息: "+user);
    }
}

该方式中,@RabbitListener定制了消息组件的消费者,其运行效果完全相同

Rounting(路由模式) 基于注解方式定制消息组件和消息消费者
@RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_error"),
        exchange = @Exchange(value = "routing_exchange", type = "direct"),
        key = "error_routing_key"))
public void routingConsumerError(String message) {
    System.out.println("接收到error级别日志消息: " + message);
}


@RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_all"),
        exchange = @Exchange(value = "routing_exchange", type = "direct"),
        key = {"error_routing_key", "info_routing_key", "warning_routing_key"}))
public void routingConsumerAll(String message) {
    System.out.println("接收到info、error、warning等级别日志消息: " + message);
}

注解定制了路由模式下的消息服务组件,Rounting路由模式下,交换器类型type属性为direct,且必须指定key属性(每个消息队列可以映射多个路由键)

消息发送者发送消息

使用RabbitTemplate实现Rounting路由模式下的消息发送:

@Test
public void routingPublisher() {
    String message = "This is a error message";
    rabbitTemplate.convertAndSend("routing_exchange", "error_routing_key", message);
}

执行测试方法,效果如下:

修改rountKey及信息,如下,再次测试

@Test
public void routingPublisher2() {
    String message = "This is a info message";
    rabbitTemplate.convertAndSend("routing_exchange", "info_routing_key", message);
}

也自动生成了Rounting路由模式下的消息组件,并进行了自动绑定:

Topics(通配符模式) 使用基于注解的方式定制消息组件和消息消费者
@RabbitListener(bindings = @QueueBinding(value = @Queue("topic_queue_email"),
        exchange = @Exchange(value = "topic_exchange",
                type = "topic"), key = "info.#.email.#"))
public void topicConsumerEmail(String message) {
    System.out.println("接收到邮件订阅需求处理消息: " + message);
}


@RabbitListener(bindings = @QueueBinding(value = @Queue("topic_queue_sms"),
        exchange = @Exchange(value = "topic_exchange",
                type = "topic"), key = "info.#.sms.#"))
public void topicConsumerSms(String message) {
    System.out.println("接收到短信订阅需求处理消息: " + message);
}

使用方式与Rounting基本一致,主要是将交换机的类型type修改为topic,以及使用通配符的样式指定路由键。

消息发送者发送消息

使用RabbitTemplate实现Topics通配符模式下的消息发送:

    @Test
    public void topicPublisher() {
        // 1、只发送邮件订阅用户消息
//        rabbitTemplate.convertAndSend("topic_exchange","info.email","topics send  email message");
        // 2、只发送短信订阅用户消息
//     rabbitTemplate.convertAndSend("topic_exchange","info.sms","topics send  sms message");
        // 3、发送同时订阅邮件和短信的用户消息
        rabbitTemplate.convertAndSend("topic_exchange", "info.email.sms", "topics send  email and sms message");
    }

测试结果类似,不在展示,同时也进行了自动绑定

源码下载

文件下载

  文件名称:本节代码.zip  文件大小:2.62MB
  下载声明:本站文件大多来自于网络,仅供学习和研究使用,不得用于商业用途,如有版权问题,请联系博主!
  下载地址:点击下载
提取码

注意:本段内容须“登录”后方可查看!


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/350191.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号