- 1.RabbitMQ入门玩耍
- 1.安装
- 2.RabbitMQ的几大模式
- 3.动手兄弟们,冲
- 准备工作:
- 1.Hello World模式
- 生产者代码:
- 消费者代码:
- 2.Work Queues模式
- 工具类代码如下:
- 两个消费者代码
- 生产者代码
- 能者多劳的消费者:
- 3.Publish/Subscrible(发布订阅)模式
- 生产者:
- 消费者:
- 4.Routing模式
- 生产者:
- 消费者:
- 5.Topics模式
- 生产者:
- 消费者:
- 2.Spring 如何使用RabbitMQ
- 1.创建项目
- 2.依赖(完全可以创建的时候勾选上)
- 3.配置
- 4.代码
- HelloWorld模型:
- WorkQueues模式:
- Publish/Subscrible模式
- Routing模式:
- Topics模式:
- 3.RabbitMQ集群
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang(面向并发、函数式编程)语言编写的 1.RabbitMQ入门玩耍
官网下载:https://rabbitmq.com/download.html
官网7种模式介绍:https://www.rabbitmq.com/getstarted.html
1.安装直接Docker,(后面会写linux服务器安装RabbitMQ,时间嘛,就说不准了 )洛
官网也写了docker安装运行的命令,复制,cmd命令行运行即可(前提安装了docker)
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management
图中可以看到RabbitMQ的http端口是15672,所以浏览器访问一下:
账号和密码都是:guest,它拥有管理员的权限
暂时就知道这些
2.RabbitMQ的几大模式咱们下面一个一个来
3.动手兄弟们,冲 准备工作:新建一个项目,不多说
引入依赖:
com.rabbitmq amqp-client 5.13.1
在开始代码之前,我们需要去创建一个虚拟主机:
(虚拟主机相当于数据库,不同的项目应有自己的数据库,虚拟主机就可以区分同一台RabbitMQ主机的不同项目)
如果你要创建用户(了解):
如果创建的虚拟主机权限赋给该用户(了解):
下面开始:
1.Hello World模式可以看到:这种模式就是一个生产者生产消息放到队列,消费者直接从队列里消费消息,并没有涉及到交换机(其实使用了默认交换机)。其缺点就是容易产生消息堆积
生产者代码:import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置RabbitMQ的主机,ip地址(cmd中ipconfig命令最后一块ipv4就是)
connectionFactory.setHost("192.168.1.3");
//设置端口,5672不改
connectionFactory.setPort(5672);
//设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/study");
//用户名
connectionFactory.setUsername("guest");
//密码
connectionFactory.setPassword("guest");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//创建连接通道
Channel channel = connection.createChannel();
//通道绑定消息队列
channel.queueDeclare("hello",false,false,false,null);
//发布消息
channel.basicPublish("","hello",null,"HelloWorld".getBytes());
}
}
参数说明:
如果队列要持久化:durable设置为true,不然rabbitmq重启队列就消失
如果队列的消息也要持久化:发布消息时需要将props写为如下内容,就是持久化消息
运行:查看web管理界面的队列:
可以看到运行了两次,页面中确实存在了hello队列,并且有两条未被消费的消息
消费者代码:import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Constumer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置RabbitMQ的主机
connectionFactory.setHost("192.168.1.3");
//设置端口,5672
connectionFactory.setPort(5672);
//设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/study");
//用户名
connectionFactory.setUsername("guest");
//密码
connectionFactory.setPassword("guest");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//创建连接通道
Channel channel = connection.createChannel();
//通道绑定消息队列
channel.queueDeclare("hello",false,false,false,null);
//消费消息
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println(new String(body));
}
});
}
}
现在我的hello队列里有一条消息:
启动Consumer:
消息被消费,控制台也打印出了消息内容
2.Work Queues模式通过上面这张图片我们可以看出:它和helloworld模式区别不大,只是消费者不再是一个。
为了方便,我创建了工具类,因为代码大部分都是重复的。
工具类代码如下:public class ConnUtil {
public static Connection getConn() throws IOException, TimeoutException {
//创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置RabbitMQ的主机
connectionFactory.setHost("192.168.1.3");
//设置端口,5672
connectionFactory.setPort(5672);
//设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/study");
//用户名
connectionFactory.setUsername("guest");
//密码
connectionFactory.setPassword("guest");
//获取连接对象
Connection connection = connectionFactory.newConnection();
return connection;
}
public static void close(Channel channel, Connection connection) throws IOException, TimeoutException {
channel.close();
connection.close();
}
}
两个消费者代码
消费者1:
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//获取通道
Channel channel = ConnUtil.getChannel();
//绑定队列
channel.queueDeclare("workqueue",false,false,true,null);
//消费消息
channel.basicConsume("workqueue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println(new String(body));
}
});
}
}
消费者2:代码一样
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//获取通道
Channel channel = ConnUtil.getChannel();
//绑定队列
channel.queueDeclare("workqueue",false,false,true,null);
//消费消息
channel.basicConsume("workqueue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println(new String(body));
}
});
}
}
先将两个消费者跑起来
生产者代码public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection conn = ConnUtil.getConn();
//获取通道
Channel channel = conn.createChannel();
//绑定队列
channel.queueDeclare("workqueue",false,false,true,null);
//发送50条消息
for (int i = 1; i <=20; i++) {
channel.basicPublish("","workqueue",null,("这是消息"+i).getBytes());
}
//释放资源
ConnUtil.close(channel,conn);
}
}
生产者跑起来:
可以看到,消费者一和消费者二是平均分配消息的,一人一半。
这种平均分配是会出现问题的,当其中一个消费者处理速度很慢,那么会造成总的处理速度下降,我另一个消费者早就消费完了,而你才消费几个,为什么处理快的和处理慢的要平均分呢?。所以能者多劳才是最好的分配方式。
在说如何实现能者多劳之前,我们需要知道的是RabbitMQ的消息确认机制,
就是上面这个参数,这里我们是开启了自动确认,所谓自动确认是指消费者拿到消息后就会向队列确认,不管消息有没有被消费掉。而且上面的代码中我们并没有设置消费者一次从队列中拿几条消息,没有设置RabbitMQ就会一人一半分好,然后一次性全部拿给消费者。可以想象,一次性拿给消费者,自动确认的情况下,如果消费者执行了一半挂了,后面的消息都没有被消费,而你在拿到所有消息的时候就自动的向队列确认了-----“所有的消息都执行了”,这就出大问题!
所以要实现能者多劳,我们需要对上面代码做如下修改:
- 消费者一次从队列拿一条消息
- 关闭自动确认
- 在消费者的回调中,手动消息确认
消费者1(消费快):
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//获取通道
Channel channel = ConnUtil.getChannel();
//绑定队列
channel.queueDeclare("workqueue",false,false,true,null);
//一次拿一条消息
channel.basicQos(1);
//消费消息
channel.basicConsume("workqueue",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println(new String(body));
//消费完成手动确认消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
消费者2(消费慢):
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//获取通道
Channel channel = ConnUtil.getChannel();
//绑定队列
channel.queueDeclare("workqueue",false,false,true,null);
//一次拿一条消息
channel.basicQos(1);
//消费消息
channel.basicConsume("workqueue",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
//模拟该消费者处理慢,睡1s
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new String(body));
//消费完成手动确认消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
这里为了模拟消费速度慢,让它每次睡一秒
重新运行:先跑两个消费者,再跑生产者
可以看到因为消费者2要睡一秒,所以消费者1会处理更多的消息,这就是能者多劳模式
3.Publish/Subscrible(发布订阅)模式x代表交换机,该模式需要明确指定使用哪个交换机,我们之前的交换机都是写的"",这其实是使用了默认交换机,该模式的大致流程:
- 生成者将消息发送到交换机
- 交换机可绑定多个临时队列
- 每个队列一个消费者
该模式正如其名,一个消息可以发布给很多消费者,只要消费者绑定的队列与该交换机绑定。
生产者:public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接对象
Connection conn = ConnUtil.getConn();
//创建通道
Channel channel = conn.createChannel();
//通道绑定交换机,没有会自己创建
channel.exchangeDeclare("publish","fanout");
//发送消息
channel.basicPublish("publish","",null,"这是发布订阅模式".getBytes());
}
}
在运行之前,我们先打开管理界面看看RabbitMQ默认有哪些交换机,一共有7个交换机,分别对应7中模式,而且可以看出,每个虚拟主机都有自己的7个交换机。但是这些自带的好像不能用,需要自己创建
这里的fanout交换机类型就是发布订阅需要使用的交换机类型。
运行一下:
消费者:public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接对象
Connection conn = ConnUtil.getConn();
//创建信道
Channel channel = conn.createChannel();
//信道绑定交换机
channel.exchangeDeclare("publish","fanout");
//创建临时队列,这种方式能创建临时的队列
String queue = channel.queueDeclare().getQueue();
//临时队列与交换机绑定
channel.queueBind(queue,"publish","");
//一次拿一条消息
channel.basicQos(1);
//消费消息
channel.basicConsume(queue,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println(new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
将此消费者复制两个即可,代码一样
为了更直观的看到结果,我将生产者停止运行,管理界面删掉了我创建的publish的交换机,
先启动两个消费者,在启动生产者:
可以看到,两个消费者都拿到了同一条消息
看看队列管理界面
所谓临时队列,就是消费者断开连接后,队列就自动销毁,你可以试试现在关闭一个消费者,看看临时队列的变化。
4.Routing模式Routing模式可以理解为细化或者加强版的发布订阅模式,我们知道发布订阅模式是生产者发送一个消息,所有绑定的消费者都能接收到该消息,而Routing模式,加上了路由筛选,临时队列与交换机绑定时,需要指定哪些路由能发送到该队列,生产者发送消息时,也会指定该消息要发送的路由。只有绑定了这个路由的队列才会接收到消息
使用Routing模式,我们需要将交换机的类型设置为direct
生产者:指定接收路由为vip
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接对象
Connection conn = ConnUtil.getConn();
//获取信道
Channel channel = conn.createChannel();
//信道绑定交换机
channel.exchangeDeclare("routing","direct");
//发送消息,,routingkey必须写
channel.basicPublish("routing","vip",null,"这条消息vip才能看到".getBytes());
}
}
消费者:
消费者1:路由设置为vip
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接对象
Connection conn = ConnUtil.getConn();
//获取信道
Channel channel = conn.createChannel();
//信道绑定交换机
channel.exchangeDeclare("routing","direct");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//对列与交换机绑定
channel.queueBind(queue,"routing","vip");
//一次拿一条消息
channel.basicQos(1);
//消费
channel.basicConsume(queue,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println(new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
消费者2:路由设置为notvip
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接对象
Connection conn = ConnUtil.getConn();
//获取信道
Channel channel = conn.createChannel();
//信道绑定交换机
channel.exchangeDeclare("routing","direct");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//队列与交换机绑定
channel.queueBind(queue,"routing","notvip");
//一次拿一条消息
channel.basicQos(1);
//消费
channel.basicConsume(queue,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println(new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
先运行消费者,再运行生产者:
的确只有设置了路由为vip的消费者1拿到了消息。
现在我们改一下消费者2,给它也加上vip的路由:
如下即可:
重跑消费者2和生产者,效果:
其实就是在发布订阅的模式上,生产者发送消息必须指定路由,消费者所在队列绑定交换机时必须指明接收的路由。很简单
5.Topics模式Topics这个模型更简单,它是基于Routing模式,相当于动态路由,交换机类型必须是topic,它可以使用#和*进行统配:
#:普配多个
*:匹配一个
Topics的路由要求我们使用点点点的写法:test.a.b,这样。
下面我的生产者的发送路由为:test.c.b
消费者队列的路由一个设置为test.*.b ,另一个为test.#
生产者:public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接对象
Connection conn = ConnUtil.getConn();
//获取信道
Channel channel = conn.createChannel();
//信道绑定交换机
channel.exchangeDeclare("topic","topic");
//发送消息,,routingkey必须写
channel.basicPublish("topic","test.c.b",null,"Topics模式".getBytes());
}
}
消费者:
消费者1:
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接对象
Connection conn = ConnUtil.getConn();
//获取信道
Channel channel = conn.createChannel();
//信道绑定交换机
channel.exchangeDeclare("topic","topic");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//信道绑定队列
channel.queueBind(queue,"topic","test.*.b");
//一次拿一条消息
channel.basicQos(1);
//消费
channel.basicConsume(queue,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println(new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
消费者2:
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接对象
Connection conn = ConnUtil.getConn();
//获取信道
Channel channel = conn.createChannel();
//信道绑定交换机
channel.exchangeDeclare("topic","topic");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//信道绑定队列
channel.queueBind(queue,"topic","test.#");
//一次拿一条消息
channel.basicQos(1);
//消费
channel.basicConsume(queue,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println(new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
结果,
这样发送,两个消费者都能匹配到该消息:
下面,我们改一下,生产者的路由为:test.c
思考一下,哪个消费者能收到消息?
消费者2.
的确如此。
以上就是常用的模式了。
2.Spring 如何使用RabbitMQ 1.创建项目 2.依赖(完全可以创建的时候勾选上)3.配置org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-web
spring:
rabbitmq:
host: localhost
username: guest
password: guest
virtual-host: /study
port: 5672
4.代码
Spring为我们提供了一个RabbitMQ的模板类,同redis那些一样,再配上一些注解就可以了
HelloWorld模型:生产者测试方法:(因为必须启动Spring,所以只能写在测试类中)
@SpringBootTest
public class SpringRabbitMQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void helloWorld(){
//发送消息就这么简单
rabbitTemplate.convertAndSend("spring","Spring+RabbitMQ");
}
}
消费者:(创建一个类即可)
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "spring",durable = "false",exclusive = "false"))
public class Consumer {
@RabbitHandler
public void handler(String message){
System.out.println(message);
}
}
运行生产者测试类:
可以看到,消费者确实拿到了消息
队列也创建了
WorkQueues模式:生产者测试方法:
@Test
public void workQueues(){
//WorkQueues模式
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work","这是Spring,WorkQueues模式");
}
}
消费者:@RabbitListener也能作用于方法,同样表明这是一个消费者,此时@RabbitHandler不写了
@Component
public class Consumer {
//消费者一,没错@RabbitListener也能作用于方法,表明这是一个消费者
@RabbitListener(queuesToDeclare = @Queue(value = "work"))
public void consumer1(String message){
System.out.println("consumer1:"+message);
}
//消费者二
@RabbitListener(queuesToDeclare = @Queue(value = "work"))
public void consumer2(String message){
System.out.println("consumer2:"+message);
}
}
运行生产者测试方法:
能者多劳的WorkQueues需要一些额外的配置
Publish/Subscrible模式生产者测试方法:
//发布订阅模式
@Test
public void Publish(){
rabbitTemplate.convertAndSend("SpringPublish","publish","这是SpringRabbitMQ的发布订阅模式");
}
消费者类:
@Component
public class PublishConsumer {
//消费者1
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,exchange = @Exchange(value = "SpringPublish",type = "fanout"))
})
public void consumer1(String message){
System.out.println("消费者1:"+message);
}
//消费者2
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,exchange = @Exchange(value = "SpringPublish",type = "fanout"))
})
public void consumer2(String message){
System.out.println("消费者2:"+message);
}
}
运行生产者测试方法:
Routing模式:生产者测试方法:
//路由模式
@Test
public void routing(){
rabbitTemplate.convertAndSend("SpringRouting","svip9","这是SpringRabbitMQ的Routing模式,svip9可见");
}
消费者类:
@Component
public class RoutingConsumer {
//消费者1
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,exchange = @Exchange(value = "SpringRouting",type = "direct"),
key = {"svip1","svip2","svip3"})
})
public void consumer1(String message){
System.out.println("消费者1:"+message);
}
//消费者2
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,exchange = @Exchange(value = "SpringRouting",type = "direct"),
key = {"svip7","svip8","svip9"})
})
public void consumer2(String message){
System.out.println("消费者2:"+message);
}
}
运行:
Topics模式:生产者测试类:
//Topics模式
@Test
public void topics(){
rabbitTemplate.convertAndSend("SpringTopic","hao.fan","这是SpringRabbitMQ的Topics模式");
}
消费者类:
@Component
public class TopicsConsumer {
//消费者1
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,exchange = @Exchange(value = "SpringTopic",type = "topic"),
key = {"woc.*","hao.#"})
})
public void consumer1(String message){
System.out.println("消费者1:"+message);
}
//消费者2
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,exchange = @Exchange(value = "SpringTopic",type = "topic"),
key = {"A.B.#","C.*.A"})
})
public void consumer2(String message){
System.out.println("消费者2:"+message);
}
}
运行一下测试类:
3.RabbitMQ集群待更新。。。。。



