This article uses Flink 1.14.3.
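This article explains how to integrate the Flink and Spring Boot frameworks. Once combined, a Flink application can plug into the Spring Cloud ecosystem (distributed configuration center, service registration and discovery, load balancing, and so on), and Flink jobs can be submitted through RESTful interfaces.
Building the local project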
Software environment:
Flink 1.14.3
Spring Boot 2.0.3.RELEASE
JDK 11
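<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>awesome-flink</artifactId>
        <groupId>org.test</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>flink-springboot</artifactId>
    <properties>
        <!-- Java 11 for both source and target (property names assumed) -->
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <spring.boot.version>2.0.3.RELEASE</spring.boot.version>
        <flink.version>1.14.3</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>${spring.boot.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>${spring.boot.version}</version>
        </dependency>
    </dependencies>
</project>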
Take the SocketWindowWordCount program as an example:
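import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.springframework.stereotype.Component;

// Register the job as a Spring bean with @Component
@Component
public class SocketWindowWordCount {
    public void wordCount(String hostname, int port, long windowSize) throws Exception {
        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // get input data by connecting to the socket; records are delimited by newlines
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");
        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts =
                text.flatMap(
                                (FlatMapFunction<String, WordWithCount>)
                                        (value, out) -> {
                                            // split each line on whitespace
                                            for (String word : value.split("\\s")) {
                                                out.collect(new WordWithCount(word, 1L));
                                            }
                                        },
                                Types.POJO(WordWithCount.class))
                        .keyBy(value -> value.word)
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(windowSize)))
                        .reduce((a, b) -> new WordWithCount(a.word, a.count + b.count))
                        .returns(WordWithCount.class);
        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);
        env.execute("Socket Window WordCount");
    }
}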
// Result POJO
public class WordWithCount {
    public String word;
    public long count;

    @SuppressWarnings("unused")
    public WordWithCount() {}

    public WordWithCount(String word, long count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return word + " : " + count;
    }
}
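To make the pipeline's logic concrete without starting Flink, here is a plain-Java sketch of what the job computes: each line is split on whitespace, and the words are counted per tumbling window. The class name TumblingWordCountSketch, the helper methods, and the sample timestamps are made up for illustration. The window start is taken as the timestamp minus the timestamp modulo the window size, which is how tumbling windows without an offset line up for non-negative timestamps. This is a simplified model, not Flink's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class TumblingWordCountSketch {

    /** Start of the tumbling window containing timestampMs (no offset, non-negative timestamps). */
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /** Adds the words of one line to the counts of the window the line falls into. */
    public static void add(
            Map<Long, Map<String, Long>> windows, long timestampMs, long windowSizeMs, String line) {
        Map<String, Long> counts =
                windows.computeIfAbsent(
                        windowStart(timestampMs, windowSizeMs), k -> new LinkedHashMap<>());
        for (String word : line.split("\\s")) {
            counts.merge(word, 1L, Long::sum);
        }
    }

    public static void main(String[] args) {
        long windowSizeMs = 10_000L; // a 10-second window, in milliseconds
        Map<Long, Map<String, Long>> windows = new TreeMap<>();
        add(windows, 1_000L, windowSizeMs, "Hello Hello World"); // falls into window [0, 10000)
        add(windows, 12_000L, windowSizeMs, "Hello"); // falls into window [10000, 20000)
        windows.forEach(
                (start, counts) ->
                        counts.forEach(
                                (word, count) ->
                                        System.out.println(start + " | " + word + " : " + count)));
    }
}
```

Running main prints `0 | Hello : 2`, `0 | World : 1`, and `10000 | Hello : 1`: words typed within the same window are summed, while a word arriving in a later window starts a fresh count.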
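Because SocketWindowWordCount is a long-running streaming job, it can be launched from a class that implements Spring Boot's ApplicationRunner interface, so that the job is built and submitted to the Flink cluster when the Spring Boot application starts.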
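import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

@Component
public class Execution implements ApplicationRunner {
    // values are read from application.yml
    @Value("${source.socket.host}")
    private String host;

    @Value("${source.socket.port}")
    private int port;

    @Value("${flink.window.size}")
    private long windowSize;

    @Autowired
    private SocketWindowWordCount wordCount;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // start the job
        wordCount.wordCount(host, port, windowSize);
    }
}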
Next, write the startup class:
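import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class FlinkApplication {
    public static void main(String[] args) {
        SpringApplication.run(FlinkApplication.class, args);
    }
}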
The corresponding configuration file, application.yml, is:
server:
  port: 7605
  connection-timeout: 300s
  servlet:
    context-path: /awesome/flink/springboot
spring:
  application:
    name: flink-springboot
source:
  socket:
    host: localhost
    port: 9000
flink:
  window:
    size: 10
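Next, install netcat on Windows:
Download netcat; after unzipping it, copy nc.exe to the C:\Windows\System32 directory.
Open cmd and run the following command, which will be used to send messages: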
nc -l -p 9000
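If netcat is not at hand, a few lines of Java can play the same role. The sketch below is a hypothetical stand-in, not part of the original project: the class name LineServerSketch and its serve method are made up for illustration. It listens on port 9000, waits for a single client (the Flink job), and forwards every line typed on standard input, terminated by a newline, much as nc would.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class LineServerSketch {

    /** Waits for one client on the server socket and sends it every line read from input. */
    public static void serve(ServerSocket server, BufferedReader input) throws IOException {
        try (Socket client = server.accept();
                Writer out =
                        new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8)) {
            String line;
            while ((line = input.readLine()) != null) {
                out.write(line);
                out.write('\n'); // newline-terminated, like text typed into nc
                out.flush(); // send each line immediately
            }
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket server = new ServerSocket(9000);
                BufferedReader stdin =
                        new BufferedReader(
                                new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            serve(server, stdin);
        }
    }
}
```

Start it in place of the nc command, then type lines into its console once the job has connected.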
Then run the main method of FlinkApplication to start the Spring Boot project:
Be sure to run nc -l -p 9000 before starting the Spring Boot project, because on startup the project first connects to the socket on port 9000.
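The startup log shows that, right after Tomcat starts, Flink's embedded Mini Cluster starts as well. The Mini Cluster also has its own web UI, whose URL can be found in the startup log.
Open that page in a browser:
You can see that the SocketWindowWordCount job has been submitted to the Flink cluster successfully.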
Next, type the following into the nc session started above:
Hello Hello World
The console prints:
Hello : 2
World : 1
Submitting to a remote cluster
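Next, let's try submitting the application to a remote Flink cluster.
If you are not familiar with setting up a Flink cluster, you can refer to an earlier article of mine:
Flink Getting Started Series: Installation, Deployment, and Job Submission (version 1.14.3)
Here, we set up a single-node Flink Session cluster:
To submit the local project to a remote cluster, either place the Spring Boot dependencies on the Flink cluster's classpath, or use Maven's shade plugin to package the project code and its dependencies into a single fat jar. This article takes the second approach and adds the following build configuration to pom.xml: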
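<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>2.0.3.RELEASE</version>
                </dependency>
            </dependencies>
            <configuration>
                <finalName>socket-word-count-jar-with-dependencies</finalName>
                <!-- names of the two boolean options as in Spring Boot's standard shade configuration (assumed) -->
                <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
                <createDependencyReducedPom>true</createDependencyReducedPom>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <artifactSet>
                    <excludes>
                        <exclude>com.google.code.findbugs:jsr305</exclude>
                        <exclude>org.slf4j:*</exclude>
                        <exclude>log4j:*</exclude>
                    </excludes>
                </artifactSet>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <!-- transformer classes as in Spring Boot's standard shade configuration (assumed) -->
                            <transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                <resource>META-INF/spring.factories</resource>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.handlers</resource>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.schemas</resource>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.tooling</resource>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>org.test.awesome.flink.springboot.FlinkApplication</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>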
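The entries for META-INF/spring.factories and its sibling files matter because several Spring jars each ship a file at the same path, while a fat jar can hold only one file per path. If the files simply overwrote each other, registrations from all but one jar would be lost, so they have to be merged. The plain-Java sketch below illustrates the idea for properties-style files by joining the values of a repeated key with commas. It is a simplified illustration rather than the shade plugin's actual code, and the class name FactoriesMergeSketch and the sample keys and values are made up.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class FactoriesMergeSketch {

    /** Merges the entries of next into merged, joining values of duplicate keys with a comma. */
    public static void mergeInto(Properties merged, Properties next) {
        for (String key : next.stringPropertyNames()) {
            String existing = merged.getProperty(key);
            String value = next.getProperty(key);
            merged.setProperty(key, existing == null ? value : existing + "," + value);
        }
    }

    /** Parses properties-file content held in a string. */
    public static Properties parse(String content) throws IOException {
        Properties properties = new Properties();
        properties.load(new StringReader(content));
        return properties;
    }

    public static void main(String[] args) throws IOException {
        Properties merged = new Properties();
        // the same key as it might appear in two different jars (hypothetical values)
        mergeInto(merged, parse("example.Listener=com.example.a.ListenerA\n"));
        mergeInto(merged, parse("example.Listener=com.example.b.ListenerB\n"));
        System.out.println(merged.getProperty("example.Listener"));
    }
}
```

Running main prints `com.example.a.ListenerA,com.example.b.ListenerB`, so both registrations survive in the single merged file.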
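Run Maven's install phase to build the fat jar, then upload the jar to the server where the Flink cluster runs.
Before starting the job, open nc on the server. The nc command on Linux differs slightly from the Windows one: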
nc -lk 9000
Then run the following command to submit the job:
# /usr/lib/test/flink-1.14.3/ is the Flink installation directory
# /usr/lib/test/apps/socket-word-count-jar-with-dependencies.jar is the uploaded fat jar
/usr/lib/test/flink-1.14.3/bin/flink run /usr/lib/test/apps/socket-word-count-jar-with-dependencies.jar
The submission fails with the following error:
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Either remove Logback or the competing implementation (class org.apache.logging.slf4j.Log4jLoggerFactory loaded from file:/usr/lib/ruanshubin/flink-1.14.3/lib/log4j-slf4j-impl-2.17.1.jar). If you are using WebLogic you will need to add 'org.slf4j' to prefer-application-packages in WEB-INF/weblogic.xml: org.apache.logging.slf4j.Log4jLoggerFactory
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.IllegalArgumentException: LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Either remove Logback or the competing implementation (class org.apache.logging.slf4j.Log4jLoggerFactory loaded from file:/usr/lib/ruanshubin/flink-1.14.3/lib/log4j-slf4j-impl-2.17.1.jar). If you are using WebLogic you will need to add 'org.slf4j' to prefer-application-packages in WEB-INF/weblogic.xml: org.apache.logging.slf4j.Log4jLoggerFactory
at org.springframework.util.Assert.instanceCheckFailed(Assert.java:637)
at org.springframework.util.Assert.isInstanceOf(Assert.java:537)
at org.springframework.boot.logging.logback.LogbackLoggingSystem.getLoggerContext(LogbackLoggingSystem.java:274)
at org.springframework.boot.logging.logback.LogbackLoggingSystem.beforeInitialize(LogbackLoggingSystem.java:99)
at org.springframework.boot.context.logging.LoggingApplicationListener.onApplicationStartingEvent(LoggingApplicationListener.java:191)
at org.springframework.boot.context.logging.LoggingApplicationListener.onApplicationEvent(LoggingApplicationListener.java:170)
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139)
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:127)
at org.springframework.boot.context.event.EventPublishingRunListener.starting(EventPublishingRunListener.java:68)
at org.springframework.boot.SpringApplicationRunListeners.starting(SpringApplicationRunListeners.java:48)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:313)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1255)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1243)
at com.ruanshubin.awesome.flink.springboot.FlinkApplication.main(FlinkApplication.java:14)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 8 more
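Clearly, the Logback that ships with Spring Boot conflicts with Flink's default logging implementation (the Log4j SLF4J binding named in the error message). Resolve the conflict in pom.xml by excluding spring-boot-starter-logging, then rebuild and upload the jar again: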
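<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <version>${spring.boot.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>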
This time, the job is submitted successfully.
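When chasing this kind of logging conflict, it can help to check which logging implementations are actually visible on the classpath. The sketch below is a small diagnostic under stated assumptions: the class name LoggingClasspathCheck and its isPresent helper are made up for illustration. The two class names it probes are Logback's ch.qos.logback.classic.LoggerContext and the org.apache.logging.slf4j.Log4jLoggerFactory mentioned in the error message. It only reports what the class loader that loaded it can see, which may differ from what a running Flink job sees.

```java
public class LoggingClasspathCheck {

    /** Returns true if the named class can be found on the classpath (without initializing it). */
    public static boolean isPresent(String className) {
        try {
            Class.forName(className, false, LoggingClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(
                "Logback present: " + isPresent("ch.qos.logback.classic.LoggerContext"));
        System.out.println(
                "Log4j SLF4J binding present: "
                        + isPresent("org.apache.logging.slf4j.Log4jLoggerFactory"));
    }
}
```

If both lines report true for the same classpath, two logging backends are competing for SLF4J, which is the situation the exception describes.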
In nc, type the same words again:
Hello Hello World
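As you can see, the Flink job on the remote cluster also received and processed the three words.
That's all for this article. Thanks for reading!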



