流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
操作教程
本文将为您详细介绍如何使用自定义聚合函数(UDAF),将处理后的存入 MySQL 中。
前置准备 创建流计算 Oceanus 集群进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。
创建 MySQL 实例进入 MySQL 控制台 [3],点击【新建】。具体可参考官方文档 创建 MySQL 实例 [4]。进入实例后,单击右上角【登陆】即可登陆 MySQL 数据库。
创建 MySQL 表
-- 建表语句,用于向 Source 提供数据 CREATE TABLE `udaf_input` ( `id` int(10) NOT NULL, `product` varchar(50) DEFAULT '', `value` int(10) DEFAULT NULL, `weight` int(10) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 -- 插入数据 INSERT INTO `udaf_input` (`id`, `product`, `value`, `weight`) VALUES (1, 'oceanus-1', 2, 2); INSERT INTO `udaf_input` (`id`, `product`, `value`, `weight`) VALUES (2, 'oceanus-1', 3, 3); INSERT INTO `udaf_input` (`id`, `product`, `value`, `weight`) VALUES (3, 'oceanus-2', 5, 4); INSERT INTO `udaf_input` (`id`, `product`, `value`, `weight`) VALUES (5, 'oceanus-2', 6, 5); -- 建表语句,用于接收 Sink 端数据 CREATE TABLE `udaf_output` ( `product` varchar(50) NOT NULL DEFAULT '', `sum` double(11,0) DEFAULT NULL, PRIMARY KEY (`product`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
开发 UDAF
我们自定义一个 UDAF,继承 AggregateFunction,对算子输入的两个字段计算加权平均值。
1. 代码编写
WeightedAvgAccumulator类:
package demos.UDAF;
public class WeightedAvgAccumulator{
public long sum = 0;
public int count = 0;
}
WeightedAvg 类:
package demos.UDAF; import org.apache.flink.table.functions.AggregateFunction; public class WeightedAvg extends AggregateFunction{ @Override public WeightedAvgAccumulator createAccumulator() { return new WeightedAvgAccumulator(); } @Override public Long getValue(WeightedAvgAccumulator acc) { if (acc.count == 0) { return null; } else { return acc.sum / acc.count; } } public void accumulate(WeightedAvgAccumulator acc, Long iValue, Integer iWeight) { acc.sum += iValue * iWeight; acc.count += iWeight; } public void retract(WeightedAvgAccumulator acc, Long iValue, Integer iWeight) { acc.sum -= iValue * iWeight; acc.count -= iWeight; } public void merge(WeightedAvgAccumulator acc, Iterable it) { for (WeightedAvgAccumulator a : it) { acc.count += a.count; acc.sum += a.sum; } } public void resetAccumulator(WeightedAvgAccumulator acc) { acc.count = 0; acc.sum = 0L; } }
2. 打包 Jar
使用 IDEA 自带打包工具 Build Artifacts 或者命令行进行打包。命令行打包命令:
mvn clean package
命令行打包后生成的 Jar 包可以在项目 target 目录下找到。
流计算 Oceanus 作业 上传依赖在 Oceanus 控制台,点击左侧【依赖管理】,点击左上角【新建】新建依赖,上传本地 Jar 包。
创建 SQL 作业在 Oceanus 控制台,点击左侧【作业管理】,点击左上角【新建】新建作业,作业类型选择 SQL 作业,点击【开发调试】进入作业编辑页面。单击【作业参数】,在【引用程序包】处选择刚才上传的 Jar 包。
1. 创建 Function
CREATE TEMPORARY SYSTEM FUNCTION WeightedAvg AS 'demos.UDAF.WeightedAvg';
WeightedAvg代表创建的函数名,demos.UDAF.WeightedAvg代表代码所在路径。
2. 创建 Source
CREATE TABLE `mysql_cdc_source_table` ( `id` INT, `product` VARCHAR, `value` INT, `weight` INT, PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义 ) WITH ( 'connector' = 'mysql-cdc', -- 固定值 'mysql-cdc' 'hostname' = 'xx.xx.xx.xx', -- 数据库的 IP 'port' = 'xxxx', -- 数据库的访问端口 'username' = 'root', -- 数据库访问的用户名(需要提供 SHOW DATAbaseS、REPLICATION SLAVE、REPLICATION CLIENT、SELECt 和 RELOAD 权限) 'password' = 'xxxxxxxxx', -- 数据库访问的密码 'database-name' = 'testdb', -- 需要同步的数据库 'table-name' = 'udaf_input' -- 需要同步的数据表名 );
3. 创建 Sink
CREATE TABLE `jdbc_source_table` ( `product` VARCHAR, `sum` DOUBLE, PRIMARY KEY(`product`) NOT ENFORCED ) WITH ( -- 指定数据库连接参数 'connector' = 'jdbc', 'url' = 'jdbc:mysql://xx.xx.xx.xx:xxxx/testdb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- 请替换为您的实际 MySQL 连接参数 'table-name' = 'udaf_output', -- 需要写入的数据表 'username' = 'root', -- 数据库访问的用户名(需要提供 INSERT 权限) 'password' = 'xxxxxxxxx', -- 数据库访问的密码 'sink.buffer-flush.max-rows' = '200', -- 批量输出的条数 'sink.buffer-flush.interval' = '2s' -- 批量输出的间隔 );
4. 编写业务 SQL
INSERT INTO jdbc_source_table SELECT product,CAST(WeightedAvg(`value`,`weight`) AS DOUBLE) AS `sum` FROM mysql_cdc_source_table GROUP BY `product`;
总结
本文首先在本地开发 UDAF 函数,将其打成 Jar 包后上传到 Oceanus 平台引用。接下来使用 MySQL CDC 连接器获取udaf_input表数据,调用 UDAF 函数对输入的两个字段计算加权平均值后存入 MySQL 中。其他的自定义函数,例如自定义标量函数(UDF)和自定义表值函数(UDTF)的使用方法和视频教程可以参考之前的文章 Flink 实践教程:进阶8-自定义标量函数(UDF) [5]、Flink 实践教程:进阶9-自定义表值函数(UDTF) [6]
自定义聚合函数(UDAF)可以将多条记录聚合成 1 条记录。
参考链接[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview
[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298
[3] MySQL 控制台:https://console.cloud.tencent.com/cdb
[4] 创建 MySQL 实例:https://cloud.tencent.com/document/product/236/46433
[5] Flink 实践教程:进阶8-自定义标量函数(UDF):https://cloud.tencent.com/developer/article/1946320
[6] Flink 实践教程:进阶9-自定义表值函数(UDTF):https://cloud.tencent.com/developer/article/1951900



