上一节介绍了使用Flink Table API 和 SQL在source端和sink连接外部文件系统,以及混用Table API 和 SQL进行一些简单的查询转换。实际生产环境中使用Flink连接外部系统的场景数不胜数,可能是外部文件系统的多种文件格式,如 CSV、Apache Parquet、Apache Avro等;可能是存储系统,如 JDBC、Apache Hbase、Apache Cassandra、Elasticsearch等,或消息队列系统,如 Apache Kafka、RabbitMQ等。
本节将继续介绍使用Flink Table API 和 SQL连接一些常用的外部系统。
二、更新模式介绍:对于流式查询,需要声明如何在表和外部连接器之间执行转换。与外部系统交换的消息类型,由更新模式(Uadate Mode)指定:
追加(Append)模式:表只做插入操作,和外部连接器只交换插入(Insert)消息
撤回(Retract)模式:表和外部连接器交换添加(Add)和撤回(Retract)消息。插入操作(Insert)编码为Add消息;删除(Delete)编码为Retract消息;更新(Update)编码为上一条的Retract和下一条的Add消息
更新插入(Upsert)模式:更新和插入都被编码为Upsert消息;删除编码为Delete消息
三、读写KafkaKafka作为消息队列,和文件系统类似的,只能往里追加数据,不能修改数据。
1、使用Table API读写Kafka 1.1 我们经常使用Table API 的 tableEnv.connect(new Kafka()) 来连接Kafka,然后在其中指定Kafka的版本、topic和一些连接参数。具体请参考下面的demo:消费sensor topic 的数据,并进行一些简单查询转换,然后将数据写到sinktest topic。
public static void main(String[] args) throws Exception {
//1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.创建表执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//3.连接Kafka,消费sensor topic的数据
tableEnv.connect(new Kafka()
.version("universal")
.topic("sensor")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
).withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE())
).createTemporaryTable("inputTable");
//4.查询转换
Table inputTable = tableEnv.from("inputTable");
//新版Table API
Table resultTable = inputTable.select($("id"), $("timestamp"), $("temp"))
.filter($("id").isEqual("sensor_6"));
//6.建立Kafka连接,输出到sinktest topic下
tableEnv.connect(new Kafka()
.version("universal")
.topic("sinktest")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
).withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE()))
.createTemporaryTable("outputTable");
//查询结果集输出到outputTable
resultTable.insertInto("outputTable");
tableEnv.execute("");
}
1.2 本地测试环境开启zookeeper 和 Kafka 服务,创建sensor 和 sinktest topic:
kafka-topic.sh --zookeeper localhost:2181 --create --topic sensor --partitions 1 --replication-factor 1 kafka-topic.sh --zookeeper localhost:2181 --create --topic sinktest --partitions 1 --replication-factor 11.3 启动Kafka生产者和消费者console:
kafka-console-producer.sh --broker-list localhost:9092 --topic sensor
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sinktest1.4 启动Flink程序,并在Kafka生产者console输入数据,观察消费者端console输出:
输入
>sensor_1,1547718199,35.8 >sensor_6,1547718201,15.4 >sensor_7,1547718202,6.7 >sensor_10,1547718205,38.1 >sensor_6,1547718209,34.5
输出
sensor_6,1547718201,15.4 sensor_6,1547718209,34.5
观察消费者端输出,只消费到id为sensor_6的数据,说明程序连接Kafka并进行简单转换没有问题。
2、使用Flink SQL读写Kafka输入端消费Kafka,输出端将数据生产到Kafka,我们都使用Flink SQL 来完成:
public static void main(String[] args) throws Exception {
//1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.创建表执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//3.连接Kafka,消费sensor topic的数据
String inputTable = "sensor";
String dropSql = "DROP TABLE IF EXISTS " + inputTable;
String kafkaTableDDL
= "CREATE TABLE " + inputTable + " (n" +
" id String,n" +
" ts BIGINT,n" +
" temp DOUBLEn" +
") WITH (n" +
" 'connector.type' = 'kafka',n" +
" 'connector.version' = 'universal',n" +
" 'connector.topic' = 'sensor',n" +
" 'connector.properties.bootstrap.servers' = 'localhost:9092',n" +
" 'format.type' = 'csv',n" +
" 'update-mode' = 'append'n" +
")";
//执行输入端的建表语句
tableEnv.executeSql(dropSql);
tableEnv.executeSql(kafkaTableDDL);
//6.建立Kafka连接,输出到不同的topic下
String outputTable = "sensor_2";
String dropSql2 = "DROP TABLE IF EXISTS " + outputTable;
String kafkaTableDDL2
= "CREATE TABLE " + outputTable + " (n" +
" id String,n" +
" ts BIGINT,n" +
" temp DOUBLEn" +
") WITH (n" +
" 'connector.type' = 'kafka',n" +
" 'connector.version' = 'universal',n" +
" 'connector.topic' = 'sinktest',n" +
" 'connector.properties.bootstrap.servers' = 'localhost:9092',n" +
" 'format.type' = 'csv',n" +
" 'update-mode' = 'append'n" +
")";
//执行输出端的建表语句
tableEnv.executeSql(dropSql2);
tableEnv.executeSql(kafkaTableDDL2);
//简单查询转换,将结果输出到表sensor_2,即输出到Kafka 的sinktest topic
String insertSql = "INSERT INTO sensor_2 " +
"SELECT id , ts , temp FROM sensor WHERe id='sensor_6'";
tableEnv.executeSql(insertSql);
}
3、使用upsert-kafka connector读写Kafka
在某些场景中,比如GROUP BY聚合之后的结果,需要去更新之前的结果值。这个时候,需要将 Kafka 消息记录的 key 当成主键处理,用来确定一条数据是应该作为插入、删除还是更新记录来处理。在Flink1.11中,可以通过 flink-cdc-connectors 项目提供的 changelog-json format来实现该功能。
在Flink1.12版本中, 新增了一个 upsert connector(upsert-kafka),该 connector 扩展自现有的 Kafka connector,工作在 upsert 模式下。新的 upsert-kafka connector 既可以作为 source 使用,也可以作为 sink 使用,并且提供了与现有的 kafka connector 相同的基本功能和持久性保证,因为两者之间复用了大部分代码。
以下示例代码展示了如何利用upsert-kafka connector以upsert模式向kafka中生产数据,
注意:需要添加flink-connector-kafka相关依赖:
org.apache.flink flink-connector-kafka_2.12 ${flink.version}
示例代码:
public static void main(String[] args) throws Exception {
//1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.创建表执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//3.连接Kafka,消费test topic的数据
String inputTable = "sensor";
String dropSql = "DROP TABLE IF EXISTS " + inputTable;
String kafkaTableDDL
= "CREATE TABLE " + inputTable + " (n" +
" id String,n" +
" ts BIGINT,n" +
" temp DOUBLEn" +
") WITH (n" +
" 'connector.type' = 'kafka',n" +
" 'connector.version' = 'universal',n" +
" 'connector.topic' = 'test',n" +
" 'connector.properties.group.id' = 'main-group',n" +
" 'connector.properties.bootstrap.servers' = 'localhost:9092',n" +
" 'format.type' = 'csv',n" +
" 'update-mode' = 'append'n" +
")";
//执行输入端的建表语句
tableEnv.executeSql(dropSql);
tableEnv.executeSql(kafkaTableDDL);
//6.建立Kafka连接,输出到不同的topic下
String outputTable = "sensor_2";
String dropSql2 = "DROP TABLE IF EXISTS " + outputTable;
String kafkaTableDDL2
= "CREATE TABLE " + outputTable + " (n" +
" id String,n" +
" cnt BIGINT,n" +
" avgTemp DOUBLE,n" +
" PRIMARY KEY (id) NOT ENFORCED n" +
") WITH (n" +
" 'connector' = 'upsert-kafka',n" +
" 'topic' = 'sink_test',n" +
" 'properties.bootstrap.servers' = 'localhost:9092',n" +
" 'key.format' = 'json',n" +
" 'key.json.ignore-parse-errors' = 'true',n" +
" 'value.format' = 'json',n" +
" 'value.json.fail-on-missing-field' = 'false'n" +
")";
//执行输出端的建表语句
tableEnv.executeSql(dropSql2);
tableEnv.executeSql(kafkaTableDDL2);
//简单聚合查询,以upsert模式将结果输出到表sensor_2,即输出到Kafka 的sink_test topic
String insertSql = "INSERT INTO sensor_2 " +
"SELECT id , count(id) as cnt , avg(temp) as avgTemp FROM sensor GROUP BY id";
tableEnv.executeSql(insertSql);
//查询表sensor_2
TableResult result = tableEnv.executeSql("select * from sensor_2");
result.print();
}
启动程序,并向test topic中生产数据:
>sensor_1,1547718199,35.8 >sensor_1,1547718199,34.2
控制台输出结果如下:
log4j:WARN No appenders could be found for logger (org.apache.flink.table.module.ModuleManager). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. +----+--------------------------------+----------------------+--------------------------------+ | op | id | cnt | avgTemp | +----+--------------------------------+----------------------+--------------------------------+ | +I | sensor_1 | 1 | 35.8 | | -U | sensor_1 | 1 | 35.8 | | +U | sensor_1 | 2 | 35.0 |
其中op列展示了数据是新增还是更新的结果, +I 表示数据新增;-U 表示数据删除;+U 表示数据更新;当生产第一条数据时候,id为sensor_1的数据count计数为1,平均温度为35.8,生产第二条数据以后,原来第一条数据删除,count更新为2,平均温度更新为35.0。
upsert-kafka connector的一些配置项如下:
topic:必选。用于读取和写入的 Kafka topic 名称。
properties.bootstrap.servers:必选。以逗号分隔的 Kafka brokers 列表。
key.format:必选。用于对 Kafka 消息中 key 部分序列化和反序列化的格式。key 字段由 PRIMARY KEY 语法指定。支持的格式包括 ‘csv’、‘json’、‘avro’。
value.format:必选。用于对 Kafka 消息中 value 部分序列化和反序列化的格式。支持的格式包括 ‘csv’、‘json’、‘avro’。
properties.*: 可选。该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。Flink 会自动移除 选项名中的 “properties.” 前缀,并将转换后的键名以及值传入 KafkaClient。例如,你可以通过 ‘properties.allow.auto.create.topics’ = ‘false’ 来禁止自动创建 topic。但是,某些选项,例如’key.deserializer’ 和 ‘value.deserializer’ 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。
value.fields-include: 可选,默认为ALL。控制key字段是否出现在 value 中。当取ALL时,表示消息的 value 部分将包含 schema 中所有的字段,包括定义为主键的字段。当取EXCEPT_KEY时,表示记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。
key.fields-prefix:可选。为了避免与value字段命名冲突,为key字段添加一个自定义前缀。默认前缀为空。一旦指定了key字段的前缀,必须在DDL中指明前缀的名称,但是在构建key的序列化数据类型时,将移除该前缀。见下面的示例。在需要注意的是:使用该配置属性,value.fields-include的值必须为EXCEPT_KEY。
三、sink端输出到Elasticsearch输出到es支持upsert模式,当有数据需要更新时,根据es 的 _id进行更新,需要添加如下依赖:
org.apache.flink
flink-connector-elasticsearch6_2.11
1.12.1
如下示例代码使用Table API消费Kafka指定topic的数据,进行简单聚合查询后,以upsert模式输出到Elasticsearch:
public static void main(String[] args) throws Exception {
// 1、创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2、设置并行度
env.setParallelism(1);
// 3、创建表执行环境
EnvironmentSettings oldStreamSettings = EnvironmentSettings.newInstance()
.useOldPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, oldStreamSettings);
//4.连接Kafka,消费数据
tableEnv.connect(new Kafka()
.version("universal")
.topic("sensor")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
).withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE())
).createTemporaryTable("inputTable");
//5.聚合统计
Table inputTable = tableEnv.from("inputTable");
inputTable.printSchema();
//新版Table API
Table aggTable = inputTable.groupBy($("id"))
.select($("id").count().as("count"), $("temp").avg().as("avgTemp"));
//旧版Table API
// Table aggTable = inputTable.groupBy("id").select("id ,id.count as count, temp.avg as avgTemp");
//6.连接es,聚合统计结果输出到es
tableEnv.connect(new Elasticsearch()
.version("6") //es版本
.host("127.0.0.1", 9200, "http") //连接地址
.index("sensor") //索引名
.documentType("temp") //文档类型
.bulkFlushInterval(2000) //批量插入到es的时间间隔
)
.inUpsertMode()
.withFormat(new Json())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("count", DataTypes.BIGINT())
.field("avgTemp", DataTypes.DOUBLE()))
.createTemporaryTable("esOutputTable");
//聚合结果输出到esOutputTable
aggTable.insertInto("esOutputTable");
tableEnv.execute("outPut2ES");
}
四、sink端输出到MySQL
引入 flink-connector-jdbc 依赖:
org.apache.flink flink-connector-jdbc_${scala.binary.version} ${flink.version}
如下代码source端和sink端都使用SQL操作,消费kafka指定topic数据,并进行简单聚合查询,以upsert模式将聚合统计结果输出到MySQL:
public static void main(String[] args) throws Exception {
//1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.创建表执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//3.连接Kafka,读取数据
String kafkaTableDDL
= "CREATE TABLE inputTable (n" +
" id String,n" +
" ts BIGINT,n" +
" temp DOUBLEn" +
") WITH (n" +
" 'connector.type' = 'kafka',n" +
" 'connector.version' = 'universal',n" +
" 'connector.topic' = 'sensor',n" +
" 'connector.properties.bootstrap.servers' = 'localhost:9092',n" +
" 'format.type' = 'csv',n" +
" 'update-mode' = 'append'n" +
")";
//执行输入端的建表语句
tableEnv.executeSql(kafkaTableDDL);
//4.sink端连接mysql,将数据进行输出
String sinkDDL =
"create table sensor_count (" +
" id varchar(20) not null, " +
" cnt bigint not null, " +
" avg_temp double not null " +
") with (" +
" 'connector.type' = 'jdbc', " +
" 'connector.url' = 'jdbc:mysql://localhost:3306/test', " +
" 'connector.table' = 'sensor_count', " +
" 'connector.driver' = 'com.mysql.cj.jdbc.Driver', " +
" 'connector.username' = 'root', " +
" 'connector.password' = 'admin', " +
" 'connector.write.flush.interval' = '2s' )";
//执行ddl创建表
tableEnv.executeSql(sinkDDL);
//5.聚合数据插入jdbc表
String insertSql = "INSERT INTO sensor_count " +
"SELECT id, count(id) as cnt, avg(temp) as avgTemp FROM inputTable GROUP BY id";
tableEnv.executeSql(insertSql);
}
Kafka生产端console输入以下数据:
>sensor_1,1547718199,35.8 >sensor_1,1547718199,34.2 >sensor_6,1547718212,37.1 >sensor_6,1547718212,32.9
MySQL的 sensor_count 表中数据发生更新:
本节介绍了使用 Flink Table API 和 SQL连接常见的外部系统进行操作,并放出了使用Table API或者SQL 操作Kafka、ES、MySQL的demo,自己在实践学习中也踩了不少坑。总之一句话,实践出真知。光是理论的学习是没用的,只有实实际际敲了代码,代入理论去思考,才会有一定感悟,而每一次报错,实际上就是经验的积累。博文中有错误或不足的地方,还清兄弟们指正。



