栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink 实现Mysql的数据同步 Mysql Sink Mysql

Flink 实现Mysql的数据同步 Mysql Sink Mysql

Flink Sink Mysql 简单实现
  • 环境
  • 需求
  • 解析
  • 完整代码
  • 源码下载
  • 附:flink-cdc

环境
组件版本
scala2.12
netcat*
kafka*
mysql*
flink1.13.3
需求
监听mysql某个表的动态,实时同步到另一个数据库中。
当然使用maxwell或canal也可以实现同样的效果。这里只是简单演示
解析
创建环境
//创建环境
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = TableEnvironment.create(settings)
创建表user01,mysql-cdc是第三方连接器
//创建表user01 - 使用 mysql-cdc connector
    tEnv.executeSql(
      """
        |create table user01 (
        |id int ,
        |name string,
        |PRIMARY KEY  (id) NOT ENFORCED
        |)with(
        |'connector' = 'mysql-cdc',
        |'hostname' = 'server120',
        |'port' = '3306',
        |'username' = 'flink_test',
        |'password' = 'flink_test',
        |'database-name' = 'flink_test',
        |'table-name' = 'user01',
        |'scan.incremental.snapshot.enabled' = 'false'
        |)
        |""".stripMargin)
创建表user02,jdbc 是flink自带的连接器
//创建表user02 - 使用jdbc connector
    tEnv.executeSql(
      """
        |create table user02 (
        |id int PRIMARY KEY,
        |name string
        |)with(
         'connector' = 'jdbc',
        | 'url' = 'jdbc:mysql://server120:3306/flink_test',
        | 'table-name' = 'user02',
        | 'username' = 'flink_test',
        | 'password' = 'flink_test'
        |)
        |""".stripMargin)
将user01同步到user02
    //将user01同步到user02
    tEnv.from("user01").executeInsert("user02")
这种方式简单的实现了实时同步mysql某个表的增删改查。
完整代码
package com.z.tableapi

import org.apache.flink.table.api._


object Mysql2MysqlWithCDC {
  def main(args: Array[String]): Unit = {
    //创建环境
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = TableEnvironment.create(settings)
    //创建表user01 - 使用 mysql-cdc connector
    tEnv.executeSql(
      """
        |create table user01 (
        |id int ,
        |name string,
        |PRIMARY KEY  (id) NOT ENFORCED
        |)with(
        |'connector' = 'mysql-cdc',
        |'hostname' = 'server120',
        |'port' = '3306',
        |'username' = 'flink_test',
        |'password' = 'flink_test',
        |'database-name' = 'flink_test',
        |'table-name' = 'user01',
        |'scan.incremental.snapshot.enabled' = 'false'
        |)
        |""".stripMargin)
    //创建表user02 - 使用jdbc connector
    tEnv.executeSql(
      """
        |create table user02 (
        |id int PRIMARY KEY,
        |name string
        |)with(
         'connector' = 'jdbc',
        | 'url' = 'jdbc:mysql://server120:3306/flink_test',
        | 'table-name' = 'user02',
        | 'username' = 'flink_test',
        | 'password' = 'flink_test'
        |)
        |""".stripMargin)

    //将user01同步到user02
    tEnv.from("user01").executeInsert("user02")
  }
}

flink mysql 依赖


            com.ververica
            flink-connector-mysql-cdc
            ${mysql.cdc}
        
        
            org.apache.flink
            flink-connector-jdbc_${scala.version}
            ${flink.version}
        
        
            mysql
            mysql-connector-java
            5.1.47
        

flink 依赖


        
            org.apache.flink
            flink-clients_${scala.version}
            ${flink.version}
        
        
            org.apache.flink
            flink-scala_${scala.version}
            ${flink.version}
        
        
            org.apache.flink
            flink-streaming-scala_${scala.version}
            ${flink.version}
        
    
源码下载

https://download.csdn.net/download/sinat_25528181/44038825

附:flink-cdc
目前有mysql-cdc、postgres-cdc、MongoDB-cdc、oracle-cdc

Github官网:https://github.com/ververica/flink-cdc-connectors

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/581988.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号