数字化时代,业务的实时处理需求越来越迫切,实时预警、实时风控、实时推荐等,Flink作为新一代流批统一的计算引擎,具有独特的天然流式计算特性和更为先进的架构设计的特点,它可以从不同的第三方存储引擎中读取数据,进行处理,然后再写出到另外的存储引擎中。
GES拥抱变化,开发了与Flink的对接工具GES-Flink-Connector。GES-Flink-Connector是一款自定义的离线/实时数据同步Flink连接器(connector),用于外部数据源与GES图数据库的数据同步。Connector的作用就相当于一个连接器,连接 Flink 计算引擎跟外界存储系统。GES-Flink-Connector具备流批统一的能力,对于离线计算与流计算的数据都可以写入GES图数据库中。利用Flink连接器机制,只要实现了数据源的Source Connector读取数据,就可以通过GES-Flink-Connector将数据进行自定义转换并导入到GES图数据库中。
GES-Flink-Connector的架构图如下所示:
GES-Flink-Connector具备如下能力:
流批统一,支持流数据与批数据数据导入支持三种提交模式,批量提交、间隔提交、混合提交利用Flink提供的Checkpoint机制,具备一定的容错能力具备导入失败处理能力,批导入失败转单条导入,单条导入失败转存储具备脏数据发现能力,验证属性数量是否符合要求,验证label是否存在具备脏数据和错误数据存储能力,可将数据存储到LOCAL、OBS、HDFS具备错误数据限制能力,当错误率达到一定上限时,停止任务 使用案例介绍
将离线数据导入GES
以向GES中导入JDBC离线数据为例,操作步骤如下:
- 将GES-Flink-Connector jar包打入本地maven仓库
mvn install:install-file -DgroupId=com.huawei.ges -DartifactId=ges-flink-connector -Dversion=1.0.0 -Dpackaging=jar -Dfile=../jars/ges-flink-connector-1.0.0.jar
- 添加相关maven依赖(flink版本需高于1.7.2)
com.huawei.ges ges-flink-connector1.0.0
配置相关参数
编写数据转换方法
// T is your data type public class GraphStringDataConverter implements GraphDataConverter{ @Override public String convert(T t) { // Implement your transformation method String s = ... return s; } }
- 创建flink任务
// ------------------------flink环境创建----------------------------------
// 创建flink流数据环境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度
environment.setParallelism(CONCURRENT_COUNT);
// 开启checkpoint 设置checkpoint时间间隔与checkpoint模式
environment.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
// -------------------------数据源获取-------------------------------------
// table schema
TypeInformation[] fieldTypes = new TypeInformation[]{
// id
BasicTypeInfo.INT_TYPE_INFO,
// label
BasicTypeInfo.STRING_TYPE_INFO,
// property 1
BasicTypeInfo.STRING_TYPE_INFO
// ...
};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
// query sql
String querySql = "select * from {$your_table_name}";
// 数据源获取,JDBCInputFormat 读出来数据为flink Row类型
DataStream dataSource =
environment.createInput(
JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("your_mysql_jdbc_url")
.setUsername("you_mysql_username")
.setPassword("you_mysql_password")
.setQuery(querySql)
.setRowTypeInfo(rowTypeInfo)
.finish());
// -------------------------输出源配置---------------------------------------
// 读取配置文件
Properties gesProp = new Properties();
InputStream in = GraphFlinkConnectorJdbcVertexExample.class.getClassLoader().getResourceAsStream("config.properties");
gesProp.load(in);
// 创建flink Row数据转为要求的逗号分隔字符串的策略
GraphDataConverter graphRowDataConvert = new GraphRowDataConvert();
GraphDataConvertStrategy> rowConvertStrategy =
new GraphDataConvertStrategy<>(graphRowDataConvert);
// 创建batch输出方法,并添加转化策略与配置文件
GraphBatchOutputFormat outputFormat = new GraphBatchOutputFormat<>(rowConvertStrategy, gesProp);
// 创建sink输出方法
GraphSinkFunction sinkFunction = new GraphSinkFunction<>(outputFormat);
// 为数据源添加输出方法
dataSource.addSink(sinkFunction).setParallelism(CONCURRENT_COUNT);
// 启动flink
environment.execute();
通过DLI与云上数据源交互
GES-Flink-Connector-DLI版本用于云上DLI Flink队列,采用Flink SQL的方式完成数据到GES的导入,操作步骤如下:
修改jar包内config.properties参数配置
将jar包导入OBS
DLI创建程序包(数据管理-程序包管理-创建程序包)
DLI购买队列并创建Flink作业
创建DLI Flink队列与GES图服务的对等连接(跨源连接-创建连接)
将vpc设置为GES图引擎服务的同一个vpc,并测试地址连通性。
- 编辑Flink SQL
# SOURCE表示数据源,可以是DLI支持的任意数据源 CREATE SOURCE STREAM v_labels ( id STRING, label STRING, uuid STRING, d1 STRING, d2 STRING ) WITH ( type = "obs", bucket = "your bucket", region = "your region", object_name = "your file", row_delimiter = "n", field_delimiter = "," ); # SINK表示输出源 为GES图数据库 CREATE SINK STREAM ges_sink ( id STRING, label STRING, uuid STRING, d1 STRING, d2 STRING ) WITH ( type = "user_defined", type_class_name = "com.huawei.ges.flink.connector.sink.GraphSinkFunction", -- 指定sinkFunction type_class_parameter = "" ); # Some data processing ... # 执行数据由输入源导入输出源 INSERT INTO ges_sink SELECt * -- 选择想要输出的字段 FROM v_labels;
本文由华为云发布。



