消息队列作用:解耦、异步、削峰
https://www.cnblogs.com/terry-love/p/11492397.html
二、RabbitMQ介绍三、RabbitMQ安装市面上比较火爆的几款MQ:
ActiveMQ,RocketMQ,Kafka,RabbitMQ。
语言的支持:ActiveMQ,RocketMQ只支持Java语言,Kafka可以支持多们语言,RabbitMQ支持多种语言。
效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微秒级别的。
消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有比较成熟的解决方案。
学习成本:RabbitMQ非常简单。
RabbitMQ是由Rabbit公司去研发和维护的,最终是在Pivotal。
RabbitMQ严格的遵循AMQP协议,高级消息队列协议,帮助我们在进程之间传递异步消息。
1.创建 docker-compose.yaml 文件
[root@master java2109]# mkdir docker-compose-rabbitmq
[root@master java2109]# cd docker-compose-rabbitmq/
[root@master docker-compose-rabbitmq]# ls
[root@master docker-compose-rabbitmq]# vim docker-compose.yaml
version: "3.1" services: rabbitmq: image: daocloud.io/library/rabbitmq:management restart: always container_name: rabbitmq ports: - 5672:5672 - 15672:15672 volumes: - ./data:/var/lib/rabbitmq
2.启动rabbitmq
[root@master docker-compose-rabbitmq]# docker-compose up
登陆 (注意Google浏览器有兼容问题,使用IE)
http://虚拟机IP:15672/ 用户名guest 密码guest
四、RabbitMQ架构【重点】
4.1 官方的简单架构图
Publisher - 生产者:发布消息到RabbitMQ中的Exchange
Consumer - 消费者:监听RabbitMQ中的Queue中的消息
Exchange - 交换机:和生产者建立连接并接收生产者的消息
Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进行交互
Routes - 路由:交换机以什么样的策略将消息发布到Queue
| 简单架构图 |
|---|
完整架构图
| 完整架构图 |
|---|
5.1 RabbitMQ的通讯方式
5.2 Java连接RabbitMQ 5.2.1 创建maven项目
5.2.2 导入依赖…………
5.2.3 创建工具类连接RabbitMQ com.rabbitmq amqp-client5.6.0 junit junit4.12
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMqUtils {
public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.12.145");
// 客户端连接的端口
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// VirtualHost 相当于文件夹,消息存在那个位置
connectionFactory.setVirtualHost("/");
return connectionFactory.newConnection();
}
}
5.3 Hello-World模式
一个生产者,一个默认的交换机,一个队列,一个消费者
创建测试类进行测试
import com.qfedu.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class HelloWorldTest {
private Connection connection;
@Before // 在@Test 之前调用初始化数据
public void init() throws IOException, TimeoutException {
connection = RabbitMqUtils.getConnection();
}
@Test // 进行单元测试
public void consumerTest() throws IOException {
// channel 管道 连接 消费者和队列
Channel channel = connection.createChannel();
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
// chanel 和 队列绑定
channel.queueDeclare("hello-queue",true,true,false,null);
// 抱着每次消费者 消费一条数据
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 从队列中获取消息
System.out.println("接受到消息:"+new String(body,"utf-8") );
}
};
// chanel 和 消费者绑定
// 参数1 队列名称
// 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
// 参数3 消费者
channel.basicConsume("hello-queue", true,consumer);
// 让程序一致卡在这里 消费者 可以一致消费消息
System.in.read();// 等待客户端命令行 的输入
}
@Test
public void publishTest() throws IOException, TimeoutException {
Channel channel = connection.createChannel();
//参数1: 交换机名称 没有就是默认 ""
// 参数2 :队列名称 或者是 消息类型信息 真的会路由规则进行匹配
// 参数3: 指定传递的消息所携带的properties,使用null。
// 参数4:消息
// 向队列 发送消息 " hello-queue"
channel.basicPublish("", "hello-queue",null,"hello-world".getBytes());
channel.close();
}
@After// 在@Test 之后进行 ,是数据销毁
public void destroy() throws IOException {
connection.close();
}
}
5.4 手动ack 机制注意:一定要先启动消费者,在启动生产者
手动ack 机制:保证消息对应的业务 已经真正的处理了,而不是仅仅接收到该消息
package com.qfedu.test;
import com.qfedu.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class HelloWorldTestAck {
private Connection connection;
@Before // 在@Test 之前调用初始化数据
public void init() throws IOException, TimeoutException {
connection = RabbitMqUtils.getConnection();
}
@Test // 进行单元测试
public void consumerTest() throws IOException {
// channel 管道 连接 消费者和队列
final Channel channel = connection.createChannel();
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
// chanel 和 队列绑定
channel.queueDeclare("hello-queue",true,true,false,null);
// 抱着每次消费者 消费一条数据
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 从队列中获取消息 并处理器
System.out.println("接受到消息:"+new String(body,"utf-8") );
// int a = 1/0;
// 所有的业务都完成之后 可以手动的ack
// envelope.getDeliveryTag() // 消息标记 0 1 2
// false ack 之后 不删除
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// chanel 和 消费者绑定
// 参数1 队列名称
// 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
// false 需要手动ack
// 参数3 消费者
channel.basicConsume("hello-queue", false,consumer);
// 让程序一致卡在这里 消费者 可以一致消费消息
System.in.read();// 等待客户端命令行 的输入
}
@Test
public void publishTest() throws IOException, TimeoutException {
Channel channel = connection.createChannel();
//参数1: 交换机名称 没有就是默认 ""
// 参数2 :队列名称 或者是 消息类型信息 真的会路由规则进行匹配
// 参数3: 指定传递的消息所携带的properties,使用null。
// 参数4:消息
// 向队列 发送消息 " hello-queue"
channel.basicPublish("", "hello-queue",null,"hello-world".getBytes());
channel.close();
}
@After// 在@Test 之后进行 ,是数据销毁
public void destroy() throws IOException {
connection.close();
}
}
5.5 worker-queue 模式
package com.qfedu.test;
import com.qfedu.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class WorkerQueueTest {
private Connection connection;
@Before // 在@Test 之前调用初始化数据
public void init() throws IOException, TimeoutException {
connection = RabbitMqUtils.getConnection();
}
@Test // 进行单元测试
public void consumer1Test() throws IOException {
// channel 管道 连接 消费者和队列
final Channel channel = connection.createChannel();
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
// chanel 和 队列绑定
channel.queueDeclare("worker-queue",true,false,false,null);
// 抱着每次消费者 消费一条数据
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 从队列中获取消息 并处理器
System.out.println("消费者1 接受到消息:"+new String(body,"utf-8") );
// 所有的业务都完成之后 可以手动的ack
// envelope.getDeliveryTag() // 消息标记 0 1 2
// false ack 之后 不删除
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// chanel 和 消费者绑定
// 参数1 队列名称
// 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
// false 需要手动ack
// 参数3 消费者
channel.basicConsume("worker-queue", false,consumer);
// 让程序一致卡在这里 消费者 可以一致消费消息
System.in.read();// 等待客户端命令行 的输入
}
@Test // 进行单元测试
public void consumer2Test() throws IOException {
// channel 管道 连接 消费者和队列
final Channel channel = connection.createChannel();
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
// chanel 和 队列绑定
channel.queueDeclare("worker-queue",true,false,false,null);
// 抱着每次消费者 消费一条数据
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 从队列中获取消息 并处理器
System.out.println("消费者2 接受到消息:"+new String(body,"utf-8") );
// 所有的业务都完成之后 可以手动的ack
// envelope.getDeliveryTag() // 消息标记 0 1 2
// false ack 之后 不删除
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// chanel 和 消费者绑定
// 参数1 队列名称
// 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
// false 需要手动ack
// 参数3 消费者
channel.basicConsume("worker-queue", false,consumer);
// 让程序一致卡在这里 消费者 可以一致消费消息
System.in.read();// 等待客户端命令行 的输入
}
@Test
public void publishTest() throws IOException, TimeoutException {
Channel channel = connection.createChannel();
//参数1: 交换机名称 没有就是默认 ""
// 参数2 :队列名称 或者是 消息类型信息 真的会路由规则进行匹配
// 参数3: 指定传递的消息所携带的properties,使用null。
// 参数4:消息
// 向队列 发送消息 " worker-queue"
// 发送10条数据 每个消费者得到5条数据
for (int i = 0; i < 10; i++) {
channel.basicPublish("", "worker-queue",null,("worker-queue--i:" +i).getBytes());
}
channel.close();
}
@After// 在@Test 之后进行 ,是数据销毁
public void destroy() throws IOException {
connection.close();
}
}
5.6 发布订阅模式
package com.qfedu.test;
import com.qfedu.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PublishSubTest {
private Connection connection;
@Before // 在@Test 之前调用初始化数据
public void init() throws IOException, TimeoutException {
connection = RabbitMqUtils.getConnection();
}
@Test // 进行单元测试
public void consumer1Test() throws IOException {
// channel 管道 连接 消费者和队列
final Channel channel = connection.createChannel();
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
// chanel 和 队列绑定
channel.queueDeclare("pubsub-queue1",true,false,false,null);
// 抱着每次消费者 消费一条数据
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 从队列中获取消息 并处理器
System.out.println("消费者1 接受到消息:"+new String(body,"utf-8") );
// 所有的业务都完成之后 可以手动的ack
// envelope.getDeliveryTag() // 消息标记 0 1 2
// false ack 之后 不删除
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// chanel 和 消费者绑定
// 参数1 队列名称
// 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
// false 需要手动ack
// 参数3 消费者
channel.basicConsume("pubsub-queue1", false,consumer);
// 让程序一致卡在这里 消费者 可以一致消费消息
System.in.read();// 等待客户端命令行 的输入
}
@Test // 进行单元测试
public void consumer2Test() throws IOException {
// channel 管道 连接 消费者和队列
final Channel channel = connection.createChannel();
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
// chanel 和 队列绑定
channel.queueDeclare("pubsub-queue2",true,false,false,null);
// 抱着每次消费者 消费一条数据
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 从队列中获取消息 并处理器
System.out.println("消费者2 接受到消息:"+new String(body,"utf-8") );
// 所有的业务都完成之后 可以手动的ack
// envelope.getDeliveryTag() // 消息标记 0 1 2
// false ack 之后 不删除
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// chanel 和 消费者绑定
// 参数1 队列名称
// 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
// false 需要手动ack
// 参数3 消费者
channel.basicConsume("pubsub-queue2", false,consumer);
// 让程序一致卡在这里 消费者 可以一致消费消息
System.in.read();// 等待客户端命令行 的输入
}
@Test
public void publishTest() throws IOException, TimeoutException {
Channel channel = connection.createChannel();
//将 chanel 和 自定义的交换机 绑定 "pubsub-exchange"
//参数1: exchange的名称
//参数2: 指定exchange的类型 FANOUT - pubsub , DIRECT - Routing , TOPIC - Topics
//FANOUT - pubsub 交换机 会将消息发送到 所有的队列中
channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
//参数1 队列名
//参数2 交换机名
//参数3 路由规则
channel.queueBind("pubsub-queue1", "pubsub-exchange", "");
channel.queueBind("pubsub-queue2", "pubsub-exchange", "");
//参数1: 交换机名称 没有就是默认 ""
// 参数2 :队列名称 或者是 消息类型信息 真的会路由规则进行匹配
// 参数3: 指定传递的消息所携带的properties,使用null。
// 参数4:消息
// 向队列 发送消息
// 发送10条数据 每个消费者得到5条数据
for (int i = 0; i < 10; i++) {
// 消息没有发送到默认的交换机 ,而是发送到 自定义交换机pubsub-exchange
// "pubsub-exchange" 交换机名称
// "" 路由规则
channel.basicPublish("pubsub-exchange", "",null,("pubsub--i:" +i).getBytes());
}
channel.close();
}
@After// 在@Test 之后进行 ,是数据销毁
public void destroy() throws IOException {
connection.close();
}
}
5.7 routing 路由模式
import com.qfedu.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RoutingTest {
private Connection connection;
@Before // 在@Test 之前调用初始化数据
public void init() throws IOException, TimeoutException {
connection = RabbitMqUtils.getConnection();
}
@Test // 进行单元测试
public void consumer1Test() throws IOException {
// channel 管道 连接 消费者和队列
final Channel channel = connection.createChannel();
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
// chanel 和 队列绑定
channel.queueDeclare("routing-info-queue",true,false,false,null);
// 抱着每次消费者 消费一条数据
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 从队列中获取消息 并处理器
System.out.println("消费者1 接受到消息:"+new String(body,"utf-8") );
// 所有的业务都完成之后 可以手动的ack
// envelope.getDeliveryTag() // 消息标记 0 1 2
// false ack 之后 不删除
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// chanel 和 消费者绑定
// 参数1 队列名称
// 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
// false 需要手动ack
// 参数3 消费者
channel.basicConsume("routing-info-queue", false,consumer);
// 让程序一致卡在这里 消费者 可以一致消费消息
System.in.read();// 等待客户端命令行 的输入
}
@Test // 进行单元测试
public void consumer2Test() throws IOException {
// channel 管道 连接 消费者和队列
final Channel channel = connection.createChannel();
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
// chanel 和 队列绑定
channel.queueDeclare("routing-error-queue",true,false,false,null);
// 抱着每次消费者 消费一条数据
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 从队列中获取消息 并处理器
System.out.println("消费者2 接受到消息:"+new String(body,"utf-8") );
// 所有的业务都完成之后 可以手动的ack
// envelope.getDeliveryTag() // 消息标记 0 1 2
// false ack 之后 不删除
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// chanel 和 消费者绑定
// 参数1 队列名称
// 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
// false 需要手动ack
// 参数3 消费者
channel.basicConsume("routing-error-queue", false,consumer);
// 让程序一致卡在这里 消费者 可以一致消费消息
System.in.read();// 等待客户端命令行 的输入
}
@Test
public void publishTest() throws IOException, TimeoutException {
Channel channel = connection.createChannel();
//将 chanel 和 自定义的交换机 绑定 "pubsub-exchange"
//参数1: exchange的名称
//参数2: 指定exchange的类型 FANOUT - pubsub , DIRECT - Routing , TOPIC - Topics
// DIRECT - Routing 交换机 会将消息发送到 所有的队列中
channel.exchangeDeclare("Routing-exchange", BuiltinExchangeType.DIRECT);
//参数1 队列名
//参数2 交换机名
//参数3 路由规则
// 所有消息为 info 的消息都会 由Routing-exchange发送到routing-info-queue 队列中
channel.queueBind("routing-info-queue", "Routing-exchange", "info");
channel.queueBind("routing-error-queue", "Routing-exchange", "error");
//参数1: 交换机名称 没有就是默认 ""
// 参数2 :队列名称 或者是 消息类型信息 真的会路由规则进行匹配
// 参数3: 指定传递的消息所携带的properties,使用null。
// 参数4:消息
// 向队列 发送消息
// 发送10条数据 每个消费者得到5条数据
for (int i = 0; i < 10; i++) {
// 消息没有发送到默认的交换机 ,而是发送到 自定义交换机pubsub-exchange
//参数1: "pubsub-exchange" 交换机名称
//参数2: "" 路由规则
if (i%2==1){// 奇数 info
channel.basicPublish("Routing-exchange", "info",null,("routing--i:" +i).getBytes());
}else{ // 偶数 error
channel.basicPublish("Routing-exchange", "error",null,("routing--i:" +i).getBytes());
}
}
channel.close();
}
@After// 在@Test 之后进行 ,是数据销毁
public void destroy() throws IOException {
connection.close();
}
}
5.8 Topic 主题模式
package com.qfedu.test;
import com.qfedu.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TopicTest {
private Connection connection;
@Before // 在@Test 之前调用初始化数据
public void init() throws IOException, TimeoutException {
connection = RabbitMqUtils.getConnection();
}
@Test // 进行单元测试
public void consumer1Test() throws IOException {
// channel 管道 连接 消费者和队列
final Channel channel = connection.createChannel();
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
// chanel 和 队列绑定
channel.queueDeclare("topic-queue-1",true,false,false,null);
// 抱着每次消费者 消费一条数据
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 从队列中获取消息 并处理器
System.out.println("消费者1 接受到消息:"+new String(body,"utf-8") );
// 所有的业务都完成之后 可以手动的ack
// envelope.getDeliveryTag() // 消息标记 0 1 2
// false ack 之后 不删除
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// chanel 和 消费者绑定
// 参数1 队列名称
// 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
// false 需要手动ack
// 参数3 消费者
channel.basicConsume("topic-queue-1", false,consumer);
// 让程序一致卡在这里 消费者 可以一致消费消息
System.in.read();// 等待客户端命令行 的输入
}
@Test // 进行单元测试
public void consumer2Test() throws IOException {
// channel 管道 连接 消费者和队列
final Channel channel = connection.createChannel();
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
// chanel 和 队列绑定
channel.queueDeclare("topic-queue-2",true,false,false,null);
// 抱着每次消费者 消费一条数据
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 从队列中获取消息 并处理器
System.out.println("消费者2 接受到消息:"+new String(body,"utf-8") );
// 所有的业务都完成之后 可以手动的ack
// envelope.getDeliveryTag() // 消息标记 0 1 2
// false ack 之后 不删除
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// chanel 和 消费者绑定
// 参数1 队列名称
// 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
// false 需要手动ack
// 参数3 消费者
channel.basicConsume("topic-queue-2", false,consumer);
// 让程序一致卡在这里 消费者 可以一致消费消息
System.in.read();// 等待客户端命令行 的输入
}
@Test
public void publishTest() throws IOException, TimeoutException {
Channel channel = connection.createChannel();
//将 chanel 和 自定义的交换机 绑定 "pubsub-exchange"
//参数1: exchange的名称
//参数2: 指定exchange的类型 FANOUT - pubsub , DIRECT - Routing , TOPIC - Topics
// DIRECT - Routing 交换机 会将消息发送到 所有的队列中
channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
//参数1 队列名
//参数2 交换机名
//参数3 路由规则
// 所有消息为 info 的消息都会 由Routing-exchange发送到topic-queue-1 队列中
channel.queueBind("topic-queue-1", "topic-exchange", "*.orange.*");
channel.queueBind("topic-queue-2", "topic-exchange", "big.*.*");
//参数1: 交换机名称 没有就是默认 ""
// 参数2 :队列名称 或者是 消息类型信息 真的会路由规则进行匹配
// 参数3: 指定传递的消息所携带的properties,使用null。
// 参数4:消息
// 向队列 发送消息
// 发送10条数据 每个消费者得到5条数据
for (int i = 0; i < 10; i++) {
// 消息没有发送到默认的交换机 ,而是发送到 自定义交换机pubsub-exchange
//参数1: "pubsub-exchange" 交换机名称
//参数2: "" 路由规则
if (i%2==1){// 奇数 orange
channel.basicPublish("topic-exchange", "xxxasdasd.orange.xfsdf",null,("topic--i:" +i).getBytes());
}else{ // 偶数 error
channel.basicPublish("topic-exchange", "big.xxxx.uii",null,("routing--i:" +i).getBytes());
}
}
channel.close();
}
@After// 在@Test 之后进行 ,是数据销毁
public void destroy() throws IOException {
connection.close();
}
}
六、RabbitMQ整合SpringBoot【重点】
1、创建springboot 工程并导入依赖
org.springframework.boot
spring-boot-starter-parent
2.2.6.RELEASE
org.springframework.boot
spring-boot-starter-amqp
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-test
test
org.junit.vintage
junit-vintage-engine
junit
junit
4.12
2、 编写配置文件
spring.rabbitmq.host=192.168.12.130 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/
3.启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitApplication.class,args);
}
}
4.装配 exchange、queue
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public TopicExchange topicExchange(){
//String name, 交换机名字
// boolean durable, 是否持久化
// boolean autoDelete 是否自动删除
TopicExchange topicExchange = new TopicExchange("springboot-topic-exchange", true,false);
return topicExchange;
}
@Bean
public Queue queue(){
//String name, 队列 名字
// boolean durable, 是否持久化
// boolean exclusive, 是否排外
// boolean autoDelete, 是否自动删除
// @Nullable Map arguments 其他属性
Queue queue = new Queue("springboot-queue", true,false,false,null);
return queue;
}
@Bean
public Binding binding(TopicExchange topicExchange,Queue queue){
// 将交换接和 队列绑定 并匹配 *.java2109.*
Binding binding = BindingBuilder.bind(queue).to(topicExchange).with("*.java2109.*");
return binding;
}
}
5、配置消费者
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
// 该方法作为消费者 接收 中的消息
@RabbitListener(queues = "springboot-queue")
public void consumer1(String msg, Channel channel, Message message){
System.out.println("消费者 得到 msg = " + msg);
System.out.println("msg = " + message);
}
}
6、单元测试
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMqTest {
// rabbitTemplate 就是连接 rabbit 的客户端
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void publishTest(){
// String var1, 交换机
// String var2, 路由规则
// Object var3 发送的数据
rabbitTemplate.convertAndSend("springboot-topic-exchange", "xxxx.java2109.sdfsd","下课么?");
System.out.println("发送消息 ");
}
}
七、 消息的高可靠 7.1 手动ack
手动ack 保证消费者正确消费消息,执行相关的业务
1.在配置文件开启手动ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
2.消费者手动ack
// 该方法作为消费者 接收 中的消息
@RabbitListener(queues = "springboot-queue")
public void consumer1(String msg, Channel channel, Message message) throws IOException {
System.out.println("消费者 得到 msg = " + msg);
System.out.println("msg = " + message);
// 此时发生异常 不可以运行到手动 ack, 此时消息 会发送多次
int i = 1/0;
// 手动ack
// 将该消息的序号 ack message.getMessageProperties().getDeliveryTag()
// 多条消息是否一起ack false
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
7.2 confirm 机制 & return 机制
保证生产者 发送的消息,确保可以到达queue 磁盘存储
confirm 机制 :发生在 生产者 和交换机之间
return 机制:发生在交换机 和 队列之间
1.配置开启confirm return 机制
spring.rabbitmq.publisher-/confirm/i-type=simple spring.rabbitmq.publisher-returns=true
2.配置confirm return 接口
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class ConfirmReturnCallBack implements RabbitTemplate./confirm/iCallback,RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
// 在容器中 加入该bean 会调用 该方法( init 方法 , @PostConstruct 标记方法)
@PostConstruct // 相当 bean.xml 中
public void init(){
// 配置 rabbitTemplate 客户端的回调信息
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
// confirm 机制
@Override
public void confirm(CorrelationData correlationData, boolean /confirm/i, String s) {
System.out.println("correlationData = " + correlationData);
System.out.println("s = " + s);
if (/confirm/i){
System.out.println( "该消息到达 交换机");
}
}
// return 机制 ,一般情况下 不会回调,只有在交换机的消息不能写入队列才会调用
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
// 如果失败 得到message 消息,否则 不调用
System.out.println("message = " + message);
}
}
八、消息不重复3.发送消息 并测试
消息不重复:就是在某些情况下,消费者发生异常(没有手动ack成功),造成消息的重复发送,此时消费者 会再次处理该消息,就会造成 重复消费
重复消费场景:重复扣款,重复订单 都成为不满足幂等
幂等:请求只处理一次
redis 中 setnx如果没有key 则设置成功,并返回true
如果有key,则不做任何操作,返回false
我们处理消息不重复的思路:
1.我们为每一个消息标记一个唯一id 在生产发送消息时
2.在消费者 处理消息时,首先判断该消息id 是否已经被 标记,
如果标记说明已经被执行过,就不需要再次执行
如果没有标记说明没有被执行过,执行业务
1.引入redis
org.springframework.boot spring-boot-starter-data-redis
2.配置redis
#配置redis spring.redis.host=47.94.137.100 spring.redis.port=6379
3.在发送消息时配置唯一 的 id
@Test
public void publishTest(){
// 创建一个唯一id
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString())
;
// String var1, 交换机
// String var2, 路由规则
// Object var3 发送的数据
rabbitTemplate.convertAndSend("springboot-topic-exchange", "xxxx.java2109.sdfsd","下课么?"
// 发送该消息 并配置唯一id
+ System.currentTimeMillis(),correlationData);
System.out.println("发送消息 ");
}
4.消费者处理消息
package com.qfedu.config;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
@Component
public class Consumer {
@Autowired
private StringRedisTemplate redisTemplate;
// 该方法作为消费者 接收 中的消息
@RabbitListener(queues = "springboot-queue")
public void consumer1(String msg, Channel channel, Message message) throws IOException {
System.out.println("消费者 得到 msg = " + msg);
System.out.println("msg = " + message);
// 得到处理消息的唯一id
String id = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
System.out.println("id = " + id);
// 如果能设置成功说明 该消息没有被处理过
// 该 id 值为0 代表 正在处理 1代表处理完成
if (redisTemplate.opsForValue().setIfAbsent(id,"0", 10, TimeUnit.SECONDS)){
System.out.println("消费者 处理该消息业务 msg = " + msg);
//处理完之后: 1.redis 修改处理完成状态 2.手动ack
// 1.redis 修改处理完成状态
redisTemplate.opsForValue().set(id,"1", 10, TimeUnit.SECONDS);
// 手动ack
// 将该消息的序号 ack message.getMessageProperties().getDeliveryTag()
// 多条消息是否一起ack false
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else {
// 如果不能设置key 说明 已经有消费者在处理
if (redisTemplate.opsForValue().get(id).equals("1")){// 如果状态是 1说明业务已经完成,只是没有ack 而已
// 手动ack
// 将该消息的序号 ack message.getMessageProperties().getDeliveryTag()
// 多条消息是否一起ack false
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
}
}



