栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ的相关操作2--轮训分发消息

RabbitMQ的相关操作2--轮训分发消息

目录
  • 1. 创建SpringBoot项目,引入相关依赖
  • 2. 为了减少重复代码书写,编写连接工厂创建信道的工具类
  • 3. 编写生产者代码
  • 4. 编写消费者代码
  • 5. 运行代码测试

1. 创建SpringBoot项目,引入相关依赖
    
        
            org.springframework.boot
            spring-boot-starter-amqp
        
        
            commons-io
            commons-io
            2.6
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
            
                
                    org.junit.vintage
                    junit-vintage-engine
                
            
        
        
            org.springframework.amqp
            spring-rabbit-test
            test
        
    
2. 为了减少重复代码书写,编写连接工厂创建信道的工具类
public class RabbitMqUtils {
    public static Channel getChannel() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("120.24.192.216");
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}
3. 编写生产者代码
public class Task01 {
    public static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //从控制台中接收信息发送
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("发送消息完成:"+message);
        }
    }
}

4. 编写消费者代码
public class Worker01 {
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //消息的接收
        DeliverCallback deliverCallback = (consumerTag,message) ->{
            System.out.println("接收到的消息:"+new String(message.getBody()));
        };
        //消息接收被取消时执行的内容
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消息者取消消息消费接口回调逻辑");
        };
        System.out.println("C2等待接收消息......");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}
5. 运行代码测试

多进程运行两个消费者代码,首先设置idea可以多进程运行,选中Allow multiple instances

运行生产者代码,并手动输入需要发送的消息

观察消费者接收消息的情况

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/612598.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号