栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMq简单使用

RabbitMq简单使用

一.RabbitMq简介

AMQP(高级消息队列协议)是一种消息协议,它使符合标准的客户端应用程序能够与符合标准的消息中间件代理进行通信。

二.核心概念 1.Publisher

消息的生产者,将消息发送给交换机

2.Consumer

消息的消费者,绑定队列并消费消息

3.Virtual Hosts

虚拟主机,虚拟主机提供资源的逻辑分组和分离,拥有自己的队列、交换器、绑定和权限机制;一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange和Queue。

4.Message

消息,由Properties和Body组成(消息头和消息体)。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body这就是消息体内容

5.Exchange

交换机,接受生产者发送的消息并路由到相关的队列。主要有以下几种方式:

    Direct exchange
    直连模式,当具有路由键 R 的新消息到达直接交换时,如果 K = R,则交换将其路由到队列。Fanout Exchange
    扇出模式,将消息路由到绑定到它的所有队列,并且忽略路由键。Topic Exchange
    主题模式,基于消息路由键与用于将队列绑定到交换的模式之间的匹配将消息路由到一个或多个队列。Headers Exchange
    头模式,用于在多个属性上进行路由,这些属性比路由键更容易表示为消息标头。标头交换忽略路由键属性。相反,用于路由的属性取自 headers 属性。如果标头的值等于绑定时指定的值,则认为消息匹配。
    可以使用多个用于匹配的标头将队列绑定到标头交换。在这种情况下,代理需要应用程序开发人员提供的更多信息,即,它应该考虑与任何标头匹配的消息,还是所有这些消息?这就是“x-match”绑定参数的用途。当“x-match”参数设置为“any”时,只需一个匹配的标头值就足够了。或者,将“x-match”设置为“all”要求所有值必须匹配。
6.Queue

也称为Message Queue,消息队列,保存消息并将它们转发给消费者。它是消息的容器,也是消息的终点。一个消息可以投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列上将其取走。
队列有以下参数:
5. Name 队列名称
6. Durable 是否存盘,队列将在代理重新启动后是否继续存在
7. Exclusive 仅由一个连接使用
8. Auto-delete 当最后一个消费者退订时删除
9. Arguments 可选;由插件和特定于代理的功能使用,例如消息 TTL、队列长度限制等

7.Binding

绑定。Exchange和Queue之间的虚拟连接,binding中可以包含routing key。

8.Routing key

路由键,一个路由规则,虚拟机可用它来确定如何路由一个特定消息。

9.Connection

生产端与消费端应用与Broker的TCP网络连接。

10.Broker

接受客户端的连接,实现AMQP实体服务

11.Channel

信道,是TCP里面的虚拟连接;Channel是进行消息读写的通道。一个链接里面可以有多个Channel。

三.消息的可靠性

消息的可靠性主要分为生产端的消息可靠投递,及消费端的消息可靠接受处理。

1.生产端 1.1 基于AMQP事务

生产端事务是基于channel,主要包含事务开启、事务提交或事务回滚:

 	try {
 			//声明队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            String message = String.format("时间 => %s,向%s队列发送一条消息", new Date().getTime(), QUEUE_NAME);
            log.info(message);
            // 声明事务
            channel.txSelect();
            //发布消息
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
            //事务提交
            channel.txCommit();
        } catch (Exception e) {
            try {
                //事务回滚
                channel.txRollback();
            } catch (Exception e1) {
                e1.printStackTrace();
            }
1.2 确认机制

消息确认机制基于channel,分为同步确认及异步确认;
10. 同步确认

 //声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = String.format("时间 => %s,向%s队列发送一条消息", new Date().getTime(), QUEUE_NAME);
log.info(message);
// 开启确认模式
channel./confirm/iSelect();
//发布消息
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
//等待broke返回结果  超时时间为1秒
channel.waitFor/confirm/is(1000);
//channel.waitForConfirmsOrDie(10000);  这个与channel.waitForConfirms(1000)的区别在于,返回失败后关闭通道,后续生产者不能继续发送消息
    异步确认
    异步确认是采用监听的方式来监听消息发送的结果
 //声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = String.format("时间 => %s,向%s队列发送一条消息", new Date().getTime(), QUEUE_NAME);
log.info(message);
// 开启确认模式
channel./confirm/iSelect();
//发布消息
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
channel.add/confirm/iListener(new /confirm/iListener() {
	
	public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("succuss ack");
            }
    
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
       System.out.printf("defeat ack");
    }
});
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/746482.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号