本文主要讲如何通过pg的逻辑复制功能,利用kafka,同步数据到第三方数据库(比如elassticsearch)。
pg逻辑复制
关于pg逻辑复制需要先看以下文章:《PostgreSQL变更事件捕获 》《pg的时间线解析》
逻辑复制几个比较重要的概念:
复制标识
逻辑复制设计之初,是为了将主库数据实时同步到从库的,主库是发布方,从库是订阅方,那么发布方发布一条更新消息,订阅方也就是从库需要知道更新的是哪一行记录,所以需要一个复制标识。复制标识总共有default(主键做默认标识)、index、full、nothing。default是使用默认标识、index是使用唯一索引坐,full是用整行做标识。值得注意的是当使用default和index复制标识的时候,更新删除只包含主键或唯一索引字段的更新前后的值。这种情况是不符合我们项目要求的,因为常常会有这种业务场景:对比更新记录的某个字段(这个字段是不确定的)更新前后值变化去做对应的业务逻辑,比如监听表变动触发某个更新功能,或者监听表变动同步数据到es。所以我们项目是将需要监听的表的复制标识设置为full,以获取整行所有字段更新删除前后的值。
时间线
wal log是记录数据库变更的日志,随着数据库不断运行,会产生与旧的WAL文件重名的文件,这些文件进入归档目录时,会覆盖原来的旧日志,导致恢复数据库需要的WAL文件丢失。为了解决这些问题,引用了时间线的概念。当开始逻辑复制之前需要告诉数据库需要从哪个时间线,wal log哪个postion开始逻辑复制,默认是当天数据库时间线开始的所有wal。我们项目是传的是默认值,也就是从数据库当前时间线开始的所有wal,详情可以看:流复制协议 。
逻辑复制槽
一个槽代表一连串的修改,而且每个槽都有自己的状态, 允许不同的消耗者从数据库修改流的不同的点接收更改。我个人理解(可能有点不准),复制槽就是维护订阅者消费状态的,维护订阅者已经消费wal log的哪个位置这些状态数据。可看:逻辑解码的概念 。
顺序消费kafka消息
同步数据我们需要保证顺序消费消息,不然同步数据到第三方数据库可能会出现旧数据覆盖新数据的情况。比如顺序执行set a=1,set a=2。如果先消费a=2的消息,更新es a=2,然后再消费a=1消息,更新a=1。这样同步到es的数据就是错误的。
我们都知道,kafka的topic消息是分partition存储的,同一Consumer Group中的多个Consumer实例,不同时消费同一个partition,等效于队列模式。所以只要保证Consumer Group的Consumer和partition数量保持一致,消费者就能顺序消费partition消息,而同一个表的数据变更消息只要指定存储在同一个partition,同一张表的变更消息就能顺序消费了。我们项目就是通过acm配置表的顺序,将表变更消息顺序的指定存储在某个某个分区。



