栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

第三章 FlinkCDC专题之mysql-cdc-source基础练习(Flinksql)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

第三章 FlinkCDC专题之mysql-cdc-source基础练习(Flinksql)

FlinkCDC专题
官网:https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html
1、导入依赖

 
 org.apache.flink
 flink-java
 1.12.0
 
 
 org.apache.flink
 flink-streaming-java_2.12
 1.12.0
 
 
 org.apache.flink
 flink-clients_2.12
 1.12.0
 
 
 org.apache.hadoop
 hadoop-client
 3.1.3
 
 
 mysql
 mysql-connector-java
 5.1.49


 org.apache.flink
 flink-table-planner-blink_2.12
 1.12.0


 com.ververica
flink-connector-mysql-cdc
 2.0.2

 
 com.alibaba
 fastjson
 1.2.75
 


 
 
 org.apache.maven.plugins
 maven-assembly-plugin
 3.0.0
 
 
 jar-with-dependencies
 
 
 
 
 make-assembly
 package
 
 single
 
 
 
 
 

2、测试环境准备
  • 在mysql中对数据监测的库开启binlog
sudo vim /etc/my.cnf

#添加数据库的binlog
binlog-do-db=flink

#重启MySQL服务
sudo systemctl restart mysqld

  • 查询生成日志
cd /var/lib/mysql

  • 创建库表并插入数据
create database flink;

use flink;

CREATE TABLE `stu` (
  `id` int(5),
  `name` varchar(20) ,
  `grade` int(5),
  `score` int(5),
    primary key (id)
) 


insert into stu value(1,"zhangsan",9,99);
3、FlinkCDC之FlinkSQL实战案例

(1)编写脚本

package com.hxjy;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class flinksqldemo {
    public static void main(String[] args) throws Exception{
        // 1、创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2、创建Flink-Mysql-CDC的Source
        tableEnv.executeSql(
                "CREATE TABLE user_info (" +
                        " id INT," +
                        " name STRING," +
                        " grade INT," +
                        " score INT," +
                        " PRIMARY KEY (`id`)  NOT ENFORCED" +
                        ") WITH (" +
                        " 'connector' = 'mysql-cdc'," +
                        " 'hostname' = '192.168.6.102'," +
                        " 'port' = '3306'," +
                        " 'username' = 'root'," +
                        " 'password' = '123456'," +
                        " 'database-name' = 'flink'," +
                        " 'table-name' = 'stu'," +
                        "   'debezium.snapshot.mode' = 'initial' " +  // 读取mysql的全量,增量以及更新数据
                        ")");
        // 3、打印输出
        Table table = tableEnv.sqlQuery("select * from user_info");
        DataStream> stream = tableEnv.toRetractStream(table,Row.class);
        stream.print();

        // 4、提交任务
        env.execute();
    }
}

(2)运行测试

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/685949.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号