栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

rabbitmq三种模式(rabbitmq常用命令)

rabbitmq三种模式(rabbitmq常用命令)

一、 安装及配置 1 .安装依赖环境
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make
gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
2. 安装Erlang

上传

erlang-18.3-1.el7.centos.x86_64.rpm

socat-1.7.3.2-5.el7.lux.x86_64.rpm

rabbitmq-server-3.6.5-1.noarch.rpm

# 安装
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
2.2 Erlang版本过低的解决方案

查看当前机器的gblic 版本

strings /lib64/libc.so.6 | grep GLIBC

使⽤yum更新安装依赖
sudo yum install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlitedevel readline-devel tk-devel gcc make -y
下载rpm包
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepoel6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-utils-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepoel6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-static-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepoel6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepoel6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-common-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepoel6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-devel-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepoel6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-headers-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepoel6/epel-6-x86_64/glibc-2.17-55.fc20/nscd-2.17-55.el6.x86_64.rpm &
安装rmp包
sudo rpm -Uvh socat-1.7.3.2-1.1.el7.x86_64.rpm --force --nodeps
安装完毕后再查看glibc版本
3. 安装RabbitMQ
# 安装
rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm --force --nodeps
# 安装
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
4. 开启管理界⾯及配置
# 开启管理界⾯
rabbitmq-plugins enable rabbitmq_management
# 修改默认配置信息
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
# ⽐如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest
5. 启动服务
service rabbitmq-server start # 启动服务
service rabbitmq-server stop # 停⽌服务
service rabbitmq-server restart # 重启服务

设置配置⽂件
cd /usr/share/doc/rabbitmq-server-3.6.5/
cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
二、 RabbitMQ入门 2.1 pom.xml依赖

    com.rabbitmq
    amqp-client
    5.6.0

2.2 生产者
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//主机地址;默认为 localhost
connectionFactory.setHost("localhost");
//连接端口;默认为 5672
connectionFactory.setPort(5672);
//虚拟主机名称;默认为 /
connectionFactory.setVirtualHost("/xzk");
//连接用户名;默认为guest
connectionFactory.setUsername("lxs");
//连接密码;默认为guest
connectionFactory.setPassword("lxs");
//创建连接
Connection connection = connectionFactory.newConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明(创建)队列

channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 要发送的信息
String message = "你好;小兔子!";

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("已发送消息:" + message);
// 关闭资源
channel.close();
connection.close();

发送消息后 在mq的管控台可以看到队列和消息

2.3 消费者
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明(创建)队列

channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override

public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("接收到的消息为:" + new String(body, "utf-8"));
三、 工作模式 3.1 Work queues工作队列模式

 Work Queues 与入门程序的 简单模式 相比,多了一个或一些消费端,多个消费端共同消费同一个队列 中的消息。

应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

代码上只需让多个消费者监听同一个队列就行 , 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。

3.2 订阅模式

 订阅模式中一个交换机发布给多个queue

在订阅模型中,多了一个exchange角色,而且过程略有变化:

P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

C:消费者,消息的接受者,会一直等待消息到来。

Queue:消息队列,接收消息、缓存消息。

Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消 息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于 Exchange的类型。

Exchange有常见以下3种类型:

         Fanout:广播,将消息交给所有绑定到交换机的队列

        Direct:定向,把消息交给符合指定routing key 的队列

        Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑 定,或者没有符合路由规则的队列,那么消息会丢失!

3.3.1 Publish/Subscribe发布与订阅模式

发布订阅模式: 1、每个消费者监听自己的队列。 2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的  每个  队列,每个绑定交换机的队列都将接收到消息。

 // 消费者一方   将交换机声明为 fanout
channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);

channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
//队列绑定交换机
channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHAGE, "");
3.3.2 Routing路由模式

队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey (路由key)

消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey 。

Exchange不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列 的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息

// 声明交换机
channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
//  绑定
channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, "insert");
 3.3.3 Topics通配符模式

Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,

例如: item.insert

通配符规则: # :匹配一个或多个词

                        * :匹配不多不少恰好1个词 举例:

item.# :能够匹配 item.insert.abc 或者 item.insert     

item.* :只能匹配 item.insert

// producer 一方 交换机声明
channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
// consumer 一方 队列绑定交换机
channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHAGE,"item.*");

 

 四、 spring整合RabbitMQ

依赖如下


    
        org.springframework
        spring-context
        5.1.7.RELEASE
    
    
        org.springframework.amqp
        spring-rabbit
        2.1.8.RELEASE
    
    
        junit
        junit
        4.12
    
    
        org.springframework
        spring-test
        5.1.7.RELEASE
    

1. 创建 spring-rabbitmq-producersrcmainresourcespropertiesrabbitmq.properties 连接参数等配置文件;

rabbitmq.host=192.168.31.222
rabbitmq.port=5672
rabbitmq.username=cxx
rabbitmq.password=cxx
rabbitmq.virtual-host=/xzk

2. 创建 spring-rabbitmq-producersrcmainresourcesspringspring-rabbitmq.xml 整合 配置文件;








































测试类发送消息

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring/spring-rabbitmq.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void queueTest(){
//路由键与队列同名
rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消
息。");
}

@Test
public void fanoutTest(){

rabbitTemplate.convertAndSend("spring_fanout_exchange", "", "发送到
spring_fanout_exchange交换机的广播消息");
}

@Test
public void topicTest(){

rabbitTemplate.convertAndSend("spring_topic_exchange", "lxs.bj", "发送到
spring_topic_exchange交换机lxs.bj的消息");
rabbitTemplate.convertAndSend("spring_topic_exchange", "lxs.bj.1", "发送
到spring_topic_exchange交换机lxs.bj.1的消息");
rabbitTemplate.convertAndSend("spring_topic_exchange", "lxs.bj.2", "发送
到spring_topic_exchange交换机lxs.bj.2的消息");
rabbitTemplate.convertAndSend("spring_topic_exchange", "xzk.cn", "发送到
spring_topic_exchange交换机xzk.cn的消息");
}
}

消费者创建监听器

spring-rabbitmq-consumersrcmainjavacomlxsrabbitmqlistenerSpringQueueListener.java

public class SpringQueueListener implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s
n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}

广播监听器 

创建 spring-rabbitmq-consumersrcmainjavacomlxsrabbitmqlistenerFanoutListener1.java

public class FanoutListener1 implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("广播监听器1:接收路由名称为:%s,路由键为:%s,队列名为:
%s的消息:%s n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}

五、 SpringBoot 整合RabbitMQ

在Spring项目中,可以使用Spring-Rabbit去操作

RabbitMQ https://github.com/spring-projects/sprin g-amqp

尤其是在spring boot项目中只需要引入对应的amqp启动器依赖即可,方便的使用RabbitTemplate发 送消息,使用注解接收消息。

一般在开发过程中:

生产者工程: 1. application.yml文件配置RabbitMQ相关信息; 2. 在生产者工程中编写配置类,用于创建交换机和队列,并进行绑定 3. 注入RabbitTemplate对象,通过RabbitTemplate对象发送消息到交换机

消费者工程: 1. application.yml文件配置RabbitMQ相关信息    2. 创建消息处理类,用于接收队列中的消息并进行处理

依赖如下

com.lxs
springboot-rabbitmq-producer
1.0-SNAPSHOT


org.springframework.boot
spring-boot-starter-amqp


org.springframework.boot
spring-boot-starter-test

配置文件

springboot中就不使用配置文件来绑定交换机 , 使用 @Configuration 注解的配置类

五、 高级特性 5.1 消息的可靠投递

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我 们提供了两种方式用来控制消息的投递可靠性模式。

        confirm 确认模式

        return 退回模式

rabbitmq 整个消息投递的路径为:

producer--->rabbitmq broker--->exchange--->queue--->consumer

消息从 producer 到 exchange 则会返回一个 confirmCallback 。

消息从 exchange-->queue 投递失败则会返回一个 returnCallback 。

5.1.1 确认模式

1. 确认模式开启:ConnectionFactory中开启publisher-confirms="true"

2. 在rabbitTemplate定义/confirm/iCallBack回调函数

rabbitTemplate.setConfirmCallback (){....}

5.1.2 退回模式

消息从 exchange-->queue 投递失败则会返回一个 returnCallback

1. 开启回退模式:publisher-returns="true"

2. 设置ReturnCallBack

        rabbitTemplate.setReturnCallback(){....}

3. 设置Exchange处理消息失败的模式:setMandatory

        rabbitTemplate.setMandatory(true);

5.2 Consumer Ack

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。

有三种确认方式:

        自动确认:acknowledge="none"

        手动确认:acknowledge="manual"

        根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么 该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手 动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

1. 设置手动签收。acknowledge="manual"

 2. 让监听器类实现ChannelAwareMessageListener接口(本来实现的是MessageListener)

 3. 如果消息成功处理,则调用channel的 basicAck()签收

4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer

5.3 消费端限流

Consumer 限流机制 *

        1. 确保ack机制为手动确认。 

        2. listener-container配置属性

        perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉取下一条消息。

 5.4 TTL

Time To Live,消息过期时间设置

代码实现

在配置文件 定义queue时 添加如下

 5.5 死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX

 消息成为死信的三种情况:

        1. 队列消息长度到达限制;

        2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队 列,requeue=false;

        3. 原队列存在消息过期设置,消息到达超时时间未被消费;

队列

死信交换机:

        给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key





5.6 延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

需求:

1. 下单后,30分钟未支付,取消订单,回滚库存。 2. 新用户注册成功7天后,发送短信问候。

实现方式:TTL+死信队列 组合实现延迟队列的效果。

即: 在普通队列中不设置监听 , 在消息过期 进入死信交换机后, 再消费

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/772032.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号