- Canal
- 认识Canal
- 安装和配置Canal
- 开启MySQL主从
- 开启binlog
- 设置用户权限
- 安装Canal
- 创建网络
- 安装Canal
- 监听Canal
- 引入依赖:
- 编写配置:
- 修改Item实体类
- 编写监听器
- 注意
- 多级缓存总结
Canal 认识Canal
Canal [kə’næl],译意为水道/管道/沟渠,canal是阿里巴巴旗下的一款开源项目,基于Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。GitHub的地址:https://github.com/alibaba/canal
Canal是基于mysql的主从同步来实现的,MySQL主从同步的原理如下:
- 1)MySQL master 将数据变更写入二进制日志( binary log),其中记录的数据叫做binary log events
- 2)MySQL slave 将 master 的 binary log events拷贝到它的中继日志(relay log)
- 3)MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
而Canal就是把自己伪装成MySQL的一个slave节点,从而监听master的binary log变化。再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步。
安装和配置Canal
下面我们就开启mysql的主从同步机制,让Canal来模拟salve
开启MySQL主从Canal是基于MySQL的主从同步功能,因此必须先开启MySQL的主从功能才可以。
这里以之前用Docker运行的mysql为例:
开启binlog打开mysql容器挂载的日志文件,我的在/tmp/mysql/conf目录:
log-bin=/var/lib/mysql/mysql-bin binlog-do-db=item
配置解读:
- log-bin=/var/lib/mysql/mysql-bin:设置binary log文件的存放地址和文件名,叫做mysql-bin
- binlog-do-db=heima:指定对哪个database记录binary log events,这里记录heima这个库
最终效果:
[mysqld] #跳过域名解析 skip-name-resolve #指定服务器级别的字符集 character_set_server=utf8 #指定数据存放目录 datadir=/var/lib/mysql #MySQL服务的ID server-id=1000 #mysql二进制文件 log-bin=/var/lib/mysql/mysql-bin #需要复制的数据库 binlog-do-db=item
设置用户权限
接下来添加一个仅用于数据同步的账户,出于安全考虑,这里仅提供对item这个库的操作权限。
create user canal@'%' IDENTIFIED by 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%' identified by 'canal'; FLUSH PRIVILEGES;
replication slave 的级别是global,所以不能只作用于某一数据库,而是全局
如果权限缩小为某一个数据库,那么会报错: Incorrect usage of DB GRANT and GLOBAL PRIVILEGES
重启mysql容器即可
docker restart mysql
测试设置是否成功:在mysql控制台,或者Navicat中,输入命令:
show master status;
安装Canal 创建网络
我们需要创建一个网络,将MySQL、Canal放到同一个Docker网络中:
docker network create item
让mysql加入这个网络:
docker network connect item mysql
查看当前网络细节:
docker network inspect item
安装Canal
拉取docker镜像:
docker pull canal/canal-server
然后运行命令创建Canal容器:
docker run -p 11111:11111 --name canal -e canal.destinations=item -e canal.instance.master.address=mysql:3306 -e canal.instance.dbUsername=canal -e canal.instance.dbPassword=canal -e canal.instance.connectionCharset=UTF-8 -e canal.instance.tsdb.enable=true -e canal.instance.gtidon=false -e canal.instance.filter.regex=item\..* --network item -d canal/canal-server
注意:mysql:3306,这里3306是容器中开放的端口,也就是容器中mysql启动时的端口,而不是映射到宿主机上的端口,这点必须注意,因为通过容器名直接访问,是容器与容器之间直接进行通信,不需要通过宿主机。
说明:
- -p 11111:11111:这是canal的默认监听端口
- -e canal.instance.master.address=mysql:3306:数据库地址和端口,如果不知道mysql容器名字,可以通过docker inspect 容器id来查看,因为这里是在同一个自定义网络下,因此可以用容器名字进行通信
- -e canal.instance.dbUsername=canal:数据库用户名
- -e canal.instance.dbPassword=canal :数据库密码
- -e canal.instance.filter.regex=:要监听的表名称
表名称监听支持的语法:
mysql 数据解析关注的表,Perl正则表达式. 多个正则之间以逗号(,)分隔,转义符需要双斜杠(\) 常见例子: 1. 所有表:.* or .*\..* 2. canal schema下所有表: canal\..* 3. canal下的以canal打头的表:canal\.canal.* 4. canal schema下的一张表:canal.test1 5. 多个规则组合使用然后以逗号隔开:canal\..*,mysql.test1,mysql.test2
监听Canal
Canal提供了各种语言的客户端,当Canal监听到binlog变化时,会通知Canal的客户端。
我们可以利用Canal提供的Java客户端,监听Canal通知消息。当收到变化的消息时,完成对缓存的更新。
不过这里我们会使用GitHub上的第三方开源的canal-starter客户端。地址:https://github.com/NormanGyllenhaal/canal-client
与SpringBoot完美整合,自动装配,比官方客户端要简单好用很多。
将项目拉取下来,然后安装到本地仓库中
引入依赖:
编写配置:top.javatool canal-spring-boot-starter 1.2.6-RELEASE
canal: destination: item # canal的集群名字,要与安装canal时设置的名称一致 server: 192.168.150.101:11111 # canal服务地址修改Item实体类
通过@Id、@Column、等注解完成Item与数据库表字段的映射:
@Data
@TableName("tb_item")
public class Item {
@TableId(type = IdType.AUTO)
@Id
private Long id;//商品id
@Column(name = "name")
private String name;//商品名称
private String title;//商品标题
private Long price;//价格(分)
private String image;//商品图片
private String category;//分类名称
private String brand;//品牌名称
private String spec;//规格
private Integer status;//商品状态 1-正常,2-下架
private Date createTime;//创建时间
private Date updateTime;//更新时间
@TableField(exist = false)
@Transient
private Integer stock;
@TableField(exist = false)
@Transient
private Integer sold;
}
编写监听器
通过实现EntryHandler
- 实现类通过@CanalTable("tb_item")指定监听的表信息
- EntryHandler的泛型是与表对应的实体类
@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler- {
@Autowired
private RedisHandler redisHandler;
@Autowired
private Cache
itemCache;
@Override
public void insert(Item item) {
// 写数据到JVM进程缓存
itemCache.put(item.getId(), item);
// 写数据到redis
redisHandler.saveItem(item);
}
@Override
public void update(Item before, Item after) {
// 写数据到JVM进程缓存
itemCache.put(after.getId(), after);
// 写数据到redis
redisHandler.saveItem(after);
}
@Override
public void delete(Item item) {
// 删除数据到JVM进程缓存
itemCache.invalidate(item.getId());
// 删除数据到redis
redisHandler.deleteItemById(item.getId());
}
}
在这里对Redis的操作都封装到了RedisHandler这个对象中,是我们之前做缓存预热时编写的一个类,内容如下:
@Component
public class RedisHandler implements InitializingBean {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private IItemService itemService;
@Autowired
private IItemStockService stockService;
private static final ObjectMapper MAPPER = new ObjectMapper();
@Override
public void afterPropertiesSet() throws Exception {
// 初始化缓存
// 1.查询商品信息
List- itemList = itemService.list();
// 2.放入缓存
for (Item item : itemList) {
// 2.1.item序列化为JSON
String json = MAPPER.writeValueAsString(item);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
}
// 3.查询商品库存信息
List
stockList = stockService.list();
// 4.放入缓存
for (ItemStock stock : stockList) {
// 2.1.item序列化为JSON
String json = MAPPER.writeValueAsString(stock);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
}
}
public void saveItem(Item item) {
try {
String json = MAPPER.writeValueAsString(item);
redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public void deleteItemById(Long id) {
redisTemplate.delete("item:id:" + id);
}
}
超详细canal入门,看这篇就够了
注意
如果canal出现了问题,通过进入canal容器内部,查看输出日志,来排查问题:
docker exec -it canal bash
查看日志:
进入容器内部canal-server/logs目录下,查看item目录下的,item.log日志
我在这里碰到一个问题是,无法识别mysql的主机地址:
原因: 一开始我运行canal容器的时候,标注的mysql地址为mysql:3307
我的mysql容器内端口为3306,宿主机映射端口为3307,我错误的将mysql宿主机映射端口写了上去,这是错误的,因为我们这里采用自定义网络进行通信,通过mysql容器名直接或容器内ip直接完成两个容器之间的通信,不需要经过宿主机,因此不能用宿主机映射端口
Docker重学系列之高级网络篇
多级缓存总结
对于不共享的本地缓存,通常采用的做法是,通过hash路由,让固定的uri去访问固定某台服务器,确保缓存命中,并且不至于缓存多分份



