栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

03-RabbitMQ-Hello world

03-RabbitMQ-Hello world

我们简单的入门一下我们的Rabbit,使用java完成一个生产者,一个消费者,完成先关的操作。

1、工作原理图

2、创建一个Maven项目,导入对应的依赖

        
        
            com.rabbitmq
            amqp-client
            5.8.0
        
        
        
            commons-io
            commons-io
            2.6
        
    
3、大概流程示意

4、生产者代码
public class Producer {
    // 队列的名字
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        // 1、创建一个连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置连接的主机信息 主机ip,用户名,密码
        connectionFactory.setHost("192.168.115.128");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123");
        // 3、获取一个连接
        Connection connection = connectionFactory.newConnection();
        // 4、获取一个信道
        Channel channel = connection.createChannel();
        // 5、声明一个队列
        
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 6、声明发送的信息
        String message = "hello,world";
        // 7、发送消息
        
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("message 发送结束");
    }
}
5、生产者中代码调用方法解释 5.1、声明队列方法queueDeclare()

该方法是生产者用来声明一个队列,如果该队列不存在就创建一个对列,存在的话就直接使用
方法参数解释:
1、队列的名字
2、队列里面的消息是否支持持计化
3、设置该队列,是否可以供对个消费者消费
4、是否自动删除消息
5、其他参数

   // 5、声明一个队列
        
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
5.2、执行发送消息的方法basicPublish()

basicPublish()方法是生产者,用来推送消息的使用的
参数解释:
1、使用那个交换机
2、推送到那个队列的l路由key
3、其他参数的信息
4、发送消息的消息体

  // 7、发送消息
        
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("message 发送结束");
6、消费者代码
public class Consumer {
    // 队列的名字
    public static final String QUEUE_NAME = "hello";

    //  接受消息
    public static void main(String[] args) throws Exception {
        // 1、创建一个连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置连接的主机信息 主机ip,用户名,密码
        connectionFactory.setHost("192.168.115.128");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123");
        // 3、获取一个连接
        Connection connection = connectionFactory.newConnection();
        // 4、获取一个信道
        Channel channel = connection.createChannel();
        // 5、消费者消费信道
        //声明 接受消息的回调函数时接口对象
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(message.getBody());
        };
        // 取消消息时的回调函数时接口对象
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消息消费被中断");
        };

        
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
6.1、回调函数DeliverCallback

我们消费消息时,我们需要从消息队列中取到对应的消息,但是存在如何消费消息的问题,这个回调函数就是用来完成对消息的如何消费的。

 //声明 接受消息的回调函数时接口对象
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        };
6.2、回调函数CancelCallback

消息消费取消时的回调函数

 // 取消消费消息时的回调函数时接口对象
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消息消费被中断");
        };
6.3、basicConsume()参数解释
  
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
7、演示

启动我们的生产者代码,启动我们的消费者代码
查看消息发送与消息的消费。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/278503.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号