- 本文章主要参考《我们已经不用AOP做操作日志了!》
- 业务应用生成每次操作的traceid,并更新到操作的业务表中,发送1条业务消息,包含当前操作人相关的信息
1.1 因为binlog的数据反映真实数据变动,脱离业务逻辑。所以通过tracid来关联操作人信息和数据变动 - 日志处理应用
2.1 利用cannal采集和解析业务哭的binlog日志并投递到kafka中,解析后的记录中记录了当前操作的操作类型,如属于删除、修改、新增,和新旧值的记录,格式如下:{"data":[{"id":"122158992930664499","bill_type":"1","create_time":"2020-04-2609:15:13","update_time":"2020-04-2613:45:46","version":"2","trace_id":"exclude-f04ff706673d4e98a757396efb711173"}], "database":"yl_spmibill_8", "es":1587879945200, "id":17161259, "isDdl":false, "mysqlType":{"id":"bigint(20)", "bill_type":"tinyint(2)", "create_time":"timestamp", "update_time":"timestamp", "version":"int(11)", "trace_id":"varchar(50)"}, "old":[{"update_time":"2020-04-2613:45:45", "version":"1", "trace_id":"exclude-36aef98585db4e7a98f9694c8ef28b8c"}], "pkNames":["id"],"sql":"", "sqlType":{"id":-5,"bill_type":-6,"create_time":93,"update_time":93,"version":4,"trace_id":12}, "table":"xxx_transfer_bill_117", "ts":1587879945698,"type":"UPDATE"}2.2 处理完binlon日志转换后的操作日志,如下:{ "id":"120716921250250776", "relevanceInfo":"XX0000097413282,", "remark":"签收财务网点编码由【】改为【380000】, 签收网点名称由【】改为【泉州南安网点】,签收网点code由【】改为【2534104】,运单状态code由【204】改为【205】,签收财务网点名称由【】改为【福建代理区】,签收网点id由【0】改为【461】,签收标识,1是,0否由【0】改为【1】,签收时间由【null】改为【2020-04-24 21:09:47】,签收财务网点id由【0】改为【400】,", "traceId":"120716921250250775" } - 日志收集应用对业务日志和转换后的binlog日志做整合,提供对外的日志查询搜索API
- 因为RocketMQ是Java写的,所以在队列选型时,我选择RocketMQ
- 对接官方文档:Canal Kafka RocketMQ QuickStart
- 下载(本次下载的是4.9.1)
- 设置环境变量
ROCKETMQ_HOME="D:rocketmq" NAMESRV_ADDR="localhost:9876"
- 启动nameserver
.binmqnamesrv.cmd
- 启动Broker
.binmqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
a. 修改instance 配置文件 vi conf/example/instance.properties
# 按需修改成自己的数据库信息 ################################################# ... canal.instance.master.address=192.168.1.20:3306 # username/password,数据库的用户名和密码 ... canal.instance.dbUsername = canal canal.instance.dbPassword = canal ... # mq config canal.mq.topic=example # 针对库名或者表名发送动态topic #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\..*,.*\..* canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 #库名.表名: 唯一主键,多个表之间用逗号分隔 #canal.mq.partitionHash=mytest.person:id,mytest.role:id #################################################
b. 修改canal 配置文件vi /usr/local/canal/conf/canal.properties
# ... # 可选项: tcp(默认), kafka, RocketMQ canal.serverMode = rocketMQ # ... # kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 canal.mq.servers = 127.0.0.1:9876 canal.mq.retries = 0 # flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限 canal.mq.batchSize = 16384 canal.mq.maxRequestSize = 1048576 # flatMessage模式下请将该值改大, 建议50-200 canal.mq.lingerMs = 1 canal.mq.bufferMemory = 33554432 # Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下) canal.mq.canalBatchSize = 50 # Canal get数据的超时时间, 单位: 毫秒, 空为不限超时 canal.mq.canalGetTimeout = 100 # 是否为flat json格式对象 canal.mq.flatMessage = false canal.mq.compressionType = none canal.mq.acks = all # kafka消息投递是否使用事务 canal.mq.transaction = falseMQ数据消费
- rocketMQ模式: com.alibaba.otter.canal.example.rocketmq.CanalRocketMQClientExample(见cannal源码)
1.1 注意canal.mq.flatMessage配置,在cannal-v1.1.5版本中,默认为true,但在exmaple中rocketmq的消费flatMessage为false,这样会导致消费解析时报错
- 主要关注client模块的订阅方法:RocketMQCanalConnector#subscribe
1.1 MessageListenerOrderly:从命名感觉是监听顺序消息
- 处理监听到的消息:RocketMQCanalConnector#process
2.1 从下面截图可以看出,根据flatMessage不同,消息解析的方式不一样(和cannal的配置要保持同步)
(1)flatMessage = true,为JSON的解析方式 --> FlatMessage
(2)flatMessage = false,为protobuf的解析方式 --> Message
2.2 将批量消息batchMessage放入阻塞队列messageBlockingQueue(见3.处理阻塞队列messageBlockingQueue)
2.3 等批量消息处理完毕直到超时
- 处理阻塞队列messageBlockingQueue
3.1 在exmaple模块中,通过死循环拉取阻塞队列消息并打印出来(见下)
3.2 从阻塞队列拉取消息但不回复RocketMQCanalConnector#getListWithoutAck
(1)ACK见RocketMQCanalConnector#ack() (见下方提交确认3.3)
3.3 提交确认:RocketMQCanalConnector#ack
(1)批量消息的latch减1
(2)将lastGetBatchMessage属性置空(否则在上方拉取消息会报错)
- 本篇文章主要分析基于数据库Binlog记录操作日志的方案以及Cannal对接RocketMQ
1.1 在操作时因Cannal的flatMessage配置和RocketMQ消费的flatMessage不一致导致报错进行了分析 - 为了保证文章篇幅不过长,下篇文章接着分析基于数据库Binlog记录操作日志,直到最终代码完成



