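I am currently working on a human-resources project in which most of the business logic depends on the user table. The user service and the other services are independent of each other, so join queries across them are not possible. Because many features still need user data, I weighed the following two options:
Option 1:
Copy the frequently used user-table fields into each business table as redundant columns. When user information changes, a message is sent to MQ; every service that holds redundant user fields subscribes to that queue and updates its copies. This approach is actually quite workable.
Pros: other services do not need join queries to reach user data, so queries are fast.
Cons: redundant fields are hard to maintain. The redundancy grows as the business grows, and when a field is added it is easy to miss one of the copies.
Option 2:
Use canal and split out a dedicated service whose only job is to synchronize user data from the user service into the databases of the other services. The service is configured with multiple data sources and loops over them to maintain the user table in each database.
Pros: a dedicated service is easy to maintain and manage and scales well. When another service with its own database is added, you only add one more data source, and the code barely changes.
Cons: every database needs its own user table, which wastes some resources, and queries still rely on joins.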
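Installation reference: installation quick-start guide
I. First install canal. Download page: https://github.com/alibaba/canal/releases
I chose version 1.1.4.
Click the archive to download it directly.
If the address is unreachable, add me on WeChat (osm164502) and I can send the file privately.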
Edit the corresponding entries in <install dir>/conf/example/instance.properties:
# position info
canal.instance.master.address=127.0.0.1:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
Edit the startup.bat startup script.
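Locate the my.ini file.
On Windows my configuration file is at C:\ProgramData\MySQL\MySQL Server 5.7\my.ini; if you also run 5.7, yours should be in the same place.
Open the file and find the "Binary Logging" section.
Add the following settings:
log-bin=mysql-bin # binlog file name
binlog_format=ROW # use ROW mode
Find the "Server Id" section and set server-id:
server-id=1 # MySQL instance id; must not be the same as canal's slaveId
Restart the MySQL service.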
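Create the canal user
You can change the user name to whatever you like, but the user name and password here must match the ones in instance.properties!
-- Log in with: mysql -u root -p
-- Create the user (user name: canal, password: canal)
create user 'canal'@'%' identified by 'canal';
-- Grant privileges; *.* means all databases
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'canal';
4. Start canal: double-click startup.bat.
The canal server is now installed. What remains is the processing for our actual business.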
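A separate Spring Boot service is enough to handle this.
1. Create a new Spring Boot project with the following pom file:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.1.RELEASE</version>
</parent>
<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.1.10</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.18</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.3.1</version>
    </dependency>
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-generator</artifactId>
    </dependency>
</dependencies>
2. Configuration file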
server:
  port: 9999
spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      work-server:
        # MySQL settings
        driverClassName: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/db1?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
        username: root
        password: root
      partner-server:
        # MySQL settings
        driverClassName: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/db2?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
        username: root
        password: root
# mybatis-plus settings
mybatis-plus:
  mapperLocations: classpath:mapper/*Mapper.xml
  configuration:
    lazy-loading-enabled: true
    aggressive-lazy-loading: false
    map-underscore-to-camel-case: true
  type-aliases-package: com.xxx.entity
xfr:
  canal:
    host: 127.0.0.1
    port: 11111
    subscribe: db.user_info
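The subscribe value is later handed to connector.subscribe(...), where canal treats it as a comma-separated list of regular expressions over "schema.table" names. The sketch below is only an approximation of that behaviour using java.util.regex, not canal's own filter code; SubscribeFilterSketch and matches are hypothetical names. It shows why an unescaped dot, as in db.user_info, is looser than it looks.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of canal: approximates how a
// comma-separated "schema.table" regex filter could be evaluated.
class SubscribeFilterSketch {

    // Returns true when the full "schema.table" name matches any pattern in the filter.
    static boolean matches(String filter, String schemaAndTable) {
        for (String pattern : filter.split(",")) {
            if (Pattern.matches(pattern.trim(), schemaAndTable)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Unescaped dot: matches the intended table, but also any other character in that position.
        System.out.println(matches("db.user_info", "db.user_info"));   // true
        System.out.println(matches("db.user_info", "dbXuser_info"));   // true
        // Escaped dot: only the literal name matches.
        System.out.println(matches("db\\.user_info", "dbXuser_info")); // false
        // Every table in schema db.
        System.out.println(matches("db\\..*", "db.user_role"));        // true
    }
}
```

If the stricter form is wanted, the YAML value would presumably be written as db\.user_info, since a backslash in a plain YAML scalar is kept literally.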
3. Application class
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

@MapperScan("com.xxx.mapper") // MyBatis mapper package scan
// Exclude data source auto-configuration: the data sources are configured by hand
// because the data has to be synchronized into several databases.
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class CanalApplication {
    public static void main(String[] args) {
        SpringApplication.run(CanalApplication.class, args);
    }
}
4. CanalConfig
package com.xfr.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import com.xfr.handler.DataHandler;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.net.InetSocketAddress;
import java.util.List;

@Configuration
@ConfigurationProperties(prefix = "xfr.canal")
public class CanalConfig implements InitializingBean {
    // Every DataHandler bean in the context is injected here
    @Autowired
    private List<DataHandler> handlerList;
    @Value("${xfr.canal.host}")
    private String host;
    @Value("${xfr.canal.port}")
    private Integer port;
    @Value("${xfr.canal.subscribe}")
    private String subscribe;

    // Note: the loop below never returns, so this method blocks for the lifetime of the process.
    @Override
    public void afterPropertiesSet() throws Exception {
        // Create the connection; "example" is the canal instance (destination) name
        InetSocketAddress inetSocketAddress = new InetSocketAddress(host, port);
        CanalConnector connector = CanalConnectors.newSingleConnector(inetSocketAddress, "example", "", "");
        try {
            // Open the connection
            connector.connect();
            // Subscribe to the tables matched by the configured filter
            connector.subscribe(subscribe);
            // Roll back to the last un-acked position so the next fetch resumes from there
            connector.rollback();
            while (true) {
                // Fetch up to 200 entries without acknowledging them
                Message message = connector.getWithoutAck(200);
                // Batch id
                long batchId = message.getId();
                // Number of entries in the batch
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // No data: sleep for 2 seconds
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    // Data available: pass it to every handler
                    for (DataHandler dataHandler : handlerList) {
                        dataHandler.doHandler(message.getEntries());
                    }
                }
                // Acknowledge the batch; every Message with an id <= batchId is confirmed as well
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }
}
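In the loop above, any exception thrown by a handler ends the loop for good, and the loop itself runs on the thread that initializes the bean. One possible variation is to acknowledge a batch only after every handler succeeded, roll it back otherwise, and run the polling on a background thread. The sketch below illustrates only that control flow with stand-in types: Batch, BatchSource, pollOnce and startInBackground are hypothetical names, not canal APIs. With the real client the corresponding calls would be getWithoutAck, ack and rollback on CanalConnector.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins for the canal client types; only the ack/rollback flow is illustrated.
class PollLoopSketch {

    // A fetched batch; id -1 means "nothing available", mirroring the check in the loop above.
    static final class Batch {
        final long id;
        final List<String> entries;

        Batch(long id, List<String> entries) {
            this.id = id;
            this.entries = entries;
        }
    }

    // Minimal view of the three connector calls the loop relies on.
    interface BatchSource {
        Batch fetch(int batchSize);
        void ack(long batchId);
        void rollback(long batchId);
    }

    // Processes one batch; returns true only if entries were handled and acknowledged.
    static boolean pollOnce(BatchSource source, List<Consumer<List<String>>> handlers) {
        Batch batch = source.fetch(200);
        if (batch.id == -1) {
            return false; // nothing fetched, the caller may sleep
        }
        try {
            for (Consumer<List<String>> handler : handlers) {
                handler.accept(batch.entries);
            }
            source.ack(batch.id); // confirm only after every handler succeeded
            return !batch.entries.isEmpty();
        } catch (RuntimeException e) {
            source.rollback(batch.id); // let the same batch be fetched again
            return false;
        }
    }

    // Runs the loop on a daemon thread so the caller (for example application startup) is not blocked.
    static Thread startInBackground(BatchSource source, List<Consumer<List<String>>> handlers) {
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                if (!pollOnce(source, handlers)) {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // restore the flag so the loop exits
                    }
                }
            }
        }, "canal-poll-sketch");
        worker.setDaemon(true);
        worker.start();
        return worker;
    }
}
```

Whether a failed batch should be retried immediately, retried with a delay, or skipped after a few attempts depends on the business; the sketch simply sleeps before fetching again.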
5. Dynamic data source configuration
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

public class DynamicDataSource extends AbstractRoutingDataSource {
    // Holds the lookup key of the data source selected for the current thread
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
public static Map
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class DynamicDataSourceConfig {
    // Data source bound to spring.datasource.druid.work-server
    @Bean
    @ConfigurationProperties("spring.datasource.druid.work-server")
    public DataSource workServerDataSource() {
        return DruidDataSourceBuilder.create().build();
    }

    // Data source bound to spring.datasource.druid.partner-server
    @Bean
    @ConfigurationProperties("spring.datasource.druid.partner-server")
    public DataSource partnerServerDataSource() {
        return DruidDataSourceBuilder.create().build();
    }

    // Routing data source: workServer is the default, both targets are registered by key
    @Bean
    @Primary
    public DynamicDataSource dataSource(DataSource workServerDataSource, DataSource partnerServerDataSource) {
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put("workServer", workServerDataSource);
        targetDataSources.put("partnerServer", partnerServerDataSource);
        return new DynamicDataSource(workServerDataSource, targetDataSources);
    }
}
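The DynamicDataSource listing above breaks off after its first declarations. Judging from how the class is used elsewhere (a two-argument constructor, allDataSources, setDataSource, clearDataSource), it appears to follow the usual AbstractRoutingDataSource pattern, in which determineCurrentLookupKey() returns a key kept in a ThreadLocal. The following is a Spring-free model of that idea rather than the author's class: RoutingKeySketch and currentTarget are hypothetical names, and plain strings stand in for the DataSource objects.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, Spring-free model of a routing data source: a per-thread key selects the target.
class RoutingKeySketch {

    // Lookup key chosen by the current thread; null means "use the default".
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // key -> target (a plain string stands in for a DataSource here)
    private final Map<String, String> targets = new LinkedHashMap<>();
    private final String defaultKey;

    RoutingKeySketch(String defaultKey, Map<String, String> targets) {
        this.defaultKey = defaultKey;
        this.targets.putAll(targets);
    }

    // Selects the target for subsequent calls made on this thread.
    static void setDataSource(String key) {
        CONTEXT.set(key);
    }

    // Removes the selection so the thread falls back to the default target.
    static void clearDataSource() {
        CONTEXT.remove();
    }

    // Resolves the target for the current thread, falling back to the default
    // when no key is set or the key is unknown.
    String currentTarget() {
        String key = CONTEXT.get();
        String target = (key == null) ? null : targets.get(key);
        return (target != null) ? target : targets.get(defaultKey);
    }

    public static void main(String[] args) {
        Map<String, String> targets = new LinkedHashMap<>();
        targets.put("workServer", "jdbc:mysql://localhost:3306/db1");
        targets.put("partnerServer", "jdbc:mysql://localhost:3306/db2");
        RoutingKeySketch routing = new RoutingKeySketch("workServer", targets);

        System.out.println(routing.currentTarget()); // default target (db1)
        setDataSource("partnerServer");
        System.out.println(routing.currentTarget()); // db2
        clearDataSource();
        System.out.println(routing.currentTarget()); // back to db1
    }
}
```

Because the key lives in a ThreadLocal, it is worth clearing it in a finally block once the work is done, so a reused thread does not keep routing to the last target.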
6. Data handlers
import com.alibaba.otter.canal.protocol.CanalEntry;
import java.util.List;

public interface DataHandler {
    void doHandler(List<CanalEntry.Entry> entrys) throws Exception;
}
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.common.base.Objects;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.xfr.datasource.DynamicDataSource;
import com.xfr.entity.UserInfoCopy;
import com.xfr.service.impl.UserInfoServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;

@Service
public class SyncUserInfoHandler implements DataHandler {
    @Autowired
    private UserInfoServiceImpl userInfoService;

    @Override
    public void doHandler(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {
        for (CanalEntry.Entry entry : entrys) {
            String tableName = entry.getHeader().getTableName();
            CanalEntry.EntryType entryType = entry.getEntryType();
            ByteString storeValue = entry.getStoreValue();
            if (Objects.equal(entryType, CanalEntry.EntryType.ROWDATA)) {
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                List<UserInfoCopy> userInfoList = new ArrayList<>();
                if (Objects.equal(rowChange.getEventType(), CanalEntry.EventType.DELETE)) {
                    // DELETE: collect the ids and remove them from every target database
                    List<Integer> delIds = new ArrayList<>();
                    genDelIds(rowChange, delIds);
                    forEachDataSource(() -> userInfoService.removeByIds(delIds));
                } else if (Objects.equal(rowChange.getEventType(), CanalEntry.EventType.INSERT)) {
                    genUserInfoList(rowChange, rowChange.getEventType(), userInfoList);
                    saveBatch(userInfoList);
                } else if (Objects.equal(rowChange.getEventType(), CanalEntry.EventType.UPDATE)) {
                    genUserInfoList(rowChange, rowChange.getEventType(), userInfoList);
                    if (!CollectionUtils.isEmpty(userInfoList)) {
                        forEachDataSource(() -> userInfoService.updateBatchById(userInfoList));
                    }
                }
                userInfoList.forEach(item -> {
                    System.out.println(item);
                });
            } else {
                System.out.println("Current entry type: " + entryType);
            }
        }
    }

    private void saveBatch(List<UserInfoCopy> userInfoList) {
        if (!CollectionUtils.isEmpty(userInfoList)) {
            forEachDataSource(() -> userInfoService.saveBatch(userInfoList));
        }
    }

    // Runs the action once per registered data source, so deletes and updates reach
    // every target database (not only inserts), then always resets the routing key.
    private void forEachDataSource(Runnable action) {
        try {
            for (Object key : DynamicDataSource.allDataSources.keySet()) {
                System.out.println("Key = " + key);
                DynamicDataSource.setDataSource((String) key);
                action.run();
            }
        } finally {
            DynamicDataSource.clearDataSource();
        }
    }
    // Collects the primary keys of deleted rows from their before-images
    private void genDelIds(CanalEntry.RowChange rowChange, List<Integer> delIds) {
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                if (Objects.equal(column.getName(), "id")) {
                    delIds.add(Integer.valueOf(column.getValue()));
                    break; // id found, move on to the next row
                }
            }
        }
    }
    private void genUserInfoList(CanalEntry.RowChange rowChange, CanalEntry.EventType eventType, List<UserInfoCopy> userInfoList) {
        if (eventType.equals(CanalEntry.EventType.UPDATE)) {
            genUpdate(rowChange, userInfoList);
        } else if (eventType.equals(CanalEntry.EventType.INSERT)) {
            genInsert(rowChange, userInfoList);
        }
    }

    // UPDATE: always copy the id, and copy other columns only when they actually changed
    public void genUpdate(CanalEntry.RowChange rowChange, List<UserInfoCopy> userInfoList) {
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            UserInfoCopy userInfo = new UserInfoCopy();
            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                if (Objects.equal(column.getName(), "id")) {
                    userInfo.setId(Integer.valueOf(column.getValue()));
                } else if (Objects.equal(column.getName(), "name") && column.getUpdated()) {
                    userInfo.setName(column.getValue());
                } else if (Objects.equal(column.getName(), "account") && column.getUpdated()) {
                    userInfo.setAccount(column.getValue());
                } else if (Objects.equal(column.getName(), "password") && column.getUpdated()) {
                    userInfo.setPassword(column.getValue());
                } else if (Objects.equal(column.getName(), "phone") && column.getUpdated()) {
                    userInfo.setPhone(column.getValue());
                } else if (Objects.equal(column.getName(), "status") && column.getUpdated()) {
                    userInfo.setStatus(Integer.valueOf(column.getValue()));
                }
            }
            userInfoList.add(userInfo);
        }
    }

    // INSERT: copy the synced columns from the after-image of each new row
    public void genInsert(CanalEntry.RowChange rowChange, List<UserInfoCopy> userInfoList) {
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            UserInfoCopy userInfo = new UserInfoCopy();
            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                if (Objects.equal(column.getName(), "id")) {
                    userInfo.setId(Integer.valueOf(column.getValue()));
                } else if (Objects.equal(column.getName(), "name")) {
                    userInfo.setName(column.getValue());
                } else if (Objects.equal(column.getName(), "account")) {
                    userInfo.setAccount(column.getValue());
                } else if (Objects.equal(column.getName(), "password")) {
                    userInfo.setPassword(column.getValue());
                } else if (Objects.equal(column.getName(), "phone")) {
                    userInfo.setPhone(column.getValue());
                }
            }
            userInfoList.add(userInfo);
        }
    }
}
The real business logic lives in the SyncUserInfoHandler class. To add another extension, implement the DataHandler interface and its doHandler method.
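When a handler has to follow a growing list of columns, the if/else chains in genUpdate and genInsert are easy to leave incomplete. One possible alternative is a table-driven mapping with one entry per synced column. The sketch below is only an illustration under assumptions: User stands in for UserInfoCopy, a plain Map of column name to value stands in for canal's column list, and ColumnMappingSketch, SETTERS and toUser are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical sketch: maps column names to setters instead of using an if/else chain.
class ColumnMappingSketch {

    // Stand-in for the UserInfoCopy entity.
    static final class User {
        Integer id;
        String name;
        String account;
        String phone;
        Integer status;
    }

    // One entry per synced column: column name -> how to copy its string value into the entity.
    static final Map<String, BiConsumer<User, String>> SETTERS = new LinkedHashMap<>();

    static {
        SETTERS.put("id", (u, v) -> u.id = Integer.valueOf(v));
        SETTERS.put("name", (u, v) -> u.name = v);
        SETTERS.put("account", (u, v) -> u.account = v);
        SETTERS.put("phone", (u, v) -> u.phone = v);
        SETTERS.put("status", (u, v) -> u.status = Integer.valueOf(v));
    }

    // Builds a User from column name/value pairs; columns without a registered setter are ignored.
    static User toUser(Map<String, String> columns) {
        User user = new User();
        columns.forEach((name, value) -> {
            BiConsumer<User, String> setter = SETTERS.get(name);
            if (setter != null) {
                setter.accept(user, value);
            }
        });
        return user;
    }

    public static void main(String[] args) {
        Map<String, String> row = new LinkedHashMap<>();
        row.put("id", "7");
        row.put("name", "Ann");
        row.put("create_time", "2020-07-01"); // not registered, so it is skipped
        User user = toUser(row);
        System.out.println(user.id + " " + user.name); // prints "7 Ann"
    }
}
```

With this shape, syncing a new column means adding one line to the map, and the insert and update paths can share the same table.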
Source code



