注: 案例素材来自于中国工信出版社出版书籍 hive实战. 译者- 唐富年
素材下载,免费
素材下载,免费
素材下载,免费
目录书籍中的素材及操作比较繁琐,建议学习后使用自己的方式实现,对你是一个锻炼!
- 准备数据
- 加载原数据- ODS层
- 建库
- 创建表并装载rawfirstname.csv文件
- 创建rawlastname并装载
- 创建rawperson并装载
- 装载rawdatetime
- 创建retrievedb.rawaddress表并装载数据
- 创建retrievedb.rawaddresshistory
- 创建retrievedb.rawaccount
- 存取数据 - DWD层
- 清洗firstname表
- 创建临时表assessdb.firstname001 -- 去掉第一行的字段列名
- 创建临时表assessdb.firstname002,去掉firstname001表中字符多余的空格
- 创建临时表assessdb.firstname003,转换数据类型
- 创建firstname表
- 清洗lastname数据
- 清洗Person表 -- 一步到位
- 清洗datetime表 -- 字段多多多!!!
- 清洗postaddress表
- 清洗addresshistory表
- 组合查询
- 清洗account表
- 组合表
- 组合personfull表 [Person + firstname + lastname + sex]
- 过程数据处理 - DWT 层
- 创建数据库processdb
- 创建processdb.personhub表
- 创建表processdb.personsexsatellite
- 创建processdb.objectbankaccountsatellite
- 创建processdb.locationhub
- 创建processdb.locationgeospacesatellite系列表
- 与事件相关的数据表
- 与时间相关的表结构
- 链接表
- 转换数据库 - DWS层
- 创建dimaccount表
- 创建fctpersonaccount表
- 创建dimaddress表
- 组织数据库
- 报表数据库 - ADS 层
1, 删除不需要的标记,如标题 2, 删除数据记录中不需要的空格 3, 将string类型转换成需要的类型准备数据
上传00rawdata的文件到hdfs hadoop fs -mkdir /rawdata hdfs dfs -put 00rawdata /rawdata/加载原数据- ODS层 建库
-- 创建检索数据库,加载原始数据 create database if not exists retrievedb comment "load data" location '/hive_data/retrievedb.db'; -- beeline客户端连接 beeline -u jdbc:hive2://node02:10000/retrievedb -n god -p 1234创建表并装载rawfirstname.csv文件
use retrievedb; create table if not exists retrievedb.rawfirstname ( firstnameid string, firstname string, sex string ) row format delimited fields terminated by ','; -- 加载数据 LOAD DATA INPATH 'hdfs://mycluster/rawdata/00rawdata/rawfirstname.csv' OVERWRITE INTO TABLE retrievedb.rawfirstname;创建rawlastname并装载
CREATE TABLE IF NOT EXISTS retrievedb.rawlastname ( lastnameid string, lastname string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; LOAD DATA INPATH '/rawdata/00rawdata/rawlastname.csv' OVERWRITE INTO TABLE retrievedb.rawlastname;创建rawperson并装载
CREATE TABLE IF NOT EXISTS retrievedb.rawperson ( persid string, firstnameid string, lastnameid string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; LOAD DATA INPATH '/rawdata/00rawdata/rawperson.csv' OVERWRITE INTO TABLE retrievedb.rawperson;装载rawdatetime
CREATE TABLE IF NOT EXISTS retrievedb.rawdatetime ( id string, datetimes string, monthname string, yearnumber string, monthnumber string, daynumber string, hournumber string, minutenumber string, ampm string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; LOAD DATA INPATH '/rawdata/00rawdata/rawdatetime.csv' OVERWRITE INTO TABLE retrievedb.rawdatetime;创建retrievedb.rawaddress表并装载数据
CREATE TABLE IF NOT EXISTS retrievedb.rawaddress ( id string, Postcode string, Latitude string, Longitude string, Easting string, Northing string, GridRef string, District string, Ward string, DistrictCode string, WardCode string, Country string, CountyCode string, Constituency string, TypeArea string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; LOAD DATA INPATH '/rawdata/00rawdata/rawaddress.csv' OVERWRITE INTO TABLE retrievedb.rawaddress;创建retrievedb.rawaddresshistory
CREATE TABLE IF NOT EXISTS retrievedb.rawaddresshistory ( id string, pid string, aid string, did1 string, did2 string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; LOAD DATA INPATH '/rawdata/00rawdata/rawaddresshistory.csv' OVERWRITE INTO TABLE retrievedb.rawaddresshistory;创建retrievedb.rawaccount
CREATE TABLE IF NOT EXISTS retrievedb.rawaccount ( id string, pid string, accountno string, balance string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; LOAD DATA INPATH '/rawdata/00rawdata/rawaccount.csv' OVERWRITE INTO TABLE retrievedb.rawaccount;存取数据 - DWD层
-- 创建数据库 CREATE DATAbase IF NOT EXISTS assessdb comment "access data" location '/hive_data/assessdb.db'; USE assessdb;清洗firstname表 创建临时表assessdb.firstname001 – 去掉第一行的字段列名
CREATE TABLE IF NOT EXISTS assessdb.firstname001 (
firstnameid string,
firstname string,
sex string
)
CLUSTERED BY (firstnameid) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
TRUNCATE TABLE assessdb.firstname001;
INSERT INTO TABLE assessdb.firstname001
SELECt firstnameid, firstname, sex
FROM retrievedb.rawfirstname
WHERe firstnameid <> '"id"'; -- 去掉第一行的字段列名
创建临时表assessdb.firstname002,去掉firstname001表中字符多余的空格
CREATE TABLE IF NOT EXISTS assessdb.firstname002 (
firstnameid string,
firstname string,
sex string
)
CLUSTERED BY (firstnameid) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
TRUNCATE TABLE assessdb.firstname002;
INSERT INTO TABLE assessdb.firstname002
SELECt firstnameid, rtrim(ltrim(firstname)), rtrim(ltrim(sex))
FROM assessdb.firstname001;
创建临时表assessdb.firstname003,转换数据类型
CREATE TABLE IF NOT EXISTS assessdb.firstname003 (
firstnameid int,
firstname string,
sex string
)
CLUSTERED BY (firstnameid) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
TRUNCATE TABLE assessdb.firstname003;
INSERT INTO TABLE assessdb.firstname003
SELECt CAST(firstnameid as INT), SUBSTRING(firstname,2,LENGTH(firstname)-2), SUBSTRING(sex,2,LENGTH(sex)-2)
FROM assessdb.firstname002;
创建firstname表
CREATE TABLE IF NOT EXISTS assessdb.firstname (
firstnameid int,
firstname string,
sex string
)
CLUSTERED BY (firstnameid) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
TRUNCATE TABLE assessdb.firstname;
INSERT INTO TABLE assessdb.firstname
SELECt firstnameid, firstname, sex
FROM assessdb.firstname003
ORDER BY firstnameid; -- 尽量不要用order by 全表排序
TRUNCATE TABLE assessdb.firstname;
INSERT INTO TABLE assessdb.firstname
SELECt firstnameid, firstname, sex
FROM assessdb.firstname003
SORT BY firstnameid;
SELECt firstnameid, firstname, sex from assessdb.firstname SORT BY firstname LIMIT 10;
清洗lastname数据
-- 1. 去掉第一行列名
-- 2. 字符串格式化-去掉 "及空格
-- 3. 数据格式转换
CREATE TABLE IF NOT EXISTS assessdb.lastname (
lastnameid int,
lastname string
)
CLUSTERED BY (lastnameid) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
INSERT OVERWRITE TABLE assessdb.lastname
SELECt
cast(lastnameid as int),
substr(trim(lastname),2,length(trim(lastname))-2)
FROM retrievedb.rawlastname
WHERe lastnameid <> '"id"'
SORT BY lastnameid;
-- 测试
select * from assessdb.lastname limit 30;
清洗Person表 – 一步到位
# 清洗Person表
1. 去掉第一行
2. 去掉空格
3. 转类型
drop table if exists assessdb.person;
CREATE TABLE IF NOT EXISTS assessdb.person (
persid int,
firstnameid int,
lastnameid int
)
CLUSTERED BY (lastnameid) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
-- truncate table assessdb.person;
from retrievedb.rawperson
insert into table assessdb.person
select cast(trim(persid) as int),cast(trim(firstnameid) as int),cast(trim(lastnameid) as int)
where rawperson.persid <> '"id"';
清洗datetime表 – 字段多多多!!!
-- 查看表结构
desc retrievedb.rawdatetime;
CREATE TABLE IF NOT EXISTS assessdb.dates (
id int,
datetimes string,
monthname string,
yearnumber int,
monthnumber int,
daynumber int,
hournumber int,
minutenumber int,
ampm string
)
CLUSTERED BY (id) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
-- 1. 去掉第一行
-- 2. 去空格
-- 3. 转类型
insert overwrite table assessdb.dates
select cast(r.id as int),
substr(trim(r.datetimes),2,length(trim(r.datetimes))-2),
substr(trim(r.monthname),2,length(trim(r.monthname))-2),
cast(r.yearnumber as int),
cast(r.monthnumber as int),
cast(r.daynumber as int),
cast(r.hournumber as int),
cast(r.minutenumber as int),
substr(trim(r.ampm),2,length(trim(r.ampm))-2)
from retrievedb.rawdatetime r
where r.id <> '"id"'
sort by id;
-- 查看数据
select * from assessdb.dates limit 10;
清洗postaddress表
desc retrievedb.rawaddress;
drop table if exists assessdb.postaddress;
CREATE TABLE IF NOT EXISTS assessdb.postaddress (
id int,
postcode string,
latitude decimal(18,9),
longitude decimal(18,9),
easting int,
northing int,
gridRef string,
District string,
ward string,
districtCode string,
wardCode string,
country string,
countyCode string,
constituency string,
typeArea string
)
CLUSTERED BY (id) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
insert overwrite table assessdb.postaddress
select
cast(r.id as int),
substr(trim(r.postcode),2,length(trim(r.postcode))-2),
cast(trim(r.latitude) as decimal(18,9)),
cast(trim(r.longitude) as decimal(18,9)),
cast(r.easting as int),
cast(r.northing as int),
substr(trim(r.gridRef),2,length(trim(r.gridRef))-2),
substr(trim(r.District),2,length(trim(r.District))-2),
substr(trim(r.ward),2,length(trim(r.ward))-2),
substr(trim(r.districtCode),2,length(trim(r.districtCode))-2),
substr(trim(r.wardCode),2,length(trim(r.wardCode))-2),
substr(trim(r.country),2,length(trim(r.country))-2),
substr(trim(r.countyCode),2,length(trim(r.countyCode))-2),
substr(trim(r.constituency),2,length(trim(r.constituency))-2),
substr(trim(r.typeArea),2,length(trim(r.typeArea))-2)
from retrievedb.rawaddress r
where id <> '"id"'
sort by id;
select * from assessdb.postaddress limit 10;
清洗addresshistory表
-- 查看源表结构
desc retrievedb.rawaddresshistory;
drop table if exists assessdb.addresshistory;
-- 创建完整表
CREATE TABLE IF NOT EXISTS assessdb.addresshistory (
id int,
pid int,
aid int,
did1 int,
did2 int
)
CLUSTERED BY (id) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
insert into table assessdb.addresshistory
select
cast(r.id as int),
cast(r.pid as int),
cast(r.aid as int),
cast(r.did1 as int),
cast(r.did2 as int)
from retrievedb.rawaddresshistory r
where r.id <> '"id"';
组合查询
SELECt
addresshistory.id,
addresshistory.pid,
personfull.firstname,
personfull.lastname,
addresshistory.aid,
postaddress.postcode,
addresshistory.did1,
dates1.datetimes as startdate,
addresshistory.did2,
dates2.datetimes as enddate
FROM
assessdb.addresshistory
JOIN
assessdb.personfull
ON
addresshistory.pid = personfull.persid
JOIN
assessdb.postaddress
ON
addresshistory.aid = postaddress.id
JOIN
assessdb.dates as dates1
ON
addresshistory.did1 = dates1.id
JOIN
assessdb.dates as dates2
ON
addresshistory.did2 = dates2.id
LIMIT 10;
清洗account表
desc retrievedb.rawaccount;
drop table if exists assessdb.account;
CREATE TABLE IF NOT EXISTS assessdb.account (
id int,
pid int,
accountno string,
balance decimal(18,9)
)
CLUSTERED BY (id) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
insert into table assessdb.account
select
cast(id as int),
cast(pid as int),
concat('AC',accountno),
cast(balance as decimal(18,9))
from retrievedb.rawaccount
where id <> 'id';
组合表
组合personfull表 [Person + firstname + lastname + sex]
drop table if exists assessdb.personfull;
CREATE TABLE IF NOT EXISTS assessdb.personfull (
persid int,
firstnameid int,
firstname string,
lastnameid int,
lastname string,
sex string
)
CLUSTERED BY (lastnameid) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
insert into table assessdb.personfull
select p.persid,fn.firstnameid,fn.firstname,ln.lastnameid,ln.lastname,fn.sex
from assessdb.person p
join
firstname fn
on
p.firstnameid = fn.firstnameid
join
lastname ln
on
p.lastnameid = ln.lastnameid;
过程数据处理 - DWT 层
创建数据库processdb
CREATE DATAbase IF NOT EXISTS processdb comment "process data in here" location '/hive_data/processdb.db';创建processdb.personhub表
create table if not exists processdb.personhub (
id int,
keyid string,
firstname string,
lastname string
)
CLUSTERED BY (id) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
INSERT INTO TABLE processdb.personhub
SELECt
ROW_NUMBER() OVER (ORDER BY firstname, lastname),
CONCAT(unix_timestamp(), '/', ROW_NUMBER() OVER (ORDER BY firstname, lastname)),
firstname,
lastname
FROM
assessdb.personfull;
创建表processdb.personsexsatellite
CREATE TABLE IF NOT EXISTS processdb.personsexsatellite (
id INT,
keyid STRING,
sex STRING,
timestmp BIGINT
)
CLUSTERED BY (id) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
INSERT Overwrite TABLE processdb.personsexsatellite
SELECt
ROW_NUMBER() OVER (ORDER BY p1.keyid),
p1.keyid,
p2.sex,
unix_timestamp()
FROM
processdb.personhub p1
join
assessdb.personfull p2
on
p1.firstname = p2.firstname
and
p1.lastname = p2.lastname;
创建processdb.objectbankaccountsatellite
CREATE TABLE IF NOT EXISTS processdb.objectbankaccountsatellite (
id int,
accountid int,
transactionid int,
balance DECIMAL(18, 9),
timestmp bigint
)
CLUSTERED BY (id) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
TRUNCATE TABLE processdb.objectbankaccountsatellite;
INSERT INTO TABLE processdb.objectbankaccountsatellite
SELECt
ROW_NUMBER() OVER (ORDER BY accountno,id),
substr(accountno,3) as accountid,
id as transactionid,
balance,
unix_timestamp()
FROM
assessdb.account
group by accountno,id,balance;
select * from processdb.objectbankaccountsatellite e limit 10;
创建processdb.locationhub
## 创建processdb.locationhub
CREATE TABLE IF NOT EXISTS processdb.locationhub (
id INT,
locationtype STRING,
locationname STRING,
locationid INT
)
CLUSTERED BY (id) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
TRUNCATE processdb.locationhub;
INSERT OVERWRITE TABLE processdb.locationhub
SELECt DISTINCT
ROW_NUMBER() OVER (ORDER BY id),
'intangible',
'geospace',
id as locationid
FROM
assessdb.postaddress;
创建processdb.locationgeospacesatellite系列表
CREATE TABLE IF NOT EXISTS processdb.locationgeospace1satellite
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true')
as select
ROW_NUMBER() OVER (ORDER BY id) as id,
id as locationid,
postcode,
unix_timestamp() as timestmp
FROM
assessdb.postaddress;
ALTER TABLE processdb.locationgeospace1satellite CLUSTERED BY (id,locationid) INTO 1 BUCKETS;
-- 包含id,locationid,latitude,longitude和timestmp
CREATE TABLE IF NOT EXISTS processdb.locationgeospace2satellite
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true')
as select
ROW_NUMBER() OVER (ORDER BY id) as id,
id as locationid,
latitude,
longitude,
unix_timestamp() as timestmp
FROM
assessdb.postaddress;
ALTER TABLE processdb.locationgeospace2satellite CLUSTERED BY (id,locationid) INTO 1 BUCKETS;
-- 包含id,locationid,easting,northing和timestmp
CREATE TABLE IF NOT EXISTS processdb.locationgeospace3satellite
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true')
as select
ROW_NUMBER() OVER (ORDER BY id) as id,
id as locationid,
easting,
northing,
unix_timestamp() as timestmp
FROM
assessdb.postaddress;
ALTER TABLE processdb.locationgeospace3satellite CLUSTERED BY (id,locationid) INTO 1 BUCKETS;
-- 复刻assessdb.postaddress
CREATE TABLE IF NOT EXISTS processdb.locationgeospace4satellite
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true')
as select
ROW_NUMBER() OVER (ORDER BY id) as id,
id as locationid,
postcode,
latitude,
longitude,
easting,
northing,
gridref,
district,
ward,
districtcode,
wardcode,
country,
countycode,
constituency,
typearea,
unix_timestamp() as timestmp
from
assessdb.postaddress;
ALTER TABLE processdb.locationgeospace4satellite CLUSTERED BY (id,locationid) INTO 1 BUCKETS;
与事件相关的数据表
CREATE TABLE IF NOT EXISTS processdb.eventhub
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true')
AS SELECt
DISTINCT ROW_NUMBER() OVER (ORDER BY id) as id,
'intangible' as eventtype,
'banktransaction' as eventname,
id as eventid
FROM
assessdb.account;
ALTER TABLE processdb.eventhub CLUSTERED BY (eventtype, eventname,id) INTO 1 BUCKETS;
-- eventbanktransactionsatellite
CREATE TABLE IF NOT EXISTS processdb.eventbanktransactionsatellite
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true')
AS SELECt
ROW_NUMBER() OVER (ORDER BY accountno,id) as id,
accountno,
id as transactionid,
balance,
unix_timestamp() as timestmp
FROM
assessdb.account;
ALTER TABLE processdb.eventbanktransactionsatellite CLUSTERED BY (id) INTO 1 BUCKETS;
与时间相关的表结构
CREATE TABLE IF NOT EXISTS processdb.timehub
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true')
AS SELECt
DISTINCT ROW_NUMBER() OVER (ORDER BY id) as id,
id as timeid
FROM
assessdb.dates;
ALTER TABLE processdb.timehub CLUSTERED BY (id) INTO 1 BUCKETS;
CREATE TABLE IF NOT EXISTS processdb.time1satellite
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true')
AS SELECt
DISTINCT ROW_NUMBER() OVER (ORDER BY id) as id,
id as timeid,
datetimes,
unix_timestamp() as timestmp
FROM
assessdb.dates
where yearnumber = 2015 or yearnumber=2016
ORDER BY id;
ALTER TABLE processdb.time1satellite CLUSTERED BY (id) INTO 1 BUCKETS;
链接表
+-------------------------+ | tab_name | +-------------------------+ | event_event_link | | event_time_link | | location_event_link | | location_location_link | | location_time_link | | object_event_link | | object_location_link | | object_object_link | | object_time_link | | person_event_link | | person_location_link | | person_object_link | | person_person_link | | person_time_link | | time_time_link | +-------------------------+ desc event_event_link; +-----------+------------+----------+ | col_name | data_type | comment | +-----------+------------+----------+ | id | int | | | eventid1 | int | | | eventid2 | int | | +-----------+------------+----------+ desc event_time_link; +-----------+------------+----------+ | col_name | data_type | comment | +-----------+------------+----------+ | id | int | | | eventid | int | | | timeid | int | | +-----------+------------+----------+ desc location_event_link; +-------------+------------+----------+ | col_name | data_type | comment | +-------------+------------+----------+ | id | int | | | locationid | int | | | eventid | int | | +-------------+------------+----------+转换数据库 - DWS层
CREATE DATAbase IF NOT EXISTS transformdb
comment "transform data in here"
location '/hive_data/transformdb.db';
CREATE TABLE IF NOT EXISTS transformdb.dimperson (
personkey BIGINT,
firstname STRING,
lastname STRING
)
CLUSTERED BY (firstname, lastname,personkey) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
CREATE TABLE IF NOT EXISTS transformdb.dimperson001 (
firstname STRING,
lastname STRING
)
CLUSTERED BY (firstname, lastname) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
INSERT INTO TABLE transformdb.dimperson001
SELECt DISTINCT
firstname,
lastname
FROM
processdb.personhub;
CREATE TABLE IF NOT EXISTS transformdb.dimperson002 (
personkey BIGINT,
firstname STRING,
lastname STRING
)
CLUSTERED BY (firstname, lastname,personkey) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
INSERT INTO TABLE transformdb.dimperson002
SELECt
ROW_NUMBER() OVER (ORDER BY firstname, lastname),
firstname,
lastname
FROM
transformdb.dimperson001;
INSERT INTO TABLE transformdb.dimperson
SELECt
personkey,
firstname,
lastname
FROM
transformdb.dimperson002
ORDER BY firstname, lastname, personkey;
INSERT INTO TABLE transformdb.dimperson
VALUES
(999997,'Ruff','Hond'),
(999998,'Robbie','Rot'),
(999999,'Helen','Kat');
DROp TABLE transformdb.dimperson001;
DROP TABLE transformdb.dimperson002;
创建dimaccount表
CREATE TABLE IF NOT EXISTS transformdb.dimaccount (
accountkey BIGINT,
accountnumber INT
)
CLUSTERED BY (accountnumber,accountkey) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
CREATE TABLE IF NOT EXISTS transformdb.dimaccount001 (
accountnumber INT
)
CLUSTERED BY (accountnumber) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
INSERT INTO TABLE transformdb.dimaccount001
SELECT DISTINCT
objectid
FROM
processdb.objecthub
WHERe objecttype = 'intangible'
AND objectname = 'bankaccount';
CREATE TABLE IF NOT EXISTS transformdb.dimaccount002 (
accountkey BIGINT,
accountnumber INT
)
CLUSTERED BY (accountnumber,accountkey) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
INSERT INTO TABLE transformdb.dimaccount002
SELECt DISTINCT
ROW_NUMBER() OVER (ORDER BY accountnumber DESC),
accountnumber
FROM
transformdb.dimaccount001;
INSERT INTO TABLE transformdb.dimaccount
SELECt DISTINCT
accountkey,
accountnumber
FROM
transformdb.dimaccount002
ORDER BY accountnumber;
INSERT INTO TABLE transformdb.dimaccount
VALUES
(88888887,208887),
(88888888,208888),
(88888889,208889);
DROp TABLE transformdb.dimaccount001;
DROP TABLE transformdb.dimaccount002;
创建fctpersonaccount表
CREATE TABLE IF NOT EXISTS transformdb.fctpersonaccount (
personaccountkey BIGINT,
personkey BIGINT,
accountkey BIGINT,
balance DECIMAL(18, 9)
)
CLUSTERED BY (personkey,accountkey) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
CREATE TABLE IF NOT EXISTS transformdb.fctpersonaccount001 (
personkey BIGINT,
accountkey BIGINT,
balance DECIMAL(18, 9)
)
CLUSTERED BY (personkey,accountkey) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
INSERT INTO TABLE transformdb.fctpersonaccount001
VALUES
(999997,88888887,10.60),
(999997,88888887,400.70),
(999997,88888887,-210.90),
(999998,88888888,1000.00),
(999998,88888888,1990.60),
(999998,88888888,900.70),
(999999,88888889,160.60),
(999999,88888889,180.70),
(999999,88888889,100.60),
(999999,88888889,120.90),
(999999,88888889,180.69),
(999999,88888889,130.30);
CREATE TABLE IF NOT EXISTS transformdb.fctpersonaccount002 (
personkey BIGINT,
accountkey BIGINT,
balance DECIMAL(18, 9)
)
CLUSTERED BY (personkey,accountkey) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
INSERT INTO TABLE transformdb.fctpersonaccount002
SELECT
CAST(personkey AS BIGINT),
CAST(accountkey AS BIGINT),
CAST(SUM(balance) AS DECIMAL(18, 9))
FROM transformdb.fctpersonaccount001
GROUP BY personkey, accountkey;
INSERT INTO TABLE transformdb.fctpersonaccount
SELECt
ROW_NUMBER() OVER (ORDER BY personkey, accountkey),
CAST(personkey AS BIGINT),
CAST(accountkey AS BIGINT),
CAST(balance AS DECIMAL(18, 9))
FROM transformdb.fctpersonaccount002;
DROp TABLE transformdb.fctpersonaccount001;
DROP TABLE transformdb.fctpersonaccount002;
创建dimaddress表
CREATE TABLE IF NOT EXISTS transformdb.dimaddress(
addresskey BIGINT,
postcode STRING
)
CLUSTERED BY (addresskey) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
INSERT INTO TABLE transformdb.dimaddress
VALUES
(1,'KA12 8RR'),
(2,'FK8 1EJ'),
(3,'EH1 2NG');
CREATE TABLE IF NOT EXISTS transformdb.dimdatetime(
datetimekey BIGINT,
datetimestr STRING
)
CLUSTERED BY (datetimekey) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
INSERT INTO TABLE transformdb.dimdatetime
VALUES
(1,'2015/08/23 16h00'),
(2,'2015/10/03 17h00'),
(3,'2015/11/12 06h00');
CREATE TABLE IF NOT EXISTS transformdb.fctpersonaddressdate(
personaddressdatekey BIGINT,
personkey BIGINT,
addresskey BIGINT,
datetimekey BIGINT
)
CLUSTERED BY (datetimekey) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
INSERT INTO TABLE transformdb.fctpersonaddressdate
VALUES
(1,999997,1,1),
(2,999998,2,2),
(3,999999,3,3);
组织数据库
CREATE DATAbase IF NOT EXISTS organisedb;
USE organisedb;
CREATE TABLE IF NOT EXISTS organisedb.dimperson LIKE transformdb.dimperson;
INSERT INTO TABLE organisedb.dimperson
SELECT
personkey,
firstname,
lastname
FROM
transformdb.dimperson
ORDER BY firstname, lastname, personkey;
CREATE TABLE IF NOT EXISTS organisedb.dimaccount LIKE transformdb.dimaccount;
INSERT INTO TABLE organisedb.dimaccount
SELECt DISTINCT
accountkey,
accountnumber
FROM
transformdb.dimaccount
ORDER BY accountnumber;
CREATE TABLE IF NOT EXISTS organisedb.fctpersonaccount LIKE transformdb.fctpersonaccount;
INSERT INTO TABLE organisedb.fctpersonaccount
SELECt DISTINCT
personaccountkey,
personkey,
accountkey,
balance
FROM
transformdb.fctpersonaccount
WHERe
personaccountkey = 1
ORDER BY personaccountkey,personkey,accountkey;
CREATE TABLE IF NOT EXISTS organisedb.dimaddress(
addresskey BIGINT,
postcode STRING
)
CLUSTERED BY (addresskey) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
INSERT INTO TABLE organisedb.dimaddress
SELECt DISTINCT
addresskey,
postcode
FROM
transformdb.dimaddress
ORDER BY addresskey;
CREATE TABLE IF NOT EXISTS organisedb.dimaddress(
addresskey BIGINT,
postcode STRING
)
CLUSTERED BY (addresskey) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
INSERT INTO TABLE organisedb.dimaddress
SELECt DISTINCT
addresskey,
postcode
FROM
transformdb.dimaddress
ORDER BY addresskey;
CREATE TABLE IF NOT EXISTS organisedb.fctpersonaddressdate(
personaddressdatekey BIGINT,
personkey BIGINT,
addresskey BIGINT,
datetimekey BIGINT
)
CLUSTERED BY (datetimekey) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
INSERT INTO TABLE organisedb.fctpersonaddressdate
SELECt
personaddressdatekey,
personkey,
addresskey,
datetimekey
FROM
transformdb.fctpersonaddressdate
WHERe personaddressdatekey = 1
ORDER BY
personaddressdatekey,
personkey,
addresskey,
datetimekey;
报表数据库 - ADS 层
CREATE DATAbase IF NOT EXISTS reportdb;
USE reportdb;
CREATE TABLE IF NOT EXISTS reportdb.report001(
firstname STRING,
lastname STRING,
accountnumber INT,
balance DECIMAL(18, 9)
)
CLUSTERED BY (firstname, lastname) INTO 1 BUCKETS
STORED AS orc
TBLPROPERTIES('transactional' = 'true','orc.compress'='ZLIB','orc.create.index'='true');
INSERT INTO TABLE reportdb.report001
SELECt
dimperson.firstname,
dimperson.lastname,
dimaccount.accountnumber,
fctpersonaccount.balance
FROM
organisedb.fctpersonaccount
JOIN
organisedb.dimperson
ON
fctpersonaccount.personkey = dimperson.personkey
JOIN
organisedb.dimaccount
ON
fctpersonaccount.accountkey = dimaccount.accountkey;
SELECt * FROM reportdb.report001;



