栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ工作模式(一)---简单模式和工作模式

RabbitMQ工作模式(一)---简单模式和工作模式

一、简单模式 (1)、消息生产者

步骤1、连接RabbitMQ,创建信道

//创建连接工厂
ConnectionFactory factory = new ConnectionFacory();
factory.setHost("127.0.0.1");
factory.setPort(15672);
factory.setUserName("guest");
factory.setPassword("guest");
//创建连接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel();

步骤2、声明队列

//参数1:队列名称
//参数2:队列是否持久化
//参数3:队列中消息是否共享
//参数4:队列中消息是否自动删除
//参数5:其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

步骤3:发送消息

String message - "Hello RabbitMQ";
//发送消息
//参数1:交换机名称
//参数2:routing key  这里认为是队列名称
//参数3:其他参数信息
//参数4:消息转化为二进制
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
(二)、消费者

步骤1、连接RabbitMQ,创建信道

//创建连接工厂
ConnectionFactory factory = new ConnectionFacory();
factory.setHost("127.0.0.1");
factory.setPort(15672);
factory.setUserName("guest");
factory.setPassword("guest");
//创建连接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel();

步骤二、确认消息回调和取消消息回调

//消费信息回调
DeliverCallback deliverCallback = (consumerTag, message) ->{
    //打印消息
    System.out.println(new String(message.getBody()));
};
//取消消费时,执行下面内容
CancelCallback cancelCallback = consumerTag->{
    //打印取消消息的tag
    System.out.println(consumerTag);
};

步骤3:消费消息

//参数1:消费哪一个队列
//参数2:消费成功后是否要自动应答 true为自动应答,false为手动应答
//参数3:消费者成功消费的回调
//参数4:消费者取消消费的回调
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
二 、工作模式 (1)生产者
//创建连接工厂
ConnectionFactory factory = new ConnectionFacory();
factory.setHost("127.0.0.1");
factory.setPort(15672);
factory.setUserName("guest");
factory.setPassword("guest");
//创建连接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel();

//声明一个队列
//参数1:队列名称
//参数2:队列是否持久化(磁盘), 默认情况下存储在内存中
//参数3:该队列是否只供一个消费者进行消费,是否进行消息共享。true为可以多个消费者消费,默认不允许多个消费者消费
//参数4:是否自动删除,最后一个消费者断开后,最后一句是否自动删除,true为自动删除。
//参数5:其他参数
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
    String message = scanner.next();
    //参数1:发送到交换机
    //参数2:路由的key值,本次是队列名
    //参数3:其他参数信息,设置消息为持久化,保存到磁盘中
    //参数4:发送的信息
    channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,         message.getBytes());
    System.out.println("发送信息成功:"+ message);
}
(2)消费者设置为两个(此时公平发送,你一条我一条)
//创建连接工厂
ConnectionFactory factory = new ConnectionFacory();
factory.setHost("127.0.0.1");
factory.setPort(15672);
factory.setUserName("guest");
factory.setPassword("guest");
//创建连接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel();

//消费信息回调
DeliverCallback deliverCallback = (consumerTag, message) ->{
    //打印消息
    System.out.println(new String(message.getBody()));
    
};
//取消消费时,执行下面内容
CancelCallback cancelCallback = consumerTag->{
    //打印取消消息的tag
    System.out.println(consumerTag);
};

//参数1:消费哪一个队列
//参数2:消费成功后是否要自动应答 true为自动应答,false为手动应答
//参数3:消费者成功消费的回调
//参数4:消费者取消消费的回调
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
优化---消费者端改为手动应答,并设置能者多劳(basicQos()方法)
//创建连接工厂
ConnectionFactory factory = new ConnectionFacory();
factory.setHost("127.0.0.1");
factory.setPort(15672);
factory.setUserName("guest");
factory.setPassword("guest");
//创建连接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel();

//消费信息回调
DeliverCallback deliverCallback = (consumerTag, message) ->{
    //打印消息
    System.out.println(new String(message.getBody()));
    //应答
    //参数1:消息的标记
    //参数2:是否批量  false为不批量处理
    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
//取消消费时,执行下面内容
CancelCallback cancelCallback = consumerTag->{
    //打印取消消息的tag
    System.out.println(consumerTag);
};

//设置为0时代表公平分发 (你一条我一条)
//设置为1时代表不公平分发
//设置为2或其他值 时代表预取值
channel.basicQos(1);

//参数1:消费哪一个队列
//参数2:消费成功后是否要自动应答 true为自动应答,false为手动应答
//参数3:消费者成功消费的回调
//参数4:消费者取消消费的回调
channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/680104.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号