栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Go语言

etcd watch原理

Go语言 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

etcd watch原理

etcd watch原理
    • 服务初始化
    • 接收 watch 请求
      • 1.接收 watch 请求 recvLoop
      • 2. 接收 watch 请求 sendLoop
    • MVCC watch
      • watch方法:
    • MVCC 消息生成
    • newWatchableStore方法
      • 慢速处理

粗浅了解,如有错误之处,请指教!

服务初始化

etcd 启动时会注册 WatchServer[1], pb.WatchServer 用于处理 watch 请求

接收 watch 请求
  1. 每一个 watch 流都创建一个 serverWatchStream 结构体
  2. 开启两个 goroutine, sendLoop 用于发送 watch 消息到流中,recvLoop 接受请求
  3. select 阻塞直到流关闭,或是超时退出。
1.接收 watch 请求 recvLoop

recvLoop 从 gRPCStream 读出 req, 然后分别处理类型为 CreateRequest, CancelRequest, ProgressRequest 的情况

  1. CreateRequest: 监听的可能是一个范围,所以构建 key 和 RangeEnd. 处理 StartRevision, 如果为 0, 那么使用当前 系统最新的 Rev+1. 调用 mvcc 层的 watchStream.Watch, 返回一个 watchid, 将这个 id封装到watchResponse,再将watchResponse 写到 ctrlStream
  2. CancelRequest: 还是调用 mvcc 层的 watchableStore.Cancel 取消订阅,然后清除状态信息
  3. ProgressRequest: broadcast 广播当前系统的 Rev 版本
2. 接收 watch 请求 sendLoop

在 watchid 生成前,可能就有消息触发了,此时还没有 id, 所以消息会堆积到 pending 中。整个函数主要从 mvcc.watchStream.Chan() 中处理读取订阅的消息,处理 ctrlStream 控制消息和处理 progressTicker

  1. Chan(): 如果 needPrevKV, 需要填充。watchid 不存在的话,暂时移到 pending 队列中。Fragment 查看是否需要分包,这里阈值是 1.5M, 不需要的话直接调用 sws.gRPCStream.Send 发送即可。如果有数据发送的情况,sws.progress[wresp.WatchID] 置为 false, 不用发进度消息
  2. ctrlStream: 读取控制消息,这里只要是获取 watchid, 然后发送堆积的 pending 消息
  3. progressTicker: 定期调用 RequestProgress 生成进度消息,把当前 Rev 发给 client
MVCC watch

这一块主要是看 mvcc.watchStream, 看下 Watch 如何实现
主要是用来生成 watchid, 自增就可以了。再调用watch方法得到一个watcher和cancelFunc,将watcher和cancelFunc放入watchStream

watch方法:

WatchStream的watchableStore 一共有三个 group: synced, unsynced 与 victims, 当 client watch 时是从历史记录开始的,也就是说此时有一堆消息待发送给 client, 那么将 watcher 结构体扔到 unsynced 组中,否则扔到 synced 组中。为什么这么做呢?因为消息处理有快慢,后面具体代码再讲,只要记住 watcher 会在这三个组中流转即可,当然理想情况一直待在 synced 组中

MVCC 消息生成

底层 Txn 用 watchableStoreTxnWrite 封装了一下,在调用 End 提交事务前,调用 notify 将变更的消息发送出去。
遍历 changes, 判断类型 mvccpb.DELETE 或是 mvccpb.PUT, 然后封装成 envs 事件,调用 tw.s.notify 发送出去后提交。
newWatcherBatch 用于从 synced 组中获取待发送的 watcher, 然后调用 w.send 发送到 channel 里面,如果 channel 满了,那么说明发送不出去,将 watcher 从 synced 组中删除,并添加到 victim 组中,等后后续异步 goroutine syncVictimsLoop 处理。我们看一下,newWatcherBatch 实现
watcherSetByKey 用于返回满足 ev.Kv.Key 的 watcher, 这里内部实现使用 adt 红黑树,可以做到快速的范围匹配。感兴趣的可以看源代码。
send 函数先 apply filter 过滤一遍,然后发送到 w.ch 中,如果满了则返回 false. 这个 w.ch 就是 v3rpc 使用的 channel, 有数据后就发送 http2 stream …

newWatchableStore方法

etcd服务启动创建watchableStore,在 newWatchableStore 时,会生成两个异步 goroutine, syncWatchersLoop 用于将 unsynced 的 watcher 变成 synced watcher, syncVictimsLoop 用于将 victims 的消息尽可能的发送出。

慢速处理

1.慢速处理 victim
调用 moveVictims 尝试去发送堆积的消息
代码很简单,先尝试发送 victims 这些消息,如果失败了,再放到 victims 中。成功了的话,还要看当前系统中的 Rev 是否与该 watcher.minRev 相等,再考滤放到 synced 组还是 unsynced 组中。
2.慢速处理 unsynced
syncWatchersLoop 函数循环调用 syncWatchers 处理 unsynced 组数据

  1. choose 从 unsynced 中选择待发送数据的 watcher groups, 只看版本是否可用,即处于 [compactRev, curRev]
  2. UnsafeRange 从 boltdb 中获取所有满足条件的 keys/values
  3. 遍历 watchers,开始发送符合条件的 keys/values, 成功了那么从 unsynced 中删除,再加到 synced 中,否则加到 victims 队列中

参考链接: 董泽润的技术笔记:一文了解 etcd watch 实现

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/991561.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号