《Aeron是什么?》
《Aeron中这么多空闲策略选哪个?》
《Aeron是如何实现的?—— Conductor》
《Aeron 是如何实现的?—— Ipc Publication》
《Aeron 是如何实现的?—— Ipc Subscription》
0. 简介上文分析了 Ipc Subscription 的逻辑,本文继续分析一下 ipc 场景异常情况处理。
1. 两个心跳总的来说,Client 与 Driver 之间是通过心跳超时判断对方是否异常。
-
Client 周期性检查 Driver 是否超过 driverTimeoutMs 没有心跳。
-
Driver 周期性检查 Client 是否超过 clientLivenessTimeoutMs 没有心跳。
Driver 没有心跳是个严重的问题,Client 默认将会自杀;Client 没有心跳,Driver 将会把相关的资源(pub 和 sub 关系等)关闭。下面详细看一下。
2. Client Conductor 2.1 命令处理超时 driverTimeoutNsClient Conductor 向 Driver Conductor 发出的命令都有一个超时时间 driverTimeoutNs,如果超出了抛出DriverTimeoutException,默认值 10_000 ms。
2.2 ClientConductor的AgentRunner停顿时间interServiceTimeoutNsClientConductor 的 AgentRunner 停顿时间大于interServiceTimeoutNs,那么 Client 会抛出ConductorServiceTimeoutException,并且设置isTerminating = true,此时整个Aeron就不能用了,需要重新Aeron.connect()。interServiceTimeoutNs的值取的是CncFileDescriptor.clientLivenessTimeoutNs(cncmetaDataBuffer),也就是 cnc.dat 中的 Client Liveness Timeout,默认 10 秒。
2.3 keepAliveIntervalNs这是个“自觉性”很高的操作,因为超过这个 timeout,Driver 会认为 Client 异常,进行资源清理,此时 Client 主动进行“自我处理”,节省了一次异常交互。
与 interServiceTimeoutNs 相关的还有个keepAliveIntervalNs,在数值上要求keepAliveIntervalNs 小于interServiceTimeoutNs,默认值为 500 ms这个参数是 Client Conductor 更新自己心跳的周期,另外还检查 Driver Conductor 是否存活。Driver Conductor 的心跳时间放跟新在 cnc.dat 的 to-driver Buffer 尾部consumerHeartbeatTime,Driver Conductor 的 AgentRunner 会定期更新这个心跳时间。Client 的检查分两步:
-
如果心跳时间超过 driverTimeoutMs(也就是 Client 认为 Driver 异常了),那么抛出DriverTimeoutException,这是个不可恢复异常,默认会导致程序退出。
-
否则再检查 Counters Buffer 中的心跳计数器 heartbeatTimestamp,如果心跳计数器异常(也就是 Driver 认为 Client 超时,对计数器进行了清理),那么抛出AeronException,设置 isTerminating = true;如果正常的话,就更新自己的心跳。
在 Client 这里 Publication 和 Subscription 没有太多异常抛出。对于 Publication 的 offer,如果返回值为CLOSED,表示这个通道已经关闭了,可能是 Driver 进行了资源清理,也可能是正常 close 了;如果返回MAX_POSITION_EXCEEDED,也表示这个通道不能再用了,需要重新 add 一个。
还有个小细节,如果 offer 的数据大于maxMessageLength,那么抛出 IllegalArgumentException,这需要在业务层面处理,Aeron 框架对消息大小就是有这个硬性限制。
对于 Subscription 的 poll,无论是没有可用的 Images,还是 Images 已经关闭了,都返回 0。对于 pub 或 sub 后 Client 崩溃退出,留下“脏”资源的异常情况,都在 Driver 中处理。
4. Driver ConductorDriver Conductor 对异常情况检查的逻辑基本都在processTimers方法中:首先会更新consumerHeartbeatTime,避免让 Client 认为 Driver 已经故障了,产生不可恢复的异常,对应上面 2.3 节的内容。然后检查各种资源,对于 ipc 场景有这 4 个:
-
clients:客户端
-
publicationlinks:pub 的关系
-
subscriptionlinks:sub 的关系
-
ipcPublications:pub 实体的封装,也就是 logbuffer 共享内存
资源检查的逻辑是通用的:
4.1 AeronClient上文 2.3 节提到的heartbeatTimestamp如果超过clientLivenessTimeoutMs没有心跳更新,那么 Driver 就认为这个 AeronClient 失联了,进行资源关闭。
清理掉heartbeatTimestamp这个计数器,Client 下次更新心跳时就能识别出异常。
4.2 Publicationlink检查与该 pub 关系对应的 AeronClient 是否超时,如果超时了,那么也关闭这个关系。
在 ipc 场景就是对关联的 pub 实体减一个引用,直到引用减为 0 才会对 pub 实体进行处理。
主要是设置状态为DRAINING
4.3 IpcSubscriptionlink检查与该 sub 关系对应的 AeronClient 是否超时,如果超时了,那么也关闭这个关系。
找到所有对应的 pub 实体,取消该 sub 关系。
更新清理相关的位置计数器,如果 sub 关系都关闭了,那么标识一下该 pub 实体没有连接了,Client 的 Publication 就能感知到。
4.4 IpcPublicationpub 实体的检查稍微复杂一些。
如果还有 pub 关系,那么处于ACTIVE状态,如果 pub 关系都关闭了,那么处于DRAINING状态。正常来说应该是 Active 状态,但是如果上面 3.2 的 Publicationlink 都关闭了,那么关联的 IpcPublication 就进入了 DRAINING 状态。主要处理了以下几个典型情况:
-
Active 状态,并发写入的情况如果某个 Client 写了一半异常退出了(此处用了 Blocked 这个词,判断阻塞的默认时间是 15 秒),那么会造成脏数据,订阅端会一直卡住,那么在 checkForBlockedPublisher 这个方法里处理。
-
DRAINING 状态此时没有 pub 关系了,那么处理的主要逻辑是,等待所有的 sub 关系都消费完,然后通知ON_UNAVAILABLE_IMAGE,进入LINGER状态。
-
LINGER 状态清理清理所有 sub 关系,标识该 pub 实体可以关闭。
关闭的逻辑就比较简单了



