CM和CDH版本:5.13.1已安装Impala在集群中 88.2 操作演示
拉链表设计
表USER,用于存储用户最新的全量信息,如下图:建表:
create table user( id bigint, username string, birthday timestamp ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY 'n' STORED AS parquet;
初始数据:
INSERT INTO user values (10001, 'fayson', '1989-08-28'), (10002, 'zhangsan', '1979-07-28'), (10003, 'lisi', '1980-06-18'), (10004, 'wangwu', '1977-01-20');
拉链表USER_HIS,如下图:建表:
使用分区表,是为了能够实现拉链数据的更新和删除
create table user_his( id bigint, username string, birthday timestamp, start_dt timestamp ) partitioned by (end_dt string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY 'n' STORED AS parquet;
使用上面的表创建USER和USER_HIS表,并初始化USER表数据
拉链实现
USER_HIS表中创建一个’9999-12-31’的分区用于存储所有用户开链数据
ALTER TABLE user_his ADD PARTITION (end_dt='9999-12-31');
首次USER_HIS表中无任何数据,通过USER表数据初始化拉链表USER_HIS表数据,插入所有用户的开链数据
INSERT overwrite TABLE user_his PARTITION (end_dt = '9999-12-31')
SELECt id,
username,
birthday,
from_timestamp(adddate(now(), -3), 'yyyy-MM-dd')
FROM USER;
USER表的username修改为如下
为了与拉链表对比用户数据的变更
INSERT overwrite TABLE USER
SELECt id,
concat(username,'1'),
birthday
FROM USER;
拉链表上创建”2018-01-16”的分区
--ALTER TABLE user_his ADD PARTITION (end_dt= from_timestamp(now(), 'yyyy-MM-dd')); ALTER TABLE user_his ADD PARTITION (end_dt= "2018-01-16");
将修改的USER表用户数据与USER_HIS表中开链数据比对,将可以闭链的数据插入”2018-01-16”分区
INSERT overwrite TABLE user_his PARTITION (end_dt = "2018-01-16")
SELECt b.id,
b.username,
b.birthday,
b.start_dt
FROM USER a
LEFT JOIN user_his b ON a.id=b.id
WHERe b.end_dt = '9999-12-31'
AND (a.username != b.username
OR a.birthday != b.birthday);
在用户表中新增一条用户信息,模拟用户表数据不存在拉链表的开链数据中
INSERT INTO user VALUES (10005, 'zhaoda', '1976-02-09');
更新拉链表USER_HIS的开链数据
包含已更新的用户、未更新用户和新增用户
INSERT overwrite TABLE user_his PARTITION(end_dt = '9999-12-31')
SELECt a.id,
a.username,
a.birthday,
b.end_dt AS start_dt
FROM USER a
LEFT JOIN user_his b ON a.id = b.id
WHERe b.end_dt = "2018-01-16"
union all
SELECt b.id,
b.username,
b.birthday,
b.start_dt
FROM user_his b
WHERe NOT EXISTS
(SELECt id
FROM user_his c
WHERe c.end_dt = "2018-01-16" and b.id = c.id)
AND b.end_dt = '9999-12-31'
union ALL
SELECt a.id,
a.username,
a.birthday,
"2018-01-16" AS start_dt
FROM USER a
WHERe NOT EXISTS
(SELECt 1
FROM user_his b
WHERe end_dt = '9999-12-31'
AND a.id = b.id);
模拟更新部分用户信息,验证拉链业务是否正常
USER表数据
INSERT INTO user values (10001, 'fayson2', '1989-09-27'), (10002, 'zhangsan2', '1979-07-28'), (10003, 'lisi1', '1980-06-18'), (10004, 'wangwu1', '1977-01-20'), (10005, 'zhaoda', '1976-02-09');
创建USRE_HIS表“2018-01-17”分区
ALTER TABLE user_his ADD PARTITION (end_dt= "2018-01-17");
用户的闭链数据插入到“2018-01-17”分区
INSERT overwrite TABLE user_his PARTITION (end_dt = "2018-01-17")
SELECt b.id,
b.username,
b.birthday,
b.start_dt
FROM USER a
LEFT JOIN user_his b ON a.id=b.id
WHERe b.end_dt = '9999-12-31'
AND (a.username != b.username
OR a.birthday != b.birthday);
根据USER和USER_HIS中“2018-01-17”分区的闭链数据,更新所有用户开链数据:
含新增用户、闭链用户和开链用户
INSERT overwrite TABLE user_his PARTITION(end_dt = '9999-12-31')
SELECt a.id,
a.username,
a.birthday,
b.end_dt AS start_dt
FROM USER a
LEFT JOIN user_his b ON a.id = b.id
WHERe b.end_dt = "2018-01-17"
union all
SELECt b.id,
b.username,
b.birthday,
b.start_dt
FROM user_his b
WHERe NOT EXISTS
(SELECt id
FROM user_his c
WHERe c.end_dt = "2018-01-17" and b.id = c.id)
AND b.end_dt = '9999-12-31'
union ALL
SELECt a.id,
a.username,
a.birthday,
"2018-01-17" AS start_dt
FROM USER a
WHERe NOT EXISTS
(SELECt 1
FROM user_his b
WHERe end_dt = '9999-12-31'
AND a.id = b.id);
完整脚本
执行脚本的前置条件,拉链表已存在且已创建了开链分区,脚本中将分区替换为当前日期按照每天的一次的频率执行
use test_db;
--创建当天闭链分区
ALTER TABLE user_his ADD PARTITION(end_dt= from_timestamp(now(), 'yyyy-MM-dd'));
--将闭链数据插入当天闭链分区中
INSERT overwrite TABLE user_his PARTITION(end_dt = from_timestamp(now(), 'yyyy-MM-dd'))
SELECt b.id,
b.username,
b.birthday,
b.start_dt
FROM USER a
LEFT JOIN user_his b ON a.id=b.id
WHERe b.end_dt = '9999-12-31'
AND (a.username != b.username
OR a.birthday != b.birthday);
--更新拉链表数据开链数据(包含已更新的用户、未更新用户和新增用户)
INSERT overwrite TABLE user_his PARTITION(end_dt = '9999-12-31')
SELECt a.id,
a.username,
a.birthday,
b.end_dt AS start_dt
FROM USER a
LEFT JOIN user_his b ON a.id = b.id
WHERe b.end_dt = from_timestamp(now(), 'yyyy-MM-dd')
union all
SELECt b.id,
b.username,
b.birthday,
b.start_dt
FROM user_his b
WHERe NOT EXISTS
(SELECt id
FROM user_his c
WHERe c.end_dt = from_timestamp(now(), 'yyyy-MM-dd') and b.id = c.id)
AND b.end_dt = '9999-12-31'
union ALL
SELECt a.id,
a.username,
a.birthday,
from_timestamp(now(), 'yyyy-MM-dd') AS start_dt
FROM USER a
WHERe NOT EXISTS
(SELECt 1
FROM user_his b
WHERe end_dt = '9999-12-31'
AND a.id = b.id);
大数据视频推荐:
CSDN
大数据语音推荐:
企业级大数据技术应用
大数据机器学习案例之推荐系统
自然语言处理
大数据基础
人工智能:深度学习入门到精通



