栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

FLink1.13.1+FlinkCDC2.0.2+Hudi0.10构建流批一体数仓

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

FLink1.13.1+FlinkCDC2.0.2+Hudi0.10构建流批一体数仓

架构版本:Flink1.13.1+FlinkCDC2.0.2+Hudi0.10 构建hudi

(1)通过国内镜像拉取源码

git clone https://github.com/apache/hudi.git

(2)修改pom.xml

vim pom.xml

-- 直接添加
	
        nexus-aliyun
        nexus-aliyun
        http://maven.aliyun.com/nexus/content/groups/public/
        
            true
        
        
            false
        
    

(3)构建

mvn clean package -DskipTests -Dspark3 -Dscala-2.1

(4)编译好之后文件目录对应Hudi下的packaging目录

Flink 操作 sql-client操作
-- 1、下载flink1.13.1
-- 2、添加hadoop环境变量
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
-- 3、启动flink集群
./start-cluster.sh 或 ./yarn-session.sh
-- 4、启动flink sql client,并关联编译好的hudi依赖包
bin/sql-client.sh embedded -j $hudi_home/packaging/hudi-flink-bundle/target/hudi-flink-bundle***.jar 

Flink Sql Client操作 查询数据
set ql-client.execution.result-mode=tableau;

CREATE TABLE t1(
  uuid VARCHAr(20),
  name VARCHAr(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAr(20)
)
PARTITIonED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'schema://base-path',
  'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
);

-- insert data using values
INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
  
-- query from the Hudi table
select * from t1;

-- this would update the record with key 'id1'
insert into t1 values
  ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
流式查询#
CREATE TABLE t2(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIonED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///hudi/t2',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',  -- this option enable the streaming read
  'read.streaming.start-commit' = '20210927134557' -- specifies the start commit instant time
  'read.streaming.check-interval' = '4' -- specifies the check interval for finding new source commits, default 60s.
);

-- Then query the table in stream mode
select * from t1;
IDEA操作
  1. 引入下面依赖
-- 1、自己将构建好的下载好的 jar包 按下面方式做成maven依赖,hudi-flink-bundle在hudi目录下
-- 2、cdc地址:https://github.com/ververica/flink-cdc-connectors/releases/tag/release-2.0.2
-- 3、mvn install:install-file -DgroupId=com.flink.cdc -DartifactId=flink-connector-mysql-cdc -Dversion=2.0.2-SNAPSHOT -Dpackaging=jar  -Dfile=flink-sql-connector-mysql-cdc-2.0.2.jar        
		
            com.flink.cdc
            flink-connector-mysql-cdc
            2.0.2-SNAPSHOT
        

        
            com.hudi.demo
            hudi-flink-bundle
            0.10.0-SNAPSHOT
        

2、写java程序

package com.bighao.SQL.Hudi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CDCToHudi {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 记得开启Checkpoint,不然数据量不够是不会往hoodi写的
        env.setParallelism(1).enableCheckpointing(10000);
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        String sourceDDL = "CREATE TABLE t1 (" +
             "  uuid VARCHAr(20) PRIMARY KEY," +
             "  name VARCHAr(10)," +
             "  age INT, " +
             "  ts TIMESTAMP(3), " +
             "  par VARCHAr(20) " +
             ") WITH (" +
             "  'connector' = 'mysql-cdc'," +
             "  'hostname' = 'mysql所在主机'," +
             "  'port' = '3306'," +
             "  'username' = 'name'," +
             "  'password' = '***'," +
             "  'database-name' = 'test'," +
             "  'table-name' = 't1'," +
             "  'debezium.snapshot.mode' = 'initial'" +
             ")";

        String sinkDDL ="CREATE TABLE t3( " +
            "uuid VARCHAr(20), " +
            "name VARCHAr(10), " +
            "age INT, " +
            "ts TIMESTAMP(3), " +
            "`partition` VARCHAr(20)) " +
        "PARTITIonED BY (`partition`) " +
        "WITH ( " +
            "'connector' = 'hudi', " +
            "'path' = 'hdfs://ip:9820/flink-hudi/t3', " +
            "'table.type' = 'MERGE_ON_READ', " +
            "'read.streaming.enabled' = 'true' ," +
            "'read.streaming.check-interval' = '4')";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        tableEnv.executeSql("INSERT INTO t3(uuid,name,age,ts, `partition`)  SELECt uuid,name,age,ts, par  FROM t1");
        env.execute("read_hudi");
    }
}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ReadHoodi {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        String sourceDDL ="CREATE TABLE t3( " +
            "uuid VARCHAr(20), " +
            "name VARCHAr(10), " +
            "age INT, " +
            "ts TIMESTAMP(3), " +
            "`partition` VARCHAr(20)) " +
            "PARTITIonED BY (`partition`) " +
            "WITH ( " +
            "'connector' = 'hudi', " +
            "'path' = 'hdfs://ip:9820/hudi/t3', " +
            "'table.type' = 'MERGE_ON_READ', " +
            "'read.streaming.enabled' = 'true' ," +
            "'read.streaming.check-interval' = '4')";

        tableEnv.executeSql(sourceDDL);
        TableResult result2 = tableEnv.executeSql("SELECt * FROM t3");
        result2.print();
        env.execute("read_hudi1");
    }
}

3、往mysql表中插入数据,修改数据,查看ReadHoodi打印的数据


[1] MySQL CDC 文档:

https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html

[2] Hudi Flink 答疑解惑:

https://www.yuque.com/docs/share/01c98494-a980-414c-9c45-152023bf3c17?#

[3] Hudi 的一些设计:

https://www.yuque.com/docs/share/5d1c383d-c3fc-483a-ad7e-d8181d6295cd?#

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/274846.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号