栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

pyflink 用 jdbc 写入 mysql 例子

pyflink 用 jdbc 写入 mysql 例子

pyflink用jdbc连接mysql例子
准备:

软件:python3.7 pycharm

面向 flink1.13.5
安装模块:python -m pip install apache-flink==1.13.5
添加 mysql 的 jdbc jar 插件、pyflink 的 flink-connector-jdbc_2.11 两个插件
到pyflink模块的 lib 文件夹(注意mysql的jdbc版本、pyflink的路径、flink的版本)

${PYTHON_HOME} Python37site-packagespyflinklib

创建mysql表

CREATE TABLE `print_table` (

  `f0` int(11) DEFAULT NULL,
  `f1` int(11) DEFAULT NULL,
  `f2` varchar(500) DEFAULT NULL
)`

具体python代码

(注意 jdbc url 的 hostname、database 和 user、password根据自己的进行更改)

from pyflink.table import EnvironmentSettings, TableEnvironment

# 1. 创建 TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = TableEnvironment.create(env_settings)


# 2. 创建 source 表
table_env.execute_sql("""CREATE TABLE source_table (
  f0 INT,
  f1 INT,
  f2 STRING
 ) WITH (
  'connector' = 'datagen',
  'rows-per-second'='5'
 )
""")

# 3. 创建 sink 表
table_env.execute_sql("""CREATE TABLE print_table (
  f0 INT,
  f1 INT,
  f2 STRING
 ) WITH (
  'connector' = 'jdbc',
  'url'='jdbc:mysql://hostname:3306/test?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
  'username'='root',
  'password'='password',
  'table-name' = 'print_table'
 )
""")


# 或者通过 SQL 查询语句来写入 sink 表:
table_env.execute_sql("insert into print_table select f0,f1,f2 from source_table").wait(60000)
# wait(60000)的60000是超时时间,60000毫秒,即60秒,这个可以根据自己的需求进行更改

转载说明出处

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/742773.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号