栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ六种工作模式—发布订阅模式

RabbitMQ六种工作模式—发布订阅模式

1.发布订阅模式下,消息会群发给所有的消费者,同一条消息所有的消费者都可以接收到

2.交换机:fanout交换机

3.生产者:定义交换机,向交换机发送消息

4.消费者:

(1)定义交换机

(2)定义随机队列

(3)与交换机绑定

(4)接收消息

 5.发布订阅模式实战

package org.example.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;


public class Producer {
    // 交换机名称
    public static final String EXCHANGE_NAME="fanout_logs";
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("112.124.16.34");
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setPort(5672);
        Connection connection=factory.newConnection();
        Channel channel=connection.createChannel();
        
        Scanner s=new Scanner(System.in);
        while(s.hasNext()){
            String message=s.next();
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
        }
        System.out.println("发送完毕");
        // 关闭通道和连接
        channel.close();
        // ***关闭连接***
        connection.close();
    }
}
package org.example.fanout;

import com.rabbitmq.client.*;


public class ReceiveLogs01 {
    // 交换机的名称
    public static final String EXCHANGE_NAME="fanout_logs";
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("112.124.16.34");
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection= factory.newConnection();
        Channel channel=connection.createChannel();
        // 声明一个fanout交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        // 声明一个队列 非持久化,独占,自动删除
        // 在Java客户端中,当我们不向queueDeclare()提供任何参数时,会创建一个具有
        // 生成名称(随机)、非持久化的、独占的、自动删除的队列
        String queueName=channel.queueDeclare().getQueue();
        // 绑定
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        
        // 接收和处理消息的回调对象
        DeliverCallback deliverCallback=(consumerTag, message)->{
            String mes=new String(message.getBody(),"UTF-8");
            System.out.println("logs01接收到消息:"+mes);
        };
        // 消费者取消时的回调对象
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println("消费被中断");
        };
        channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
        // channel.close();
        // ***关闭连接***
        // connection.close();
    }
}
package org.example.fanout;

import com.rabbitmq.client.*;

public class ReceiveLogs02 {
    // 交换机的名称
    public static final String EXCHANGE_NAME="fanout_logs";
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("112.124.16.34");
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection= factory.newConnection();
        Channel channel=connection.createChannel();
        // 声明一个fanout交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        // 声明一个队列 非持久化,独占,自动删除
        String queueName=channel.queueDeclare().getQueue();
        // 绑定
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        
        // 接收和处理消息的回调对象
        DeliverCallback deliverCallback=(consumerTag, message)->{
            String mes=new String(message.getBody(),"UTF-8");
            System.out.println("logs02接收到消息:"+mes);
        };
        // 消费者取消时的回调对象
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println("消费被中断");
        };
        channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
        // channel.close();
        // ***关闭连接***
        // connection.close();
    }
}

发送第二条消息的时候启动消费者2 ,只接收当前的最新消息

 

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/344478.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号