栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java > SpringBoot

跟我学习Java Spring boot 整合RabbitMQ(四):路由(Routing)-B2B2C小程序电子商务

SpringBoot 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

跟我学习Java Spring boot 整合RabbitMQ(四):路由(Routing)-B2B2C小程序电子商务

在本文中,我们将实现另一个功能 —— 只订阅一部分消息。例如,我们只需要把严重的错误日志信息写入日志文件(存储到磁盘),但同时仍然把所有的日志信息输出到控制台中

了解springcloud架构可以加求求:三五三六二四七二五九

绑定(Binding)

在之前的例子中,我们已经创建了绑定。可以在我们的 Tut3Config 文件中回忆一下这样的代码:

@Bean
public Binding binding1(FanoutExchange fanout, Queue autoDeleteQueue1) {
    return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
}

绑定(binding)是指交换器(exchange)和队列(queue)的关系。可以简单理解为:这个队列(queue)对这个交换器(exchange)的消息感兴趣。

绑定可以使用一个额外的参数 routingKey。我们将交换器和队列传入到 BindingBuilder,并将 routingKey 绑定到交换器,如下所示:

@Bean
public Binding binding1a(DirectExchange direct, Queue autoDeleteQueue1) {
    return BindingBuilder.bind(autoDeleteQueue1)
        .to(direct)
        .with("orange");
}

routingKey 含义取决于交换类型。比如我们之前使用的 fanout exchange,会忽略它的值。

发布消息

我们将使用以上这个模型作为我们的路由系统,将消息发送到 direct exchange 而不是 fanout exchange。我们将使用颜色作为路由键,这样消费者将能通过选择想要接收(或订阅)的颜色来消费对应的消息。


我们在 Tut4Config 中做一些 Spring 启动配置,需要先建立一个交换器

@Bean
public DirectExchange direct() {
    return new DirectExchange("tut.direct");
}

接收消息的方式与上一个教程中的一样,但也有一些不同 —— 我们需要为每个感兴趣的颜色创建一个新的绑定。
了解springcloud架构可以加求求:三五三六二四七二五九

@Bean
public Binding binding1a(DirectExchange direct, Queue autoDeleteQueue1) {
    return BindingBuilder.bind(autoDeleteQueue1)
        .to(direct)
        .with("orange");

生产者

public class Tut4Sender {
   @Autowird 
    private AmqpTemplate template;
    @Autowird
    private DirectExchange direct;
    private int index;
    private int count;
    private final String[] keys = {"orange", "black", "green"};
    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        StringBuilder builder = new StringBuilder("Hello to ");
        if (++this.index == 3) {
            this.index = 0;
        }
        String key = keys[this.index];
        builder.append(key).append(' ').append(Integer.toString(++this.count));
        String message = builder.toString();
        template.convertAndSend(direct.getName(), key, message);
        System.out.println(" [x] Sent '" + message + "'");
    }
}

消费者

public class Tut4Receiver {
    @RabbitListener(queues = "#{autoDeleteQueue1.name}")
    public void receiver1(String in) throws InterruptedException {
        receiver(in, 1);
    }
    @RabbitListener(queues = "#{autoDeleteQueue2.name}")
    public void receiver2(String in) throws InterruptedException {
        receiver(in, 2);
    }
    private void receiver(String in, int instance) throws InterruptedException {
        StopWatch watch = new StopWatch();
        watch.start();
        System.out.println("instance " + instance + " [x] Received '" + in + "'");
        doWork(in);
        watch.stop();
        System.out.println("instance " + instance + " [x] Done in " +
                watch.getTotalTimeSeconds() + "s");
    }
    private void doWork(String in) throws InterruptedException {
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                Thread.sleep(1000);
            }
        }
    }
}

配置类

@Profile({"tut4", "routing"})
@Configuration
public class Tut4Config {
    @Bean
    public DirectExchange direct() {
        return new DirectExchange("tut.direct");
    }
   @Profile ("receiver")
    private static class ReceiverConfig {
        @Bean
        public Queue autoDeleteQueue1() {
            return new AnonymousQueue();
        }
        @Bean
        public Queue autoDeleteQueue2() {
            return new AnonymousQueue();
        }
        @Bean
        public Binding binding1a(DirectExchange direct, Queue autoDeleteQueue1) {
            return BindingBuilder.bind(autoDeleteQueue1)
                    .to(direct)
                    .with("orange");
        }
        @Bean
        public Binding binding1b(DirectExchange direct, Queue autoDeleteQueue1) {
            return BindingBuilder.bind(autoDeleteQueue1)
                    .to(direct)
                    .with("black");
        }
        @Bean
        public Binding binding2a(DirectExchange direct, Queue autoDeleteQueue2) {
            return BindingBuilder.bind(autoDeleteQueue2)
                    .to(direct)
                    .with("green");
        }
        @Bean
        public Binding binding2b(DirectExchange direct, Queue autoDeleteQueue2) {
            return BindingBuilder.bind(autoDeleteQueue2)
                    .to(direct)
                    .with("black");
        }
       @Bean 
        public Tut4Receiver receiver() {
            return new Tut4Receiver();
        }
    }
   @Profile("sender")
    @Bean
    public Tut4Sender sender() {
        return new Tut4Sender();
    }
}


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/234591.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号