栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Hive案例-来自hive实战

Hive案例-来自hive实战

注:案例素材来自中国工信出版社出版的书籍《Hive实战》,译者:唐富年。
素材下载,免费
素材下载,免费
素材下载,免费

书籍中的素材及操作比较繁琐,建议学习后使用自己的方式实现,对你是一个锻炼!

目录
  • 准备数据
  • 加载原数据- ODS层
    • 建库
    • 创建表并装载rawfirstname.csv文件
    • 创建rawlastname并装载
    • 创建rawperson并装载
    • 装载rawdatetime
    • 创建retrievedb.rawaddress表并装载数据
    • 创建retrievedb.rawaddresshistory
    • 创建retrievedb.rawaccount
  • 存取数据 - DWD层
    • 清洗firstname表
      • 创建临时表assessdb.firstname001 -- 去掉第一行的字段列名
      • 创建临时表assessdb.firstname002,去掉firstname001表中字符多余的空格
      • 创建临时表assessdb.firstname003,转换数据类型
      • 创建firstname表
    • 清洗lastname数据
    • 清洗Person表 -- 一步到位
    • 清洗datetime表 -- 字段多多多!!!
    • 清洗postaddress表
    • 清洗addresshistory表
    • 组合查询
    • 清洗account表
    • 组合表
      • 组合personfull表 [Person + firstname + lastname + sex]
  • 过程数据处理 - DWT 层
    • 创建数据库processdb
    • 创建processdb.personhub表
    • 创建表processdb.personsexsatellite
    • 创建processdb.objectbankaccountsatellite
    • 创建processdb.locationhub
    • 创建processdb.locationgeospacesatellite系列表
    • 与事件相关的数据表
    • 与时间相关的表结构
    • 链接表
  • 转换数据库 - DWS层
    • 创建dimaccount表
    • 创建fctpersonaccount表
    • 创建dimaddress表
  • 组织数据库
  • 报表数据库 - ADS 层

	1, 删除不需要的标记,如标题
	2, 删除数据记录中不需要的空格
	3, 将string类型转换成需要的类型
准备数据
上传00rawdata的文件到hdfs
# Create the HDFS landing directory; -p keeps the command from failing when it already exists.
hadoop fs -mkdir -p /rawdata
# Upload the local 00rawdata directory into /rawdata/ on HDFS.
hdfs dfs -put 00rawdata /rawdata/
加载原数据- ODS层 建库
-- Retrieve database (ODS layer): holds the raw source data exactly as loaded.
CREATE DATABASE IF NOT EXISTS retrievedb
  COMMENT "load data"
  LOCATION '/hive_data/retrievedb.db';

-- Connect with the beeline client (this line is a shell command, not HiveQL).
beeline -u jdbc:hive2://node02:10000/retrievedb -n god -p 1234
创建表并装载rawfirstname.csv文件
USE retrievedb;

-- Raw first-name records; every column stays STRING until the cleansing stage.
CREATE TABLE IF NOT EXISTS retrievedb.rawfirstname (
  firstnameid  STRING,
  firstname    STRING,
  sex          STRING
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ',';

-- Load the CSV from HDFS, replacing whatever the table currently holds.
LOAD DATA INPATH 'hdfs://mycluster/rawdata/00rawdata/rawfirstname.csv'
  OVERWRITE INTO TABLE retrievedb.rawfirstname;
创建rawlastname并装载
-- Raw last-name records, loaded as plain strings.
CREATE TABLE IF NOT EXISTS retrievedb.rawlastname (
  lastnameid  STRING,
  lastname    STRING
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ',';

-- Replace the table contents with the CSV found on HDFS.
LOAD DATA INPATH '/rawdata/00rawdata/rawlastname.csv'
  OVERWRITE INTO TABLE retrievedb.rawlastname;
创建rawperson并装载
-- Raw person records: one id plus references to a first name and a last name.
CREATE TABLE IF NOT EXISTS retrievedb.rawperson (
  persid       STRING,
  firstnameid  STRING,
  lastnameid   STRING
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ',';

-- Replace the table contents with the CSV found on HDFS.
LOAD DATA INPATH '/rawdata/00rawdata/rawperson.csv'
  OVERWRITE INTO TABLE retrievedb.rawperson;
装载rawdatetime
-- Raw date/time records; the numeric parts are still strings here.
CREATE TABLE IF NOT EXISTS retrievedb.rawdatetime (
  id            STRING,
  datetimes     STRING,
  monthname     STRING,
  yearnumber    STRING,
  monthnumber   STRING,
  daynumber     STRING,
  hournumber    STRING,
  minutenumber  STRING,
  ampm          STRING
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ',';

-- Replace the table contents with the CSV found on HDFS.
LOAD DATA INPATH '/rawdata/00rawdata/rawdatetime.csv'
  OVERWRITE INTO TABLE retrievedb.rawdatetime;
创建retrievedb.rawaddress表并装载数据
-- Raw postal-address records, one row per postcode entry, all columns as strings.
CREATE TABLE IF NOT EXISTS retrievedb.rawaddress (
  id            STRING,
  Postcode      STRING,
  Latitude      STRING,
  Longitude     STRING,
  Easting       STRING,
  Northing      STRING,
  GridRef       STRING,
  District      STRING,
  Ward          STRING,
  DistrictCode  STRING,
  WardCode      STRING,
  Country       STRING,
  CountyCode    STRING,
  Constituency  STRING,
  TypeArea      STRING
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ',';

-- Replace the table contents with the CSV found on HDFS.
LOAD DATA INPATH '/rawdata/00rawdata/rawaddress.csv'
  OVERWRITE INTO TABLE retrievedb.rawaddress;
创建retrievedb.rawaddresshistory
-- Raw address-history records. The later combined query joins pid to a person,
-- aid to a postal address, and did1/did2 to start/end date rows.
CREATE TABLE IF NOT EXISTS retrievedb.rawaddresshistory (
  id    STRING,
  pid   STRING,
  aid   STRING,
  did1  STRING,
  did2  STRING
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ',';

-- Replace the table contents with the CSV found on HDFS.
LOAD DATA INPATH '/rawdata/00rawdata/rawaddresshistory.csv'
  OVERWRITE INTO TABLE retrievedb.rawaddresshistory;
创建retrievedb.rawaccount
-- Raw bank-account records: row id, owning person id, account number and balance.
CREATE TABLE IF NOT EXISTS retrievedb.rawaccount (
  id         STRING,
  pid        STRING,
  accountno  STRING,
  balance    STRING
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ',';

-- Replace the table contents with the CSV found on HDFS.
LOAD DATA INPATH '/rawdata/00rawdata/rawaccount.csv'
  OVERWRITE INTO TABLE retrievedb.rawaccount;
存取数据 - DWD层
-- Assess database (DWD layer): holds the cleansed, typed copies of the raw tables.
CREATE DATABASE IF NOT EXISTS assessdb
  COMMENT "access data"
  LOCATION '/hive_data/assessdb.db';

USE assessdb;
清洗firstname表 创建临时表assessdb.firstname001 – 去掉第一行的字段列名
-- Staging step 1: raw first names without the CSV header row.
CREATE TABLE IF NOT EXISTS assessdb.firstname001 (
  firstnameid  STRING,
  firstname    STRING,
  sex          STRING
)
CLUSTERED BY (firstnameid) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

-- Empty the table first so the step can be re-run.
TRUNCATE TABLE assessdb.firstname001;

-- The header row holds the literal text "id" (double quotes included); skip it.
INSERT INTO TABLE assessdb.firstname001
SELECT
  firstnameid,
  firstname,
  sex
FROM retrievedb.rawfirstname
WHERE firstnameid <> '"id"';
创建临时表assessdb.firstname002,去掉firstname001表中字符多余的空格
-- Staging step 2: strip leading and trailing spaces from the text columns.
CREATE TABLE IF NOT EXISTS assessdb.firstname002 (
  firstnameid  STRING,
  firstname    STRING,
  sex          STRING
)
CLUSTERED BY (firstnameid) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

-- Empty the table first so the step can be re-run.
TRUNCATE TABLE assessdb.firstname002;

-- trim() removes spaces from both ends, matching rtrim(ltrim(...)).
INSERT INTO TABLE assessdb.firstname002
SELECT
  firstnameid,
  trim(firstname),
  trim(sex)
FROM assessdb.firstname001;
创建临时表assessdb.firstname003,转换数据类型
-- Staging step 3: convert the id to INT and drop the first and last character
-- of each text value (the surrounding double quotes in the CSV data).
CREATE TABLE IF NOT EXISTS assessdb.firstname003 (
  firstnameid  INT,
  firstname    STRING,
  sex          STRING
)
CLUSTERED BY (firstnameid) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

-- Empty the table first so the step can be re-run.
TRUNCATE TABLE assessdb.firstname003;

INSERT INTO TABLE assessdb.firstname003
SELECT
  CAST(firstnameid AS INT),
  substr(firstname, 2, length(firstname) - 2),
  substr(sex, 2, length(sex) - 2)
FROM assessdb.firstname002;
创建firstname表
-- Final cleansed first-name table.
CREATE TABLE IF NOT EXISTS assessdb.firstname (
  firstnameid  INT,
  firstname    STRING,
  sex          STRING
)
CLUSTERED BY (firstnameid) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

-- Variant A: load with ORDER BY. This sorts the whole table globally, so avoid
-- it where possible.
TRUNCATE TABLE assessdb.firstname;

INSERT INTO TABLE assessdb.firstname
SELECT
  firstnameid,
  firstname,
  sex
FROM assessdb.firstname003
ORDER BY firstnameid;

-- Variant B: discard variant A's rows and reload with SORT BY instead.
TRUNCATE TABLE assessdb.firstname;

INSERT INTO TABLE assessdb.firstname
SELECT
  firstnameid,
  firstname,
  sex
FROM assessdb.firstname003
SORT BY firstnameid;

-- Spot-check the loaded data.
SELECT
  firstnameid,
  firstname,
  sex
FROM assessdb.firstname
SORT BY firstname
LIMIT 10;
清洗lastname数据
-- Cleanse the last-name data in a single statement:
--   1. drop the header row
--   2. strip spaces and the surrounding double quotes
--   3. convert the id to INT
CREATE TABLE IF NOT EXISTS assessdb.lastname (
  lastnameid  INT,
  lastname    STRING
)
CLUSTERED BY (lastnameid) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

INSERT OVERWRITE TABLE assessdb.lastname
SELECT
  CAST(lastnameid AS INT),
  substr(trim(lastname), 2, length(trim(lastname)) - 2)
FROM retrievedb.rawlastname
WHERE lastnameid <> '"id"'
SORT BY lastnameid;

-- Spot-check the result.
SELECT * FROM assessdb.lastname LIMIT 30;
清洗Person表 – 一步到位
-- Cleanse the person table in one step:
--   1. drop the header row
--   2. strip surrounding spaces
--   3. convert every column to INT
-- (The original notes were bare text lines, which are not valid HiveQL.)
DROP TABLE IF EXISTS assessdb.person;

CREATE TABLE IF NOT EXISTS assessdb.person (
  persid       INT,
  firstnameid  INT,
  lastnameid   INT
)
CLUSTERED BY (lastnameid) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

-- The table was just dropped and recreated, so no TRUNCATE is needed here.
-- The header row holds the literal text "id" (double quotes included); skip it.
INSERT INTO TABLE assessdb.person
SELECT
  CAST(trim(rp.persid) AS INT),
  CAST(trim(rp.firstnameid) AS INT),
  CAST(trim(rp.lastnameid) AS INT)
FROM retrievedb.rawperson rp
WHERE rp.persid <> '"id"';
清洗datetime表 – 字段多多多!!!
-- Inspect the source table structure.
DESCRIBE retrievedb.rawdatetime;

-- Cleansed date/time table with numeric parts typed as INT.
CREATE TABLE IF NOT EXISTS assessdb.dates (
  id            INT,
  datetimes     STRING,
  monthname     STRING,
  yearnumber    INT,
  monthnumber   INT,
  daynumber     INT,
  hournumber    INT,
  minutenumber  INT,
  ampm          STRING
)
CLUSTERED BY (id) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

-- Cleansing rules applied below:
--   1. drop the header row
--   2. strip spaces (and the first/last character of each text value)
--   3. convert numeric columns to INT
INSERT OVERWRITE TABLE assessdb.dates
SELECT
  CAST(r.id AS INT),
  substr(trim(r.datetimes), 2, length(trim(r.datetimes)) - 2),
  substr(trim(r.monthname), 2, length(trim(r.monthname)) - 2),
  CAST(r.yearnumber AS INT),
  CAST(r.monthnumber AS INT),
  CAST(r.daynumber AS INT),
  CAST(r.hournumber AS INT),
  CAST(r.minutenumber AS INT),
  substr(trim(r.ampm), 2, length(trim(r.ampm)) - 2)
FROM retrievedb.rawdatetime r
WHERE r.id <> '"id"'
SORT BY id;

-- Spot-check the result.
SELECT * FROM assessdb.dates LIMIT 10;
清洗postaddress表
-- Inspect the source table structure.
DESCRIBE retrievedb.rawaddress;

-- Rebuild the cleansed postal-address table from scratch.
DROP TABLE IF EXISTS assessdb.postaddress;

CREATE TABLE IF NOT EXISTS assessdb.postaddress (
  id            INT,
  postcode      STRING,
  latitude      DECIMAL(18,9),
  longitude     DECIMAL(18,9),
  easting       INT,
  northing      INT,
  gridRef       STRING,
  District      STRING,
  ward          STRING,
  districtCode  STRING,
  wardCode      STRING,
  country       STRING,
  countyCode    STRING,
  constituency  STRING,
  typeArea      STRING
)
CLUSTERED BY (id) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

-- Skip the header row, cast the numeric columns, and for each text column
-- trim spaces and drop the first and last character.
INSERT OVERWRITE TABLE assessdb.postaddress
SELECT
  CAST(r.id AS INT),
  substr(trim(r.postcode), 2, length(trim(r.postcode)) - 2),
  CAST(trim(r.latitude) AS DECIMAL(18,9)),
  CAST(trim(r.longitude) AS DECIMAL(18,9)),
  CAST(r.easting AS INT),
  CAST(r.northing AS INT),
  substr(trim(r.gridRef), 2, length(trim(r.gridRef)) - 2),
  substr(trim(r.District), 2, length(trim(r.District)) - 2),
  substr(trim(r.ward), 2, length(trim(r.ward)) - 2),
  substr(trim(r.districtCode), 2, length(trim(r.districtCode)) - 2),
  substr(trim(r.wardCode), 2, length(trim(r.wardCode)) - 2),
  substr(trim(r.country), 2, length(trim(r.country)) - 2),
  substr(trim(r.countyCode), 2, length(trim(r.countyCode)) - 2),
  substr(trim(r.constituency), 2, length(trim(r.constituency)) - 2),
  substr(trim(r.typeArea), 2, length(trim(r.typeArea)) - 2)
FROM retrievedb.rawaddress r
WHERE r.id <> '"id"'
SORT BY id;

-- Spot-check the result.
SELECT * FROM assessdb.postaddress LIMIT 10;
清洗addresshistory表
-- Inspect the source table structure.
DESCRIBE retrievedb.rawaddresshistory;

-- Rebuild the cleansed address-history table from scratch.
DROP TABLE IF EXISTS assessdb.addresshistory;

CREATE TABLE IF NOT EXISTS assessdb.addresshistory (
  id    INT,
  pid   INT,
  aid   INT,
  did1  INT,
  did2  INT
)
CLUSTERED BY (id) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

-- Skip the header row and cast every column to INT.
INSERT INTO TABLE assessdb.addresshistory
SELECT
  CAST(r.id AS INT),
  CAST(r.pid AS INT),
  CAST(r.aid AS INT),
  CAST(r.did1 AS INT),
  CAST(r.did2 AS INT)
FROM retrievedb.rawaddresshistory r
WHERE r.id <> '"id"';
组合查询
-- Resolve each address-history row into person name, postcode and start/end
-- date text. Prerequisite: assessdb.personfull must already exist; in this
-- document it is only built in the later "personfull" section.
SELECT
  addresshistory.id,
  addresshistory.pid,
  personfull.firstname,
  personfull.lastname,
  addresshistory.aid,
  postaddress.postcode,
  addresshistory.did1,
  dates1.datetimes AS startdate,
  addresshistory.did2,
  dates2.datetimes AS enddate
FROM assessdb.addresshistory
INNER JOIN assessdb.personfull
  ON addresshistory.pid = personfull.persid
INNER JOIN assessdb.postaddress
  ON addresshistory.aid = postaddress.id
-- The dates table is joined twice: once for the start date, once for the end date.
INNER JOIN assessdb.dates AS dates1
  ON addresshistory.did1 = dates1.id
INNER JOIN assessdb.dates AS dates2
  ON addresshistory.did2 = dates2.id
LIMIT 10;
清洗account表
-- Inspect the source table structure.
DESCRIBE retrievedb.rawaccount;

-- Rebuild the cleansed account table from scratch.
DROP TABLE IF EXISTS assessdb.account;

CREATE TABLE IF NOT EXISTS assessdb.account (
  id         INT,
  pid        INT,
  accountno  STRING,
  balance    DECIMAL(18,9)
)
CLUSTERED BY (id) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

-- Cast the numeric columns and prefix every account number with 'AC'.
-- NOTE(review): the header filter here compares against an unquoted id, while
-- the other cleansing steps compare against a double-quoted "id" - confirm
-- which form rawaccount.csv actually uses.
INSERT INTO TABLE assessdb.account
SELECT
  CAST(id AS INT),
  CAST(pid AS INT),
  concat('AC', accountno),
  CAST(balance AS DECIMAL(18,9))
FROM retrievedb.rawaccount
WHERE id <> 'id';
组合表 组合personfull表 [Person + firstname + lastname + sex]
-- Rebuild the denormalised person table: person + first name + last name + sex.
DROP TABLE IF EXISTS assessdb.personfull;

CREATE TABLE IF NOT EXISTS assessdb.personfull (
  persid       INT,
  firstnameid  INT,
  firstname    STRING,
  lastnameid   INT,
  lastname     STRING,
  sex          STRING
)
CLUSTERED BY (lastnameid) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

-- Every source table is qualified with assessdb so the statement does not
-- depend on which database the session currently has selected.
INSERT INTO TABLE assessdb.personfull
SELECT
  p.persid,
  fn.firstnameid,
  fn.firstname,
  ln.lastnameid,
  ln.lastname,
  fn.sex
FROM assessdb.person p
INNER JOIN assessdb.firstname fn
  ON p.firstnameid = fn.firstnameid
INNER JOIN assessdb.lastname ln
  ON p.lastnameid = ln.lastnameid;
过程数据处理 - DWT 层 创建数据库processdb
-- Process database (DWT layer): holds the hub, satellite and link tables.
CREATE DATABASE IF NOT EXISTS processdb
  COMMENT "process data in here"
  LOCATION '/hive_data/processdb.db';
创建processdb.personhub表
-- Person hub: one generated id and key per row of assessdb.personfull.
CREATE TABLE IF NOT EXISTS processdb.personhub (
  id         INT,
  keyid      STRING,
  firstname  STRING,
  lastname   STRING
)
CLUSTERED BY (id) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

-- keyid is "<load-time unix timestamp>/<row number>"; rows are numbered in
-- (firstname, lastname) order.
INSERT INTO TABLE processdb.personhub
SELECT
  ROW_NUMBER() OVER (ORDER BY firstname, lastname),
  concat(unix_timestamp(), '/', ROW_NUMBER() OVER (ORDER BY firstname, lastname)),
  firstname,
  lastname
FROM assessdb.personfull;
创建表processdb.personsexsatellite
-- Satellite carrying the sex attribute for each person-hub key.
CREATE TABLE IF NOT EXISTS processdb.personsexsatellite (
  id        INT,
  keyid     STRING,
  sex       STRING,
  timestmp  BIGINT
)
CLUSTERED BY (id) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

-- Hub rows are matched back to personfull on the (firstname, lastname) pair;
-- timestmp records the load time.
INSERT OVERWRITE TABLE processdb.personsexsatellite
SELECT
  ROW_NUMBER() OVER (ORDER BY p1.keyid),
  p1.keyid,
  p2.sex,
  unix_timestamp()
FROM processdb.personhub p1
INNER JOIN assessdb.personfull p2
  ON  p1.firstname = p2.firstname
  AND p1.lastname  = p2.lastname;
创建processdb.objectbankaccountsatellite
-- Satellite with one row per distinct (account, transaction, balance) combination.
CREATE TABLE IF NOT EXISTS processdb.objectbankaccountsatellite (
  id             INT,
  accountid      INT,
  transactionid  INT,
  balance        DECIMAL(18, 9),
  timestmp       BIGINT
)
CLUSTERED BY (id) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

-- Empty the table first so the load can be re-run.
TRUNCATE TABLE processdb.objectbankaccountsatellite;

-- substr(accountno, 3) drops the first two characters of the account number
-- (the 'AC' prefix added during cleansing); the remainder is stored in the INT
-- accountid column.
INSERT INTO TABLE processdb.objectbankaccountsatellite
SELECT
  ROW_NUMBER() OVER (ORDER BY accountno, id),
  substr(accountno, 3) AS accountid,
  id                   AS transactionid,
  balance,
  unix_timestamp()
FROM assessdb.account
GROUP BY accountno, id, balance;

-- Spot-check the result.
SELECT * FROM processdb.objectbankaccountsatellite e LIMIT 10;
创建processdb.locationhub
-- Create processdb.locationhub: one hub row per cleansed postal address.
CREATE TABLE IF NOT EXISTS processdb.locationhub (
  id            INT,
  locationtype  STRING,
  locationname  STRING,
  locationid    INT
)
CLUSTERED BY (id) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

-- Written with the TABLE keyword, like every other TRUNCATE in this script;
-- Hive versions that require the keyword reject the short form.
TRUNCATE TABLE processdb.locationhub;

-- Every location is tagged with the constants 'intangible' / 'geospace'.
-- DISTINCT removes nothing here: ROW_NUMBER() already makes each row unique.
INSERT OVERWRITE TABLE processdb.locationhub
SELECT DISTINCT
  ROW_NUMBER() OVER (ORDER BY id),
  'intangible',
  'geospace',
  id AS locationid
FROM assessdb.postaddress;
创建processdb.locationgeospacesatellite系列表
-- Geospace satellite 1: id, locationid, postcode and load timestamp.
CREATE TABLE IF NOT EXISTS processdb.locationgeospace1satellite
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
)
AS
SELECT
  ROW_NUMBER() OVER (ORDER BY id) AS id,
  id                              AS locationid,
  postcode,
  unix_timestamp()                AS timestmp
FROM assessdb.postaddress;

-- Declare the bucketing columns after the CTAS. Per the Hive DDL manual this
-- ALTER only changes table metadata; it does not reorganise existing data.
ALTER TABLE processdb.locationgeospace1satellite
  CLUSTERED BY (id, locationid) INTO 1 BUCKETS;

-- Geospace satellite 2: id, locationid, latitude, longitude and timestmp.
CREATE TABLE IF NOT EXISTS processdb.locationgeospace2satellite
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
)
AS
SELECT
  ROW_NUMBER() OVER (ORDER BY id) AS id,
  id                              AS locationid,
  latitude,
  longitude,
  unix_timestamp()                AS timestmp
FROM assessdb.postaddress;

-- Declare the bucketing columns after the CTAS.
ALTER TABLE processdb.locationgeospace2satellite
  CLUSTERED BY (id, locationid) INTO 1 BUCKETS;

-- Geospace satellite 3: id, locationid, easting, northing and timestmp.
CREATE TABLE IF NOT EXISTS processdb.locationgeospace3satellite
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
)
AS
SELECT
  ROW_NUMBER() OVER (ORDER BY id) AS id,
  id                              AS locationid,
  easting,
  northing,
  unix_timestamp()                AS timestmp
FROM assessdb.postaddress;

-- Declare the bucketing columns after the CTAS.
ALTER TABLE processdb.locationgeospace3satellite
  CLUSTERED BY (id, locationid) INTO 1 BUCKETS;

-- Geospace satellite 4: a full copy of assessdb.postaddress plus a generated
-- id and a load timestamp.
CREATE TABLE IF NOT EXISTS processdb.locationgeospace4satellite
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
)
AS
SELECT
  ROW_NUMBER() OVER (ORDER BY id) AS id,
  id                              AS locationid,
  postcode,
  latitude,
  longitude,
  easting,
  northing,
  gridref,
  district,
  ward,
  districtcode,
  wardcode,
  country,
  countycode,
  constituency,
  typearea,
  unix_timestamp()                AS timestmp
FROM assessdb.postaddress;

-- Declare the bucketing columns after the CTAS.
ALTER TABLE processdb.locationgeospace4satellite
  CLUSTERED BY (id, locationid) INTO 1 BUCKETS;

与事件相关的数据表
-- Event hub: one row per account record, tagged with the constants
-- 'intangible' / 'banktransaction'.
CREATE TABLE IF NOT EXISTS processdb.eventhub
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
)
AS
SELECT DISTINCT
  ROW_NUMBER() OVER (ORDER BY id) AS id,
  'intangible'                    AS eventtype,
  'banktransaction'               AS eventname,
  id                              AS eventid
FROM assessdb.account;

-- Declare the bucketing columns after the CTAS.
ALTER TABLE processdb.eventhub
  CLUSTERED BY (eventtype, eventname, id) INTO 1 BUCKETS;

-- Bank-transaction satellite: account number, transaction id and balance for
-- every row of assessdb.account, numbered in (accountno, id) order.
CREATE TABLE IF NOT EXISTS processdb.eventbanktransactionsatellite
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
)
AS
SELECT
  ROW_NUMBER() OVER (ORDER BY accountno, id) AS id,
  accountno,
  id                                         AS transactionid,
  balance,
  unix_timestamp()                           AS timestmp
FROM assessdb.account;

-- Declare the bucketing column after the CTAS.
ALTER TABLE processdb.eventbanktransactionsatellite
  CLUSTERED BY (id) INTO 1 BUCKETS;

与时间相关的表结构
-- Time hub: one generated id per row of assessdb.dates.
CREATE TABLE IF NOT EXISTS processdb.timehub
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
)
AS
SELECT DISTINCT
  ROW_NUMBER() OVER (ORDER BY id) AS id,
  id                              AS timeid
FROM assessdb.dates;

-- Declare the bucketing column after the CTAS.
ALTER TABLE processdb.timehub
  CLUSTERED BY (id) INTO 1 BUCKETS;


-- Time satellite 1: date/time text for the years 2015 and 2016 only.
CREATE TABLE IF NOT EXISTS processdb.time1satellite
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
)
AS
SELECT DISTINCT
  ROW_NUMBER() OVER (ORDER BY id) AS id,
  id                              AS timeid,
  datetimes,
  unix_timestamp()                AS timestmp
FROM assessdb.dates
WHERE yearnumber IN (2015, 2016)
ORDER BY id;

-- Declare the bucketing column after the CTAS.
ALTER TABLE processdb.time1satellite
  CLUSTERED BY (id) INTO 1 BUCKETS;

链接表
+-------------------------+
|        tab_name         |
+-------------------------+
| event_event_link        |
| event_time_link         |
| location_event_link     |
| location_location_link  |
| location_time_link      |
| object_event_link       |
| object_location_link    |
| object_object_link      |
| object_time_link        |
| person_event_link       |
| person_location_link    |
| person_object_link      |
| person_person_link      |
| person_time_link        |
| time_time_link          |
+-------------------------+

desc event_event_link;
+-----------+------------+----------+
| col_name  | data_type  | comment  |
+-----------+------------+----------+
| id        | int        |          |
| eventid1  | int        |          |
| eventid2  | int        |          |
+-----------+------------+----------+

desc event_time_link;
+-----------+------------+----------+
| col_name  | data_type  | comment  |
+-----------+------------+----------+
| id        | int        |          |
| eventid   | int        |          |
| timeid    | int        |          |
+-----------+------------+----------+

desc location_event_link;
+-------------+------------+----------+
|  col_name   | data_type  | comment  |
+-------------+------------+----------+
| id          | int        |          |
| locationid  | int        |          |
| eventid     | int        |          |
+-------------+------------+----------+

转换数据库 - DWS层
-- Transform database (DWS layer): holds the dimension and fact tables.
CREATE DATABASE IF NOT EXISTS transformdb
  COMMENT "transform data in here"
  LOCATION '/hive_data/transformdb.db';

-- Person dimension, built in three steps through two scratch tables.
CREATE TABLE IF NOT EXISTS transformdb.dimperson (
  personkey  BIGINT,
  firstname  STRING,
  lastname   STRING
)
CLUSTERED BY (firstname, lastname, personkey) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

-- Step 1: distinct (firstname, lastname) pairs from the person hub.
CREATE TABLE IF NOT EXISTS transformdb.dimperson001 (
  firstname  STRING,
  lastname   STRING
)
CLUSTERED BY (firstname, lastname) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

INSERT INTO TABLE transformdb.dimperson001
SELECT DISTINCT
  firstname,
  lastname
FROM processdb.personhub;

-- Step 2: assign a surrogate key to every distinct pair.
CREATE TABLE IF NOT EXISTS transformdb.dimperson002 (
  personkey  BIGINT,
  firstname  STRING,
  lastname   STRING
)
CLUSTERED BY (firstname, lastname, personkey) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

INSERT INTO TABLE transformdb.dimperson002
SELECT
  ROW_NUMBER() OVER (ORDER BY firstname, lastname),
  firstname,
  lastname
FROM transformdb.dimperson001;

-- Step 3: copy the keyed rows into the dimension.
INSERT INTO TABLE transformdb.dimperson
SELECT
  personkey,
  firstname,
  lastname
FROM transformdb.dimperson002
ORDER BY firstname, lastname, personkey;

-- Three hand-entered persons; their keys are referenced by the manually
-- loaded fact rows further down.
INSERT INTO TABLE transformdb.dimperson
VALUES
  (999997, 'Ruff',   'Hond'),
  (999998, 'Robbie', 'Rot'),
  (999999, 'Helen',  'Kat');

-- Remove the scratch tables.
DROP TABLE transformdb.dimperson001;
DROP TABLE transformdb.dimperson002;

创建dimaccount表
-- Account dimension, built in three steps through two scratch tables.
CREATE TABLE IF NOT EXISTS transformdb.dimaccount (
  accountkey     BIGINT,
  accountnumber  INT
)
CLUSTERED BY (accountnumber, accountkey) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

-- Step 1: distinct bank-account object ids.
-- NOTE(review): processdb.objecthub is not created anywhere in the visible
-- script; it has to exist before this statement can run.
CREATE TABLE IF NOT EXISTS transformdb.dimaccount001 (
  accountnumber  INT
)
CLUSTERED BY (accountnumber) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

INSERT INTO TABLE transformdb.dimaccount001
SELECT DISTINCT
  objectid
FROM processdb.objecthub
WHERE objecttype = 'intangible'
  AND objectname = 'bankaccount';

-- Step 2: assign a surrogate key, numbering from the highest account number down.
CREATE TABLE IF NOT EXISTS transformdb.dimaccount002 (
  accountkey     BIGINT,
  accountnumber  INT
)
CLUSTERED BY (accountnumber, accountkey) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

INSERT INTO TABLE transformdb.dimaccount002
SELECT DISTINCT
  ROW_NUMBER() OVER (ORDER BY accountnumber DESC),
  accountnumber
FROM transformdb.dimaccount001;

-- Step 3: copy the keyed rows into the dimension.
INSERT INTO TABLE transformdb.dimaccount
SELECT DISTINCT
  accountkey,
  accountnumber
FROM transformdb.dimaccount002
ORDER BY accountnumber;

-- Three hand-entered accounts; their keys are referenced by the manually
-- loaded fact rows further down.
INSERT INTO TABLE transformdb.dimaccount
VALUES
  (88888887, 208887),
  (88888888, 208888),
  (88888889, 208889);

-- Remove the scratch tables.
DROP TABLE transformdb.dimaccount001;
DROP TABLE transformdb.dimaccount002;
创建fctpersonaccount表
-- Person/account fact: one summed balance per (personkey, accountkey) pair.
CREATE TABLE IF NOT EXISTS transformdb.fctpersonaccount (
  personaccountkey  BIGINT,
  personkey         BIGINT,
  accountkey        BIGINT,
  balance           DECIMAL(18, 9)
)
CLUSTERED BY (personkey, accountkey) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

-- Step 1: scratch table with hand-entered individual balance movements.
CREATE TABLE IF NOT EXISTS transformdb.fctpersonaccount001 (
  personkey   BIGINT,
  accountkey  BIGINT,
  balance     DECIMAL(18, 9)
)
CLUSTERED BY (personkey, accountkey) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

INSERT INTO TABLE transformdb.fctpersonaccount001
VALUES
  (999997, 88888887, 10.60),
  (999997, 88888887, 400.70),
  (999997, 88888887, -210.90),
  (999998, 88888888, 1000.00),
  (999998, 88888888, 1990.60),
  (999998, 88888888, 900.70),
  (999999, 88888889, 160.60),
  (999999, 88888889, 180.70),
  (999999, 88888889, 100.60),
  (999999, 88888889, 120.90),
  (999999, 88888889, 180.69),
  (999999, 88888889, 130.30);

-- Step 2: sum the movements per person/account pair.
CREATE TABLE IF NOT EXISTS transformdb.fctpersonaccount002 (
  personkey   BIGINT,
  accountkey  BIGINT,
  balance     DECIMAL(18, 9)
)
CLUSTERED BY (personkey, accountkey) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

INSERT INTO TABLE transformdb.fctpersonaccount002
SELECT
  CAST(personkey AS BIGINT),
  CAST(accountkey AS BIGINT),
  CAST(SUM(balance) AS DECIMAL(18, 9))
FROM transformdb.fctpersonaccount001
GROUP BY personkey, accountkey;

-- Step 3: add a surrogate fact key and load the final table.
INSERT INTO TABLE transformdb.fctpersonaccount
SELECT
  ROW_NUMBER() OVER (ORDER BY personkey, accountkey),
  CAST(personkey AS BIGINT),
  CAST(accountkey AS BIGINT),
  CAST(balance AS DECIMAL(18, 9))
FROM transformdb.fctpersonaccount002;

-- Remove the scratch tables.
DROP TABLE transformdb.fctpersonaccount001;
DROP TABLE transformdb.fctpersonaccount002;
创建dimaddress表
-- Address dimension, populated with three hand-entered postcodes.
CREATE TABLE IF NOT EXISTS transformdb.dimaddress (
  addresskey  BIGINT,
  postcode    STRING
)
CLUSTERED BY (addresskey) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

INSERT INTO TABLE transformdb.dimaddress
VALUES
  (1, 'KA12 8RR'),
  (2, 'FK8 1EJ'),
  (3, 'EH1 2NG');

-- Date/time dimension, populated with three hand-entered timestamps stored as text.
CREATE TABLE IF NOT EXISTS transformdb.dimdatetime (
  datetimekey  BIGINT,
  datetimestr  STRING
)
CLUSTERED BY (datetimekey) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

INSERT INTO TABLE transformdb.dimdatetime
VALUES
  (1, '2015/08/23 16h00'),
  (2, '2015/10/03 17h00'),
  (3, '2015/11/12 06h00');

-- Fact table linking a person key, an address key and a date/time key.
CREATE TABLE IF NOT EXISTS transformdb.fctpersonaddressdate (
  personaddressdatekey  BIGINT,
  personkey             BIGINT,
  addresskey            BIGINT,
  datetimekey           BIGINT
)
CLUSTERED BY (datetimekey) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

-- Hand-entered rows: (fact key, person key, address key, date/time key).
INSERT INTO TABLE transformdb.fctpersonaddressdate
VALUES
  (1, 999997, 1, 1),
  (2, 999998, 2, 2),
  (3, 999999, 3, 3);
组织数据库
-- Organise database: copies of the transform-layer tables.
CREATE DATABASE IF NOT EXISTS organisedb;
USE organisedb;

-- Person dimension: same structure as the transform-layer table, full copy of its rows.
CREATE TABLE IF NOT EXISTS organisedb.dimperson
  LIKE transformdb.dimperson;

INSERT INTO TABLE organisedb.dimperson
SELECT
  personkey,
  firstname,
  lastname
FROM transformdb.dimperson
ORDER BY firstname, lastname, personkey;

-- Account dimension: same structure as the transform-layer table, distinct copy of its rows.
CREATE TABLE IF NOT EXISTS organisedb.dimaccount
  LIKE transformdb.dimaccount;

INSERT INTO TABLE organisedb.dimaccount
SELECT DISTINCT
  accountkey,
  accountnumber
FROM transformdb.dimaccount
ORDER BY accountnumber;

-- Person/account fact: same structure as the transform-layer table.
CREATE TABLE IF NOT EXISTS organisedb.fctpersonaccount
  LIKE transformdb.fctpersonaccount;

-- Only the fact row whose personaccountkey is 1 is copied.
INSERT INTO TABLE organisedb.fctpersonaccount
SELECT DISTINCT
  personaccountkey,
  personkey,
  accountkey,
  balance
FROM transformdb.fctpersonaccount
WHERE personaccountkey = 1
ORDER BY personaccountkey, personkey, accountkey;

-- Address dimension: distinct copy of the transform-layer rows.
CREATE TABLE IF NOT EXISTS organisedb.dimaddress (
  addresskey  BIGINT,
  postcode    STRING
)
CLUSTERED BY (addresskey) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

INSERT INTO TABLE organisedb.dimaddress
SELECT DISTINCT
  addresskey,
  postcode
FROM transformdb.dimaddress
ORDER BY addresskey;

-- Date/time dimension: distinct copy of transformdb.dimdatetime.
-- This statement pair used to repeat the organisedb.dimaddress load, which
-- appended every address row a second time and left the date/time dimension
-- (referenced by fctpersonaddressdate.datetimekey) out of organisedb.
CREATE TABLE IF NOT EXISTS organisedb.dimdatetime (
  datetimekey  BIGINT,
  datetimestr  STRING
)
CLUSTERED BY (datetimekey) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

INSERT INTO TABLE organisedb.dimdatetime
SELECT DISTINCT
  datetimekey,
  datetimestr
FROM transformdb.dimdatetime
ORDER BY datetimekey;

-- Person/address/date fact for the organise layer.
CREATE TABLE IF NOT EXISTS organisedb.fctpersonaddressdate (
  personaddressdatekey  BIGINT,
  personkey             BIGINT,
  addresskey            BIGINT,
  datetimekey           BIGINT
)
CLUSTERED BY (datetimekey) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

-- Only the fact row whose personaddressdatekey is 1 is copied.
INSERT INTO TABLE organisedb.fctpersonaddressdate
SELECT
  personaddressdatekey,
  personkey,
  addresskey,
  datetimekey
FROM transformdb.fctpersonaddressdate
WHERE personaddressdatekey = 1
ORDER BY personaddressdatekey, personkey, addresskey, datetimekey;
报表数据库 - ADS 层
-- Report database (ADS layer): holds the final report tables.
CREATE DATABASE IF NOT EXISTS reportdb;
USE reportdb;

-- Report 001: account balance per person.
CREATE TABLE IF NOT EXISTS reportdb.report001 (
  firstname      STRING,
  lastname       STRING,
  accountnumber  INT,
  balance        DECIMAL(18, 9)
)
CLUSTERED BY (firstname, lastname) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional'    = 'true',
  'orc.compress'     = 'ZLIB',
  'orc.create.index' = 'true'
);

-- Resolve the fact table's person and account keys through the two dimensions.
INSERT INTO TABLE reportdb.report001
SELECT
  dimperson.firstname,
  dimperson.lastname,
  dimaccount.accountnumber,
  fctpersonaccount.balance
FROM organisedb.fctpersonaccount
INNER JOIN organisedb.dimperson
  ON fctpersonaccount.personkey = dimperson.personkey
INNER JOIN organisedb.dimaccount
  ON fctpersonaccount.accountkey = dimaccount.accountkey;

-- Show the finished report.
SELECT * FROM reportdb.report001;
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/581408.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号