栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink on Hive构建流批一体数仓

Flink on Hive构建流批一体数仓

Flink on Hive构建流批一体数仓

基于 Flink1.12

一、Flink写入Hive表

Flink支持以批处理(Batch)和流处理(Streaming)的方式写入Hive表。当以批处理的方式写入Hive表时,只有当写入作业结束时,才可以看到写入的数据。批处理的方式写入支持append模式和overwrite模式。

1、批处理模式写入

向非分区表写入数据7

Flink SQL> use catalog myhive; -- 使用catalog
Flink SQL> INSERT INTO users SELECt 2,'tom';
Flink SQL> set execution.type=batch; -- 使用批处理模式
Flink SQL> INSERT OVERWRITE users SELECT 2,'tom';

向分区表写入数据

-- 向静态分区表写入数据
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25;
-- 向动态分区表写入数据
Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08';
2、流处理模式写入

流式写入Hive表,不支持**Insert overwrite **方式,否则报如下错误

示例:

将kafka的数据流式写入Hive的分区表
-- 使用流处理模式
Flink SQL> set execution.type=streaming;

-- 使用默认SQL方言
Flink SQL> SET table.sql-dialect=default; 
-- 创建一张kafka数据源表
drop table if exists stream_kafka;
create table stream_kafka(
    `user_name` string,
    `value` double,
    `ts` string,
    `proctime` as proctime(), -- 通过计算列产生一个处理时间列
    `eventTime` as to_timestamp(from_unixtime(unix_timestamp(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss')) ,-- 事件时间
     watermark for eventTime as eventTime - interval '5' second   -- 定义watermark
)with(
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'stream_kafka', -- kafka主题
    'scan.startup.mode' = 'earliest-offset', -- 偏移量
    'properties.group.id' = 'group-test', -- 消费者组
    'properties.bootstrap.servers' = 'azkaban:9092',
    'format' = 'json', -- 数据源格式为json
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false'
);

 # 测试数据 
{"user_name":"zhangsan","value": 1.0,"ts":"2021-07-17 10:00:01"}
{"user_name":"zhangsan","value": 1.0,"ts":"2021-07-17 10:00:02"}
{"user_name":"zhangsan","value": 1.0,"ts":"2021-07-17 10:00:03"}
{"user_name":"zhangsan","value": 1.0,"ts":"2021-07-17 10:00:04"}
{"user_name":"zhangsan","value": 1.0,"ts":"2021-07-17 10:00:05"}
{"user_name":"zhangsan","value": 1.0,"ts":"2021-07-17 10:00:06"}
{"user_name":"zhangsan","value": 1.0,"ts":"2021-07-17 10:00:07"}
{"user_name":"zhangsan","value": 1.0,"ts":"2021-07-17 10:00:08"}
{"user_name":"zhangsan","value": 1.0,"ts":"2021-07-17 10:00:09"}
{"user_name":"zhangsan","value": 1.0,"ts":"2021-07-17 10:00:10"}

-- 使用Hive方言
Flink SQL> SET table.sql-dialect=hive; 
-- 创建一张Hive分区表
CREATE TABLE stream_kafka_hive_tbl (
        `user_name` string, -- 用户
        `value` double, -- 值
        `ts` string -- 行为发生的时间
) PARTITIONED BY (dt STRING,hr STRING,mi STRING) STORED AS parquet TBLPROPERTIES (
    'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
    'sink.partition-commit.trigger'='partition-time',
    'sink.partition-commit.delay'='0S',
    'sink.partition-commit.policy.kind'='metastore,success-file'
);
执行流式写入Hive表
INSERT INTO stream_kafka_hive_tbl
SELECt `user_name`, `value`, `ts`, from_unixtime(unix_timestamp(`ts`,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd'),
       from_unixtime(unix_timestamp(`ts`,'yyyy-MM-dd HH:mm:ss'),'HH'),
       from_unixtime(unix_timestamp(`ts`,'yyyy-MM-dd HH:mm:ss'),'mm') from stream_kafka;

关于Hive表的一些属性解释:

partition.time-extractor.timestamp-pattern 默认值:(none)
解释:分区时间抽取器,与 DDL 中的分区字段保持一致:
1) 如果是按天分区,则可以是: d t , 2 ) 如 果 是 按 年 ( y e a r ) 月 ( m o n t h ) 日 ( d a y ) 时 ( h o u r ) 进 行 分 区 , 则 该 属 性 值 为 : dt, 2)如果是按年(year)月(month)日(day)时(hour)进行分区,则该属性值为: dt,2)如果是按年(year)月(month)日(day)时(hour)进行分区,则该属性值为:year- m o n t h − month- month−day
h o u r : 00 : 00 , 3 ) 如 果 是 按 天 时 进 行 分 区 , 则 该 属 性 值 为 : hour:00:00, 3)如果是按天时进行分区,则该属性值为: hour:00:00,3)如果是按天时进行分区,则该属性值为:day $hour:00:00;

sink.partition-commit.trigger process-time:不需要时间提取器和水位线,当当前时间大于分区创建时间

  • sink.partition-commit.delay 中定义的时间,提交分区; partition-time:需要 Source 表中定义 watermark,当 watermark > 提取到的分区时间 +sink.partition-commit.delay
    中定义的时间,提交分区; 默认值:process-time 解释:分区触发器类型,可选 process-time
    或partition-time。 sink.partition-commit.delay 默认值:0S
    解释:分区提交的延时时间,如果是按天分区,则该属性的值为:1d,如果是按小时分区,则该属性值为1h;
    sink.partition-commit.policy.kind metastore:添加分区的元数据信息,仅Hive表支持该值配置
    success-file:在表的存储路径下添加一个_SUCCESS文件 默认值:(none)
    解释:提交分区的策略,用于通知下游的应用该分区已经完成了写入,也就是说该分区的数据可以被访问读取。可选的值如下:
    可以同时配置上面的两个值,比如metastore,success-file

尖叫提示:
1.Flink读取Hive表默认使用的是batch模式,如果要使用流式读取Hive表,需要而外指定一些参数,见下文。
2.只有在完成 checkpoint 之后,文件才会从 In-progress 状态变成 Finish 状态,同时生成 _SUCCESS文件,所以,Flink流式写入Hive表需要开启并配置 Checkpoint。对于Flink SQL Client而言,需要在flink-conf.yaml中开启CheckPoint,配置内容为:

flink-conf.yaml 配置:

# Fault tolerance and checkpointing
#==============================================================================
state.backend: filesystem
execution.checkpointing.externalized-checkpoint-retention:RETAIN_ON_CANCELLATION
execution.checkpointing.interval: 120s 
execution.checkpointing.mode: EXACTLY_ONCE
state.savepoints.dir: hdfs://wind:9000/flink-savepoints

引入依赖hadoop jar 包

hadoop-common-2.7.3.jar
hadoop-mapreduce-client-common-2.7.3.2.6.0.3-8.jar 
hadoop-mapreduce-client-core-2.7.3.2.6.0.3-8.jar
hadoop-mapreduce-client-jobclient-2.7.3.2.6.0.3-8.jar

引入kafka jar包:

flink-connector-kafka_2.11-1.12.0.jar
kafka-clients-2.1.0.jar

引入hive 包:

flink-connector-hive_2.11-1.12.0.jar
hive-exec-2.3.2.jar

sql-client-defaults.yaml 配置:

# Catalogs
#==============================================================================
# Define catalogs here.
catalogs: #[] # empty list
# A typical catalog definition looks like:
 - name: myhive
   type: hive
   hive-conf-dir: /wind/hive-2.3.2-bin/conf/
  default-database: default
   
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/300185.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号