栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

消息中间件之RabbitMQ(二):几种运行模式

消息中间件之RabbitMQ(二):几种运行模式

注意:同一个交换机或队列的声明位置可以在消费者,也可以在生产者,也可以重复声明,如果使用了SpringBoot整合后,就可统一配置到配置类中

RabbitMQ的5种运行模式
    简单队列模式
      使用默认交换机1个生产者+1个消费者
    worker模式
      使用默认交换机1个生产者,多个消费者单个消息只能被1个消费者消费多个消息默认是类似轮询的方式发给多个消费者
    订阅模式(fanout)
      生产者数量>=1,消费者数量>1每一个消费者都有自己的一个队列生产者发送的消息,经过交换机,到达队列,实现一个消息被多个消费者获取的目的
    路由模式(direct)
      同订阅模式,区别是:不再广播,而是根据routingKey路由到指定的消费者多个队列的routingKey可以是一样的,这样就变成了fanout
    主题模式(topic)
      同路由模式,区别是:routingKey添加了通配符概念*可以代替一个单词,#可以替代零个或多个单词
        最后一个单词是 rabbit 的 3 个单词 (*.*.rabbit)第一个单词是 lazy 的多个单词 (lazy.#)
      当一个队列routingKey是#,那么这个队列将接收所有数据,变成 fanout 了如果routingKey没有#和*出现,那么就变成 direct 了

参考:RabbitMQ的五种工作模式

简单队列模式

worker模式也可以用此代码测试生产者核心代码:

//获取信道
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

消费者核心代码:

Channel channel = connection.createChannel();

channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
发布-订阅模式(fanout)
//生产者核心代码
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
channel.basicPublish(EXCHANGE_NAME, "", null, message);

//消费者核心代码
channel.queueBind(queueName, EXCHANGE_NAME, "");//fanout模式,和routingKey无关
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
路由模式(direct)
//生产者核心代码
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message);

//消费者核心代码
channel.queueBind(queueName, EXCHANGE_NAME, "error");//指定routingKey=error
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
主题模式(topic)
//生产者核心代码
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message);
//消费者核心代码
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
路由模式——SpringBoot方式整合

配置类

@Bean
public DirectExchange directExchange() {
    // 支持持久化
    return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false);
}


@Bean
public Queue test1Queue() {
    // 支持持久化
    return new Queue(QUEUE_TEST1, true);
}


@Bean
public Binding test1Binding(DirectExchange directExchange, Queue test1Queue) {
    //这样写spring会自动按名称注入
    return BindingBuilder.bind(test1Queue).to(directExchange).with(ROUTING_KEY_TEST1);//指定routingKey
}



    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1", 5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setPublisher/confirm/iType(CachingConnectionFactory./confirm/iType.SIMPLE);//设置发布确认类型
        return connectionFactory;
    }


@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    return new RabbitTemplate(connectionFactory);
}

生产者rabbitTemplate还有很多send方法,如:

sendAndReceive();convertAndSend();convertSendAndReceive();convertSendAndReceiveAsType();

MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//消息持久化
Message message = new Message("你好,这是发给test1队列的消息".getBytes(), messageProperties);
rabbitTemplate.send(DIRECT_EXCHANGE_NAME, ROUTING_KEY_TEST1, message);

消费者

@RabbitListener(queues = QUEUE_TEST1)
public void consumeMessage1(Message message) {
    System.out.println("test1队列收到:======" + new String(message.getBody(), StandardCharsets.UTF_8));
}
完整代码

完整代码:GitHub

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/775160.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号