Step 1: Environment preparation for a simple Flink packaging-and-run demo
First you need a running Flink environment. I'm using a standalone (single-node) setup on a virtual machine. After starting Flink, the web UI is available on port 8081, where you can manage Flink jobs:
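For reference, a standalone cluster is started from the Flink distribution directory roughly like this (the exact paths depend on where you unpacked Flink; treat them as illustrative):

```shell
# Run from the root of the Flink distribution
./bin/start-cluster.sh

# The web UI should now be reachable at http://<vm-ip>:8081
# Stop the cluster later with:
# ./bin/stop-cluster.sh
```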
The job is written with Flink SQL. The code is very simple: it reads messages from one Kafka topic and writes them into another Kafka topic:
package com.ms.flinksql;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class Kafka2Kafka {
    public static void main(String[] args) {
        // Build a streaming TableEnvironment on the Blink planner
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Source table: JSON messages from the user_behavior topic
        String ddlSource = "CREATE TABLE user_behavior (\n" +
                "  user_id BIGINT,\n" +
                "  item_id BIGINT,\n" +
                "  category_id BIGINT,\n" +
                "  behavior STRING,\n" +
                "  ts TIMESTAMP(3)\n" +
                ") WITH (\n" +
                "  'connector.type' = 'kafka',\n" +
                "  'connector.version' = 'universal',\n" +
                "  'connector.topic' = 'user_behavior',\n" +
                "  'connector.startup-mode' = 'latest-offset',\n" +
                "  'connector.properties.zookeeper.connect' = '192.168.126.128:2181',\n" +
                "  'connector.properties.bootstrap.servers' = '192.168.126.128:9092',\n" +
                "  'format.type' = 'json'\n" +
                ")";

        // Sink table: another Kafka topic, keeping only two fields
        String ddlSink = "CREATE TABLE user_behavior_sink (\n" +
                "  user_id BIGINT,\n" +
                "  item_id BIGINT\n" +
                ") WITH (\n" +
                "  'connector.type' = 'kafka',\n" +
                "  'connector.version' = 'universal',\n" +
                "  'connector.topic' = 'user_behavior_sink',\n" +
                "  'connector.properties.zookeeper.connect' = '192.168.126.128:2181',\n" +
                "  'connector.properties.bootstrap.servers' = '192.168.126.128:9092',\n" +
                "  'format.type' = 'json',\n" +
                "  'update-mode' = 'append'\n" +
                ")";

        // Take just user_id and item_id from each record and forward them
        // to the new Kafka topic
        String sql = "INSERT INTO user_behavior_sink SELECT user_id, item_id FROM user_behavior";

        tableEnv.executeSql(ddlSource);
        tableEnv.executeSql(ddlSink);
        tableEnv.executeSql(sql);
    }
}
The Maven dependencies are listed below. Notice that several of them have scope set to provided. What does that mean? A provided dependency participates in compilation, so your code still resolves and can be compiled and packaged, but it is not bundled into the resulting jar. Why do this? Because flink-table-api-java-bridge, flink-streaming-scala and the like already ship with the Flink runtime; there is no need to pack them into your project's jar and bloat it for nothing.
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

Step 3: Packaging
To bundle the required (non-provided) dependencies into the jar, configure the build plugins in the project's pom.xml:
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.ms.flinksql.Kafka2Kafka</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Package the project with IDEA:
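Equivalently, you can package from the command line and submit the jar with the Flink CLI. A rough sketch (the jar file name below is an assumption based on a typical artifactId and version, not taken from the article):

```shell
# Build the fat jar; the assembly plugin appends "jar-with-dependencies"
mvn clean package

# Submit to the running cluster (run from the Flink distribution root);
# adjust the jar path to whatever your build actually produced
./bin/flink run -c com.ms.flinksql.Kafka2Kafka \
    target/flinksql-1.0-SNAPSHOT-jar-with-dependencies.jar
```

Submitting through the web UI (upload the jar on the Submit New Job page) achieves the same result.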
After submitting, the job shows up on the job management page:
Open Kafka, write data into the source topic, and the sink topic will emit output in real time:
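A quick way to test this end to end is with Kafka's console tools. A sketch, run from the Kafka installation directory (the sample record is hypothetical but matches the DDL schema; the exact timestamp format accepted for ts depends on the Flink/JSON-format version):

```shell
# Produce a test record to the source topic
./bin/kafka-console-producer.sh \
    --broker-list 192.168.126.128:9092 --topic user_behavior
# then type a JSON line such as:
# {"user_id": 1001, "item_id": 2001, "category_id": 42, "behavior": "buy", "ts": "2021-01-01T12:00:00Z"}

# In another terminal, watch the sink topic; a JSON record containing
# only user_id and item_id should appear shortly after
./bin/kafka-console-consumer.sh \
    --bootstrap-server 192.168.126.128:9092 --topic user_behavior_sink
```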