栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

基于数据库Binlog记录操作日志-摸索篇

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

基于数据库Binlog记录操作日志-摸索篇

前言
  1. 本文章主要参考《我们已经不用AOP做操作日志了!》
架构设计

  1. 业务应用生成每次操作的traceid,并更新到操作的业务表中,发送1条业务消息,包含当前操作人相关的信息
    1.1 因为binlog的数据反映真实数据变动,脱离业务逻辑。所以通过tracid来关联操作人信息和数据变动
  2. 日志处理应用
    2.1 利用cannal采集和解析业务哭的binlog日志并投递到kafka中,解析后的记录中记录了当前操作的操作类型,如属于删除、修改、新增,和新旧值的记录,格式如下:
    		{"data":[{"id":"122158992930664499","bill_type":"1","create_time":"2020-04-2609:15:13","update_time":"2020-04-2613:45:46","version":"2","trace_id":"exclude-f04ff706673d4e98a757396efb711173"}],
    		"database":"yl_spmibill_8",
    		"es":1587879945200,
    		"id":17161259,
    		"isDdl":false,
    		"mysqlType":{"id":"bigint(20)",
    		"bill_type":"tinyint(2)",
    		"create_time":"timestamp",
    		"update_time":"timestamp",
    		"version":"int(11)",
    		"trace_id":"varchar(50)"},
    		"old":[{"update_time":"2020-04-2613:45:45",
    		"version":"1",
    		"trace_id":"exclude-36aef98585db4e7a98f9694c8ef28b8c"}],
    		"pkNames":["id"],"sql":"",
    		"sqlType":{"id":-5,"bill_type":-6,"create_time":93,"update_time":93,"version":4,"trace_id":12},
    		"table":"xxx_transfer_bill_117",
    		"ts":1587879945698,"type":"UPDATE"}
    
    2.2 处理完binlon日志转换后的操作日志,如下:
     {
    	  "id":"120716921250250776",
    	  "relevanceInfo":"XX0000097413282,",
    	  "remark":"签收财务网点编码由【】改为【380000】,
    	  签收网点名称由【】改为【泉州南安网点】,签收网点code由【】改为【2534104】,运单状态code由【204】改为【205】,签收财务网点名称由【】改为【福建代理区】,签收网点id由【0】改为【461】,签收标识,1是,0否由【0】改为【1】,签收时间由【null】改为【2020-04-24 21:09:47】,签收财务网点id由【0】改为【400】,",
    	  "traceId":"120716921250250775"
    	  }
    
  3. 日志收集应用对业务日志和转换后的binlog日志做整合,提供对外的日志查询搜索API
Cannal对接RokcetMQ
  1. 因为RocketMQ是Java写的,所以在队列选型时,我选择RocketMQ
  2. 对接官方文档:Canal Kafka RocketMQ QuickStart
安装RocketMQ
  1. 下载(本次下载的是4.9.1)
  2. 设置环境变量
    ROCKETMQ_HOME="D:rocketmq"
    NAMESRV_ADDR="localhost:9876"
    
  3. 启动nameserver
    	.binmqnamesrv.cmd
    
  4. 启动Broker
    	.binmqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
    
修改配置参数

a. 修改instance 配置文件 vi conf/example/instance.properties

#  按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
canal.mq.topic=example
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\..*,.*\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################

b. 修改canal 配置文件vi /usr/local/canal/conf/canal.properties

# ...
# 可选项: tcp(默认), kafka, RocketMQ
canal.serverMode = rocketMQ
# ...
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 
canal.mq.servers = 127.0.0.1:9876
canal.mq.retries = 0
# flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# flatMessage模式下请将该值改大, 建议50-200
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为flat json格式对象
canal.mq.flatMessage = false
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投递是否使用事务
canal.mq.transaction = false
MQ数据消费
  1. rocketMQ模式: com.alibaba.otter.canal.example.rocketmq.CanalRocketMQClientExample(见cannal源码)
    1.1 注意canal.mq.flatMessage配置,在cannal-v1.1.5版本中,默认为true,但在exmaple中rocketmq的消费flatMessage为false,这样会导致消费解析时报错
RocketMQ消费解析
  1. 主要关注client模块的订阅方法:RocketMQCanalConnector#subscribe
    1.1 MessageListenerOrderly:从命名感觉是监听顺序消息
  2. 处理监听到的消息:RocketMQCanalConnector#process
    2.1 从下面截图可以看出,根据flatMessage不同,消息解析的方式不一样(和cannal的配置要保持同步)
    (1)flatMessage = true,为JSON的解析方式 --> FlatMessage
    (2)flatMessage = false,为protobuf的解析方式 --> Message
    2.2 将批量消息batchMessage放入阻塞队列messageBlockingQueue(见3.处理阻塞队列messageBlockingQueue)
    2.3 等批量消息处理完毕直到超时
  3. 处理阻塞队列messageBlockingQueue
    3.1 在exmaple模块中,通过死循环拉取阻塞队列消息并打印出来(见下)

    3.2 从阻塞队列拉取消息但不回复RocketMQCanalConnector#getListWithoutAck
    (1)ACK见RocketMQCanalConnector#ack() (见下方提交确认3.3)

    3.3 提交确认:RocketMQCanalConnector#ack
    (1)批量消息的latch减1
    (2)将lastGetBatchMessage属性置空(否则在上方拉取消息会报错)
RocketMQ消费成功

总结
  1. 本篇文章主要分析基于数据库Binlog记录操作日志的方案以及Cannal对接RocketMQ
    1.1 在操作时因Cannal的flatMessage配置和RocketMQ消费的flatMessage不一致导致报错进行了分析
  2. 为了保证文章篇幅不过长,下篇文章接着分析基于数据库Binlog记录操作日志,直到最终代码完成
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/333287.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号