栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink 实践教程-入门(10):Python作业的使用

Flink 实践教程-入门(10):Python作业的使用

作者:腾讯云流计算 Oceanus 团队 流计算 Oceanus 简介  

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文将通过一个处理数据后存入 MySQL 的作业示例,为您详细介绍如何使用 PyFlink。

前置准备 创建流计算 Oceanus 集群

进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。

创建 MySQL 实例

进入 MySQL 控制台 [3],点击【新建】。具体可参考官方文档 创建 MySQL 实例 [4]。进入实例后,单击右上角【登录】即可登陆 MySQL 数据库。

创建 MySQL 表
-- 建表语句,用于接受 Sink 端数据
CREATE TABLE `oceanus_intro10_output` (
 `id` int(5) DEFAULT NULL,
 `data` varchar(1000) DEFAULT ''
) ENGINE=InnoDB DEFAULT CHARSET=utf8

本地开发 PyFlink

这里使用 Datagen 连接器随机生成数据,经过简单的逻辑处理后存入 MySQL 中。

代码编写

作者使用 PyCharm 新建了一个 Python 项目,并以 demo1.py 作为需要上传到 Oceanus 平台的主类。

##   demo1.py


from pyflink.table import EnvironmentSettings, TableEnvironment


def pyflink_demo() :
   env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
   table_env = TableEnvironment.create(env_settings)


   table_env.execute_sql("""
      CREATE TABLE datagen (
      id INT,
      data STRING
  ) WITH (
      'connector' = 'datagen',
      'fields.id.kind' = 'sequence',
      'fields.id.start' = '1',
      'fields.id.end' = '10'
  )
  """)


   table_env.execute_sql("""
      CREATE TABLE `jdbc_upsert_sink_table` (
      id INT,
      data VARCHAR
  ) WITH (
      -- 指定数据库连接参数
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://xx.xx.xx.xx:3306/testdb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- 请替换为您的实际 MySQL 连接参数
      'table-name' = 'oceanus_intro10_output', -- 需要写入的数据表
      'username' = 'root',                     -- 数据库访问的用户名(需要提供 INSERT 权限)
      'password' = '*************',           -- 数据库访问的密码
      'sink.buffer-flush.max-rows' = '200',   -- 批量输出的条数
      'sink.buffer-flush.interval' = '2s'     -- 批量输出的间隔
  )
  """)


   source_table = table_env.sql_query("select * from datagen")
   result_table = source_table.select(source_table.id + 1, source_table.data)
   result_table.execute_insert("jdbc_upsert_sink_table").wait()


if __name__ == '__main__':


   pyflink_demo()

注意:如需本地调试,需在 PyCharm 终端输入命令 python -m pip install apache-flink 安装 flink 环境,默认安装最新版本。

流计算 Oceanus 作业 1. 上传依赖

在 Oceanus 控制台,点击左侧【依赖管理】,点击左上角【新建】新建依赖,上传本地 demo1.py 文件。当然也可以上传 Python 程序包。

2. 创建作业

在 Oceanus 控制台,点击左侧【作业管理】,点击左上角【新建】新建作业,作业类型选择 Python 作业,点击【开发调试】进入作业编辑页面。

【主程序包】选择刚才上传的 demo1.py 文件,并选择最新版本;【Python 环境】选择 Python-3.7;【作业参数】 > 【内置 Connector】选择 flink-connector-jdbc。

注意:如果上传的为 Zip 文件(此处上传的为 py 文件),则【入口类】需填写相应的主入口类。

3. 运行作业

点击【发布草稿】即可运行,可通过【日志】面板 TaskManager 或 Flink UI 查看运行信息。

总结

本文首先用 Datagen 连接器生成随机数据,经过简单处理后存入 MySQL 中,并无复杂的逻辑处理和第三方 Python 包的应用。Oceanus 平台已经内置了常见的 Python 包,如果没有复杂的逻辑,可以直接上传 xxxx.py 文件到 oceanus 平台运行,非常方便初学者调试运行。当然 oceanus 平台也提供上传 Zip 包和第三方 Python 包的能力,详情可以参考 Oceanus Python 开发指南 [5]。

更多 Oceanus Flink 实践教程详见 流计算 Oceanus 教程 [6]  更多 PyFlink DataStream && Table API 编写详见 Flink 官方文档 [7]

参考链接

[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview

[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298  

[3] MySQL 控制台:https://console.cloud.tencent.com/cdb  

[4] 创建 MySQL 实例:https://cloud.tencent.com/document/product/236/46433  

[5] Oceanus Python 开发指南:https://cloud.tencent.com/document/product/849/70715  

[6] 流计算 Oceanus 教程:https://cloud.tencent.com/developer/tag/10509  

[7] Flink 官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/python/overview/  

扫码加入 流计算 Oceanus 产品交流群

扫码关注「腾讯云大数据」,了解腾讯云流计算 Oceanus 更多信息~

腾讯云大数据

长按二维码
关注我们

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/779390.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号