栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ 客户端源码系列 - Connection

RabbitMQ 客户端源码系列 - Connection

前言

本次方木打算直接上干货分享 RabbitMQ Java 客户端一系列的源码分析 (com.rabbitmq:amqp-client:4.8.3)

ps: 最近接收到公司的任务就是阅读和分析 spring-rabbit、amqp-client,因此打算一同和大家分享 amqp-client。由于 RabbitMQ 是 Erlang 语言开发(暂时没有对这块分享的计划)

友情提醒:本次分享适合的人群,需要对 RabbitMQ 有一定的了解

RabbitMQ Getstarted: https://www.rabbitmq.com/#getstartedJava Client API Guide: https://www.rabbitmq.com/api-guide.html

废话不多话,开整!!!

Java Client Connection Demo

我们先看一个官网提供的 Java Client Connecting to RabbitMQ Demo

ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);

Connection conn = factory.newConnection();
Channel channel = connection.createChannel();

byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
channel.close();
connection.close();
AMQP 协议交互流程

已经使用过 RabbitMQ 的同学相信已经不陌生,因此就简单的描述下:与 RabbitMQ Broker 建立 Connection 和 Channel,发送消息后,关闭 Connection 和 Channel 的过程。下图是 针对这个过程使用 Wireshark 抓包查看整个 AMQP 协议的交互流程(172.30.0.74 为客户端即本机 ip;192.168.17.160 为 RabbitMQ Broker 的 ip)

client 与 broker 创建Connection、Channel、发送消息

client 与 broker 发送心跳(Heartbeat)、关闭Connection、Channel

为了让读者更容易看得源码,我先给大家描述下 client 与 broker 之间 AMQP 协议的交互流程描述(AMQP 协议中 不少命令都是成对存在的,抓包协议中 Info 里的命令是 -,而代码里的是 驼峰式 此处以代码为准):

    将 AMQP 0-9-1 的连接头写入底层套接字,包含指定的版本信息(客户端告诉 broker 自己使用的协议及版本,底层使用 java 自带的 socket)

    客户端等待 broker 发送的 Connection.Start (broker 告诉客户端 通信的协议和版本、SASL认证机制(详细见)、语言环境以及RabbitMQ的版本信息和支持能力)

    客户端接收后 发送 Connection.StartOk (客户端告诉 broker 连接使用的帐号和密码、认证机制、语言环境、客户的信息以及能力)

    客户端等待 broker 发送的 Connection.Tune (broker 与 客户端 进行参数协商)

    客户端接收后 发送 Connection.TuneOk (客户端 参数 [ChannelMax、frameMax、Heartbeat] 协商完成后告诉 broker)

    客户端发送 Connection.Open (客户端 告诉 broker 打开一个连接,并请求设置_virtualHost [vhost])

    broker 接收到后返回 Connection.OpenOk (client 对 vhost 进行验证,成功则返回如下此信息)

    客户端发送 Channel.Open,broker 接收到后返回 Channel.OpenOk (客户端 创建通道;broker 收到并创建通道完成)

    客户端发送 /confirm/i.Select,broker 接收到后返回 /confirm/i.SelectOk(客户端告诉 broker 消息需要使用 /confirm/i的机制,broker收到并回复)

    客户端发送消息 Basic.Publish,broker 应答返回 Basic.Ack

    期间 客户端和 broker 会相互检查彼此的心跳 heartbeat

    客户端 关闭通道 Channel.Close,broker 应答返回 Channel.CloseOk

    客户端 关闭连接 Connection.Close,broker 应答返回 Connection.CloseOk

源码分析

熟悉完AMQP 协议的交互流程易于后续理解源码,开始本次主要介绍 Connection 相关的源码:ConnectionFactory.newConnection --> AMQConnection.start

ConnectionFactory.newConnection()

public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName)
        throws IOException, TimeoutException {
        if(this.metricsCollector == null) {
            this.metricsCollector = new NoOpMetricsCollector();
        }
        // make sure we respect the provided thread factory
  			// 创建 socketFactory 和 初始化相应的配置
        frameHandlerFactory fhFactory = createframeHandlerFactory();
  			// 初始化 Connection 涉及到的参数
        ConnectionParams params = params(executor);
        // set client-provided via a client property
        if (clientProvidedName != null) {
            Map properties = new HashMap(params.getClientProperties());
            properties.put("connection_name", clientProvidedName);
            params.setClientProperties(properties);
        }

  			// 这块逻辑属于 rabbit提供自动回复连接的逻辑
        if (isAutomaticRecoveryEnabled()) {
            // see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
            AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);

            conn.init();
            return conn;
        } else {
            List
addrs = addressResolver.getAddresses(); Exception lastException = null; for (Address addr : addrs) { try { // 创建、连接 socket 并封装成 返回 SocketframeHandler (socket 不采用Negale算法[Negale算法,大家有兴趣可以了解下这块针对socket缓存性能的优化]) frameHandler handler = fhFactory.create(addr); // 初始化配置、_channel0、_channelManager等等 AMQConnection conn = createConnection(params, handler, metricsCollector); // 启动 AMQConnection 后续会进行详细介绍 conn.start(); this.metricsCollector.newConnection(conn); return conn; } catch (IOException e) { lastException = e; } catch (TimeoutException te) { lastException = te; } } if (lastException != null) { if (lastException instanceof IOException) { throw (IOException) lastException; } else if (lastException instanceof TimeoutException) { throw (TimeoutException) lastException; } } throw new IOException("failed to connect"); } }

AMQP 协议的交互流程中 1~6 的逻辑属于 AMQConnection.start() 的重点逻辑,也是本次给大家主要介绍的点

public void start()
            throws IOException, TimeoutException {
  			// 初始化工作线程
        initializeConsumerWorkService();
  			// 初始化心跳发送
        initializeHeartbeatSender();
  			// 将 Connection标志位 启动
        this._running = true;

  			// 确认客户端 第一件事 发送header头部协议
        AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =
            new AMQChannel.SimpleBlockingRpcContinuation();

  			// 进入Rpc队列进行阻塞,等待broker返回 connection.start method
        _channel0.enqueueRpc(connStartBlocker);
        try {
            // The following two lines are akin to AMQChannel's
            // transmit() method for this pseudo-RPC.
            _frameHandler.setTimeout(handshakeTimeout);
          	// 1. 发送header头部协议 AMQP 0-9-1
            _frameHandler.sendHeader();
        } catch (IOException ioe) {
            _frameHandler.close();
            throw ioe;
        }

  			// 初始化启动 startMainLoop -- 为了接收和处理broker发送的消息
        this._frameHandler.initialize(this);

        AMQP.Connection.Start connStart;
        AMQP.Connection.Tune connTune = null;
        try {
          	// 2. 客户端等待 broker 发送的 Connection.Start
            connStart =
                    (AMQP.Connection.Start) connStartBlocker.getReply(handshakeTimeout/2).getMethod();

          	// 通信的协议和版本、SASL认证机制(详细见)、语言环境以及RabbitMQ的版本信息和支持能力
            _serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());

            Version serverVersion =
                    new Version(connStart.getVersionMajor(),
                                       connStart.getVersionMinor());
						
          	// 版本比对
            if (!Version.checkVersion(clientVersion, serverVersion)) {
                throw new ProtocolVersionMismatchException(clientVersion,
                                                                  serverVersion);
            }

            String[] mechanisms = connStart.getMechanisms().toString().split(" ");
            SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms);
            if (sm == null) {
                throw new IOException("No compatible authentication mechanism found - " +
                                              "server offered [" + connStart.getMechanisms() + "]");
            }

            String username = credentialsProvider.getUsername();
            String password = credentialsProvider.getPassword();
            LongString challenge = null;
            LongString response = sm.handleChallenge(null, username, password);

            do {
                // 3. 客户端接收后 发送 `Connection.StartOk`
                Method method = (challenge == null)
                                        ? new AMQP.Connection.StartOk.Builder()
                                                  .clientProperties(_clientProperties)
                                                  .mechanism(sm.getName())
                                                  .response(response)
                                                  .build()
                                        : new AMQP.Connection.SecureOk.Builder().response(response).build();

                try {
                    Method serverResponse = _channel0.rpc(method, handshakeTimeout/2).getMethod();
                    if (serverResponse instanceof AMQP.Connection.Tune) {
                        // 4. 客户端等待 broker 发送的 Connection.Tune
                        connTune = (AMQP.Connection.Tune) serverResponse;
                    } else {
                        challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
                        response = sm.handleChallenge(challenge, username, password);
                    }
                } catch (ShutdownSignalException e) {
                    Method shutdownMethod = e.getReason();
                    if (shutdownMethod instanceof AMQP.Connection.Close) {
                        AMQP.Connection.Close shutdownClose = (AMQP.Connection.Close) shutdownMethod;
                        if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {
                            throw new AuthenticationFailureException(shutdownClose.getReplyText());
                        }
                    }
                    throw new PossibleAuthenticationFailureException(e);
                }
            } while (connTune == null);
        } catch (TimeoutException te) {
            _frameHandler.close();
            throw te;
        } catch (ShutdownSignalException sse) {
            _frameHandler.close();
            throw AMQChannel.wrap(sse);
        } catch(IOException ioe) {
            _frameHandler.close();
            throw ioe;
        }

        try {
          	// 最大通道数
            int channelMax =
                negotiateChannelMax(this.requestedChannelMax,
                                    connTune.getChannelMax());
            _channelManager = instantiateChannelManager(channelMax, threadFactory);
						
          	// 帧最大的大小
            int frameMax =
                negotiatedMaxValue(this.requestedframeMax,
                                   connTune.getframeMax());
            this._frameMax = frameMax;

          	// 心跳
            int heartbeat =
                negotiatedMaxValue(this.requestedHeartbeat,
                                   connTune.getHeartbeat());

            setHeartbeat(heartbeat);

            // 5. 客户端接收后 发送 Connection.TuneOk
            _channel0.transmit(new AMQP.Connection.TuneOk.Builder()
                                .channelMax(channelMax)
                                .frameMax(frameMax)
                                .heartbeat(heartbeat)
                              .build());
            // 6. 客户端发送 Channel.Open
            _channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()
                                      .virtualHost(_virtualHost)
                                    .build());
        } catch (IOException ioe) {
            _heartbeatSender.shutdown();
            _frameHandler.close();
            throw ioe;
        } catch (ShutdownSignalException sse) {
            _heartbeatSender.shutdown();
            _frameHandler.close();
            throw AMQChannel.wrap(sse);
        }

        // We can now respond to errors having finished tailoring the connection
        this._inConnectionNegotiation = false;
    }
最后

本次分享的目的,先让读者对于 RabbitMQ Client 与 RabbitMQ Broker 根据AMQP协议交互流程有个大体的认识,并根据分析Connection源码有一定认知,其中还有很多 Connection 细节源码需要读者慢慢体会


我的微信公众号:Java架构师进阶编程
专注分享Java技术干货,还有我整理的上百份面试题库,期待你的关注!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/784120.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号