1、教程初衷,最近想要使用kettle实现单表增量同步,在各种搜索中,实现成功后,很是感激,也想通过教程复述的方式记录方法的实现过程、理清思路,好了好了,废话多多少少有点多了。
2、介绍具体环境
想要将input_test表中数据实时同步到output_test表中,小编的这两个表在mysql的同一个数据库中,当然在mysql的不同数据库也可以实现。
3、实现思路
(1)获取上一次同步时间。
(2)同步本次更新数据。
(3)记录本次同步时间。
4、使用工具kettle和navicat premium15,需要安装好Java。小编都放到这里啦。
navicat premium15链接:https://pan.baidu.com/s/1mPRnVLaZTph4UM4r8oVHtg
提取码:guan
Java安装程序链接:https://pan.baidu.com/s/157WdYbHnn0jPQbDY88jf0w
提取码:guan
kettle及mysql连接的jar包(把jar包放在lib文件夹下,重启kettle就可以生效)
链接:https://pan.baidu.com/s/1EydEaM1apeqrc53Y6KrByA
提取码:guan
4、实际操作步骤
步骤一:
提到上一次同步时间,我们需要创建一个日志表(rz)来记录每次的同步信息,表字段包括同步id,同步人,同步结果,同步时间,也需要input_test和output_test中都存在一列更新时间(将更新时间与同步时间作比较来判断是否为增量数据)
小编使用navicat premium15,来实现创建日志表和插入更新时间列操作。
日志表,表名:rz , 结构如下:
并且添加了一行数据
input_test和out_test表结构一致,添加了gxsj列。
input_test表里插入几条数据,最后两条的更新时间是在同步时间之后的。input_test表内现在暂无数据。
把添加的gxsj列设为系统时间。
update input_test set gxsj=SYSDATE()
update output_test set gxsj=SYSDATE()
步骤二:使用kettle新建转换,命名为:1-获取上一次同步时间
添加步骤 表输入
编辑表输入->输入sql语句,SELECT IFNULL(max(tbsj),SYSDATE()-INTERVAL 1 DAY) from rz;
点击预览,预览结果如下:
添加步骤 设置变量 ->点击获取字段-> 确定。(把TBSJ设为全局变量)
步骤三:使用kettle新建转换,命名为:2-同步本次更新数据
添加步骤 表输入:
编辑表输入->输入sql语句,SELECt * FROM input_test WHERe gxsj>"${TBSJ}";
勾选替换SQL语句里的变量->点击预览。选择出了更新时间大于同步时间的数据
添加步骤 字段选择->编辑字段选择->元数据->获取改变的字段,将字段gxsj设置为Date类型,格式为yyyy/MM/dd HH:mm:ss。如下图所示。因为在navicat里精确到了毫秒,但是在mysql中精确到了秒。
添加步骤 插入/更新 ->编辑步骤->获取字段 只保留表字段name->获取和更新字段
将name字段更新设为N。如下图所示。
步骤四:使用kettle新建转换,命名为:3-记录本次同步时间。
添加步骤 执行SQL脚本->编辑步骤 添加sql语句
INSERT INTO rz(id,bm,result,tbsj) VALUES(nextval(“seq_rz”),‘t_user’,1,SYSDATE());
在插入rz表的值中,我们注意到了nextval(“seq_rz”)。
nextval用来获取序列号为seq_rz的下一个squence的值,我们需要新建sequence表,以及function nextval、 currval、setval。
CREATE TABLE `sequence` (`name` VARCHAR ( 50 ) COLLATE utf8_bin NOT NULL COMMENT '序列的名字',`current_value` INT ( 11 ) NOT NULL COMMENT '序列的当前值',`increment` INT ( 11 ) NOT NULL DEFAULT '1' COMMENT '序列的自增值',PRIMARY KEY ( `name` ) ) ENGINE = INNODB DEFAULT CHARSET = utf8 COLLATE = utf8_bin;
DROp FUNCTION IF EXISTS nextval; DELIMITER $ CREATE FUNCTION nextval (seq_name VARCHAR(50)) RETURNS INTEGER LANGUAGE SQL DETERMINISTIC CONTAINS SQL SQL SECURITY DEFINER COMMENT '' BEGIN UPDATE sequence SET current_value = current_value + increment WHERe name = seq_name; RETURN currval(seq_name); END $ DELIMITER ;
DROP FUNCTION IF EXISTS currval; DELIMITER $ CREATE FUNCTION currval (seq_name VARCHAR(50)) RETURNS INTEGER LANGUAGE SQL DETERMINISTIC CONTAINS SQL SQL SECURITY DEFINER COMMENT '' BEGIN DECLARE value INTEGER; SET value = 0; SELECT current_value INTO value FROM sequence WHERe name = seq_name; RETURN value; END $ DELIMITER ;
```sql DROP FUNCTION IF EXISTS setval; DELIMITER $ CREATE FUNCTION setval (seq_name VARCHAR(50), value INTEGER) RETURNS INTEGER LANGUAGE SQL DETERMINISTIC CONTAINS SQL SQL SECURITY DEFINER COMMENT '' BEGIN UPDATE sequence SET current_value = value WHERe name = seq_name; RETURN currval(seq_name); END $ DELIMITER ;
步骤五:新建job,在job中添加步骤start、3个转换、成功。把前面编辑好的转换添加到job中,run就可以了。
我们来看运行结果。
(1)运行日志。output_test中插入了两条数据
(2)output_test表结果,新增两条数据。
(3)日志表。日志表中插入了一条同步信息数据,id是自增的。
总结:增量同步可以实现,没有尝试改数据后同步的实现,需要多加一步比较步骤?期待友友们的答案。



