栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink知识总结(汇总)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink知识总结(汇总)

一、Flink cdc 1.简介 1.1 CDC种类

FlinkCDC,简单了解下Change Data Capture(变更数据获取)的概念:
核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、 更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费
CDC的种类:主要分为基于查询和基于Binlog两种方式,区别:

针对Binlog的CDC,有如下的区别分析:

1.2 FlinkCDC 版本

Flink自然也不甘示弱,FlinkCDC应运而生,通过flink-cdc-connectors 组件,可以直接从MySQL等数据库直接读取全量数据和增量变更数据的source组件

开源地址:https://github.com/ververica/flink-cdc-connectors

版本信息

2.Demo实战演示 2.1 Datestream方式

通过创建maven项目,通过pom文件注入相关依赖:

 
 org.apache.flink
 flink-java
 1.12.0
 
 
 org.apache.flink
 flink-streaming-java_2.12
 1.12.0
 
 
 org.apache.flink
 flink-clients_2.12
 1.12.0
 
 
 org.apache.hadoop
 hadoop-client
 3.1.3
 
 
 mysql
 mysql-connector-java
 5.1.49
 
  
 com.alibaba.ververica
 flink-connector-mysql-cdc
 1.2.0
 
 
 com.alibaba
 fastjson
 1.2.75
 


 
 
 org.apache.maven.plugins
 maven-assembly-plugin
 3.0.0
 
 
 jar-with-dependencies
 
 
 
 
 make-assembly
 package
 
 single
 
 
 
 
 


打开IDEA

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        // 1、获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        // 开启 Checkpoint,每隔 5 秒钟做一次 Checkpoint
        env.enableCheckpointing(5000L);
        
        //指定 CK 的一致性语义
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        
        // 设置超时时间
        env
        
       //设置任务关闭的时候保留最后一次 CK 数据
        env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        
        
        // 设置状态后端
		env.setStateBackend(new FsStateBackend("hdfs://centos01:8020/flinkCDC"));


        // 2、通过cdc构建SourceFunction并且读取数据

        DebeziumSourceFunction mySQLSource = MySQLSource.builder()
                .hostname("centos01")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("flink")
                .tableList("flink.base_trademark") //可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据  注意:指定的时候需要使用"db.table"的方式
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();


        DataStreamSource streamSource = env.addSource(mySQLSource);


        // 3、打印数据
        streamSource.print();


        // 4、启动任务
        env.execute("FlinkCDC");

    }
}
```shell
flink启动standalone模式
端口号8081
[root@centos01 flink-1.13.1]# bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host centos01.
Starting taskexecutor daemon on host centos02.
Starting taskexecutor daemon on host centos03.

运行jar包
[root@centos01 flink-1.13.1]# bin/flink run -m centos01:8081 -c com.yyds.FlinkCDC ./flink-cdc-1.0.jar 



开启savepoint
[root@centos01 flink-1.13.1]# bin/flink savepoint 88acb63b3d39ab4b6749e7378259676c hdfs://centos01:8020/flinkCDC/savepoint
Triggering savepoint for job 88acb63b3d39ab4b6749e7378259676c.
Waiting for response...
Savepoint completed. Path: hdfs://centos01:8020/flinkCDC/savepoint/savepoint-88acb6-97b82909494b
You can resume your program from this savepoint with the run command.


从savepoint中启动,实现断点续传的功能
[root@centos01 flink-1.13.1]# bin/flink run -m centos01:8081 -s hdfs://centos01:8020/flinkCDC/savepoint/savepoint-88acb6-97b82909494b -c com.yyds.FlinkCDC ./flink-cdc-1.0.jar 

2.2 FlinkSql方式

同样首先注入依赖


 org.apache.flink
 flink-table-planner-blink_2.12
 1.12.0


import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkCDCWithSql {
    public static void main(String[] args) throws Exception {
        // 1、获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 2、DDL方式建表
        tableEnv.executeSql("CREATE TABLE binlog (" +
                " id BIGINT NOT NULL," +
                " tm_name STRING," +
                " logo_url STRING" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = 'centos01'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = '123456'," +
                " 'database-name' = 'flink'," +
                " 'table-name' = 'base_trademark'" +
                ")");

        // 3、查询数据
        Table table = tableEnv.sqlQuery("select * from binlog ");


        // 4、将动态表转换维流
        DataStream> retractStream = tableEnv.toRetractStream(table, Row.class);

        retractStream.print();
//        tableEnv.executeSql("select * from binlog").print();

        // 5、启动任务
        env.execute("FlinkCDCSQL");

    }
}

二、Flink connector 连接器

Flink作为一个计算引擎,是缺少存储介质的,那么数据从哪儿来,到哪儿去,就需要连接器了,链接各种类型数据库,各种类型组件进行数据的抽取、计算、存储等,下面来看看flink都有哪些connector,怎么使用的?

介绍

看看目前支持的connector:
这是官方给出的:

有些支持数据源,有些不支持数据源,有些支持无边界流式处理,有些不支持,具体看上图。

我们目前市面上用的比较多的数据库,大概是以下几种:

# 支持jdbc
mysql mongodb postgresql oracle db2 sybase sqlserver hive 
# 不支持jdbc
hbase es 文件 消息队列(kafka rabbitmq rocketmq)

使用

kafka

CREATE TABLE MyUserTable (
  -- declare the schema of the table
  `user` BIGINT,
  `message` STRING
) WITH (
  -- declare the external system to connect to
  'connector' = 'kafka',
  'topic' = 'topic_name',
  'scan.startup.mode' = 'earliest-offset', -- 还有可选从最近offset开始消费:latest-offset
  'properties.bootstrap.servers' = 'localhost:9092', --kafka broker连接串
  'format' = 'json'   -- declare a format for this system
)

详细可见官网链接:flink 官网

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/820955.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号