flink1.11(flink1.12.0)

This article uses Flink 1.14.3.

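This article explains how to integrate the Flink and Spring Boot frameworks. The integration lets a Flink application plug into the Spring Cloud ecosystem (distributed configuration center, service registration and discovery, load balancing, and so on) and lets you submit Flink jobs through RESTful endpoints.

Building the local project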

Software environment:

Flink 1.14.3
Spring Boot 2.0.3.RELEASE
JDK 11



    
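<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>awesome-flink</artifactId>
        <groupId>org.test</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flink-springboot</artifactId>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <spring.boot.version>2.0.3.RELEASE</spring.boot.version>
        <flink.version>1.14.3</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>${spring.boot.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>${spring.boot.version}</version>
        </dependency>
    </dependencies>
</project>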

Take the SocketWindowWordCount program as an example:

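import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.springframework.stereotype.Component;

// Add the @Component annotation so Spring manages this class as a bean
@Component
public class SocketWindowWordCount {

    public void wordCount(String hostname, int port, long windowSize) throws Exception {
        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket; records are delimited by newlines
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts =
                text.flatMap(
                        (FlatMapFunction<String, WordWithCount>)
                                (value, out) -> {
                                    for (String word : value.split("\\s")) {
                                        out.collect(new WordWithCount(word, 1L));
                                    }
                                },
                        Types.POJO(WordWithCount.class))
                        .keyBy(value -> value.word)
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(windowSize)))
                        .reduce((a, b) -> new WordWithCount(a.word, a.count + b.count))
                        .returns(WordWithCount.class);

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }
}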

// Result POJO

public class WordWithCount {

    public String word;
    public long count;

    @SuppressWarnings("unused")
    public WordWithCount() {}

    public WordWithCount(String word, long count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return word + " : " + count;
    }
}

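Because SocketWindowWordCount is a long-running streaming job, you can implement the ApplicationRunner interface provided by Spring Boot so that the job is built and submitted to the Flink cluster when the Spring Boot application starts.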

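import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

@Component
public class Execution implements ApplicationRunner {

    @Value("${source.socket.host}")
    private String host;

    @Value("${source.socket.port}")
    private int port;

    @Value("${flink.window.size}")
    private long windowSize;

    @Autowired
    private SocketWindowWordCount wordCount;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Start the job
        wordCount.wordCount(host, port, windowSize);
    }
}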

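Next, write the startup class:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class FlinkApplication {
    public static void main(String[] args) {
        SpringApplication.run(FlinkApplication.class, args);
    }
}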

The corresponding configuration file, application.yml, is:

server:
  port: 7605
  connection-timeout: 300s
  servlet:
    context-path: /awesome/flink/springboot
spring:
  application:
    name: flink-springboot

source:
  socket:
    host: localhost
    port: 9000
flink:
  window:
    size: 10

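Next, install netcat on Windows:

Download netcat and unzip it, then copy nc.exe to the C:\Windows\System32 directory.

Open cmd and run the following command, which will be used to send messages: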

nc -l -p 9000
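
Where netcat is not available, a small Java program can play the same role as the listener above. The following is only a sketch using JDK classes: the names LineServer and serveLines are hypothetical, and the default port 9000 simply mirrors the value used in this article. It accepts a single client and forwards every line typed on standard input, each terminated by a newline, which is the delimiter the Flink job reads by.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for "nc -l -p 9000": serves one client, then exits.
final class LineServer {
    private LineServer() {}

    /** Accepts one client and forwards every line read from input, terminated by '\n'. */
    static void serveLines(ServerSocket server, BufferedReader input) throws IOException {
        try (Socket client = server.accept();
             Writer out = new BufferedWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = input.readLine()) != null) {
                out.write(line);
                out.write('\n');
                out.flush(); // send each line immediately so the job sees it right away
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Port 9000 is the value used in application.yml; pass another port as the first argument.
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9000;
        try (ServerSocket server = new ServerSocket(port);
             BufferedReader stdin = new BufferedReader(
                     new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            System.out.println("Listening on port " + port + ", waiting for a client...");
            serveLines(server, stdin);
        }
    }
}
```

As with the nc command, start this listener before the Spring Boot project so that the socket source has something to connect to.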

Then run the main method of FlinkApplication to start the Spring Boot project:

Be sure to run nc -l -p 9000 before starting the Spring Boot project, because the project connects to the socket on port 9000 as soon as it starts.


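The startup log shows that, once Tomcat is up, Flink's embedded Mini Cluster starts right after it. The Mini Cluster also has its own web UI, whose Web URL appears in the startup log.

Open that page in a browser:

The page shows that the SocketWindowWordCount job has been submitted to the Flink cluster successfully.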
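
The same check can be made without a browser, since Flink serves its REST API on the same address as the web UI and GET /jobs/overview lists the jobs with their state. The following is a minimal sketch using only JDK classes. The class name FlinkJobsOverview is hypothetical, and the default base URL http://localhost:8081 is an assumption: substitute the Web URL printed in your own startup log.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: fetches the raw JSON returned by Flink's /jobs/overview endpoint.
final class FlinkJobsOverview {
    private FlinkJobsOverview() {}

    /** Builds the /jobs/overview URL from the web UI base URL, tolerating a trailing slash. */
    static String overviewUrl(String webUiBase) {
        String base = webUiBase.endsWith("/")
                ? webUiBase.substring(0, webUiBase.length() - 1)
                : webUiBase;
        return base + "/jobs/overview";
    }

    /** Performs a GET request and returns the response body as a UTF-8 string. */
    static String fetch(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(3000);
        conn.setReadTimeout(3000);
        try (InputStream in = conn.getInputStream()) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int read;
            while ((read = in.read(chunk)) != -1) {
                buffer.write(chunk, 0, read);
            }
            return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed default; pass the Web URL from the startup log as the first argument.
        String base = args.length > 0 ? args[0] : "http://localhost:8081";
        System.out.println(fetch(overviewUrl(base)));
    }
}
```

A running job should appear in the returned JSON with its name, here "Socket Window WordCount", and its current state.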

Next, type the following lines one after another in the nc session started above:

Hello
Hello
World

The console prints:

Hello : 2
World : 1
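
This output assumes all three lines arrive within the same 10-second tumbling window. The plain-Java sketch below, which does not use Flink, illustrates the two ideas involved: a timestamp is assigned to the window whose start is the nearest lower multiple of the window size, and the words inside one window are split on whitespace and summed per key. The names WindowCountSketch, windowStart, and countWords are hypothetical, and windowStart is simplified to non-negative timestamps with no window offset.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free illustration of tumbling-window word counting.
final class WindowCountSketch {
    private WindowCountSketch() {}

    /** Start of the tumbling window containing timestampMs (non-negative, zero offset). */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /** Splits each line on whitespace, as the job does, and sums the counts per word. */
    static Map<String, Long> countWords(List<String> linesInWindow) {
        Map<String, Long> counts = new LinkedHashMap<>(); // keeps first-seen order
        for (String line : linesInWindow) {
            for (String word : line.split("\\s")) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two timestamps 10 ms apart can still land in different 10-second windows.
        System.out.println(windowStart(9_995L, 10_000L));  // 0
        System.out.println(windowStart(10_005L, 10_000L)); // 10000

        // All three lines in one window produce the output shown in this article.
        countWords(Arrays.asList("Hello", "Hello", "World"))
                .forEach((word, count) -> System.out.println(word + " : " + count));
    }
}
```

If the two Hello lines straddle a window boundary, the job prints Hello : 1 twice instead of Hello : 2. The sketch prints words in first-seen order, whereas Flink does not guarantee the order of different keys within a window.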

Submitting to a remote cluster

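Next, try submitting the application to a remote Flink cluster.

If you are not familiar with setting up a Flink cluster, see one of my earlier articles:

Flink Getting Started Series: Installation, Deployment, and Job Submission (version 1.14.3)

Here we set up a single-node Flink Session cluster.

To submit the local project to a remote cluster, either place the Spring Boot dependencies on the Flink cluster's classpath, or use Maven's shade plugin to package the project code and its dependencies into a single fat jar. This article takes the second approach and adds a build configuration to pom.xml: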


	
		
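<build>
	<plugins>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-shade-plugin</artifactId>
			<dependencies>
				<dependency>
					<groupId>org.springframework.boot</groupId>
					<artifactId>spring-boot-maven-plugin</artifactId>
					<version>2.0.3.RELEASE</version>
				</dependency>
			</dependencies>
			<configuration>
				<finalName>socket-word-count-jar-with-dependencies</finalName>
				<keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
				<createDependencyReducedPom>true</createDependencyReducedPom>
				<filters>
					<filter>
						<artifact>*:*</artifact>
						<excludes>
							<exclude>META-INF/*.SF</exclude>
							<exclude>META-INF/*.DSA</exclude>
							<exclude>META-INF/*.RSA</exclude>
						</excludes>
					</filter>
				</filters>
				<artifactSet>
					<excludes>
						<exclude>com.google.code.findbugs:jsr305</exclude>
						<exclude>org.slf4j:*</exclude>
						<exclude>log4j:*</exclude>
					</excludes>
				</artifactSet>
			</configuration>
			<executions>
				<execution>
					<phase>package</phase>
					<goals>
						<goal>shade</goal>
					</goals>
					<configuration>
						<!-- Transformer classes follow the shade setup in spring-boot-starter-parent -->
						<transformers>
							<transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
								<resource>META-INF/spring.factories</resource>
							</transformer>
							<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
								<resource>META-INF/spring.handlers</resource>
							</transformer>
							<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
								<resource>META-INF/spring.schemas</resource>
							</transformer>
							<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
								<resource>META-INF/spring.tooling</resource>
							</transformer>
							<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
							<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
								<mainClass>org.test.awesome.flink.springboot.FlinkApplication</mainClass>
							</transformer>
						</transformers>
					</configuration>
				</execution>
			</executions>
		</plugin>
	</plugins>
</build>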

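Run install, then upload the fat jar to the server that hosts the Flink cluster.

Before starting the job, open nc on the server. The nc command on Linux differs slightly from the one on Windows.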

nc -lk 9000

Then run the following command to submit the job:

# /usr/lib/test/flink-1.14.3/ is the Flink installation directory
# /usr/lib/test/apps/socket-word-count-jar-with-dependencies.jar is the uploaded fat jar
/usr/lib/test/flink-1.14.3/bin/flink run /usr/lib/test/apps/socket-word-count-jar-with-dependencies.jar

The submission fails with the following error:

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Either remove Logback or the competing implementation (class org.apache.logging.slf4j.Log4jLoggerFactory loaded from file:/usr/lib/ruanshubin/flink-1.14.3/lib/log4j-slf4j-impl-2.17.1.jar). If you are using WebLogic you will need to add 'org.slf4j' to prefer-application-packages in WEB-INF/weblogic.xml: org.apache.logging.slf4j.Log4jLoggerFactory
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.IllegalArgumentException: LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Either remove Logback or the competing implementation (class org.apache.logging.slf4j.Log4jLoggerFactory loaded from file:/usr/lib/ruanshubin/flink-1.14.3/lib/log4j-slf4j-impl-2.17.1.jar). If you are using WebLogic you will need to add 'org.slf4j' to prefer-application-packages in WEB-INF/weblogic.xml: org.apache.logging.slf4j.Log4jLoggerFactory
        at org.springframework.util.Assert.instanceCheckFailed(Assert.java:637)
        at org.springframework.util.Assert.isInstanceOf(Assert.java:537)
        at org.springframework.boot.logging.logback.LogbackLoggingSystem.getLoggerContext(LogbackLoggingSystem.java:274)
        at org.springframework.boot.logging.logback.LogbackLoggingSystem.beforeInitialize(LogbackLoggingSystem.java:99)
        at org.springframework.boot.context.logging.LoggingApplicationListener.onApplicationStartingEvent(LoggingApplicationListener.java:191)
        at org.springframework.boot.context.logging.LoggingApplicationListener.onApplicationEvent(LoggingApplicationListener.java:170)
        at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
        at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
        at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139)
        at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:127)
        at org.springframework.boot.context.event.EventPublishingRunListener.starting(EventPublishingRunListener.java:68)
        at org.springframework.boot.SpringApplicationRunListeners.starting(SpringApplicationRunListeners.java:48)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:313)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1255)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1243)
        at com.ruanshubin.awesome.flink.springboot.FlinkApplication.main(FlinkApplication.java:14)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        ... 8 more

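Clearly, the Logback implementation bundled with Spring Boot conflicts with Flink's default logging implementation. Resolve the dependency conflict in pom.xml by excluding spring-boot-starter-logging, then rebuild and upload the jar again: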


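<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter</artifactId>
	<version>${spring.boot.version}</version>
	<exclusions>
		<exclusion>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-logging</artifactId>
		</exclusion>
	</exclusions>
</dependency>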


This time, the job is submitted successfully.
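
When a logging conflict like the one above shows up, it can help to check which logging classes are actually visible to the application. The following is a rough diagnostic sketch, not part of the original project: the class name LoggingBindingProbe is hypothetical, and the two class names it probes are the Logback context class and the Log4j SLF4J factory named in the error message.

```java
// Hypothetical diagnostic: reports whether given logging classes can be found on the classpath.
final class LoggingBindingProbe {
    private LoggingBindingProbe() {}

    /** Returns true if the class can be located by this class's class loader, without initializing it. */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, LoggingBindingProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Logback's SLF4J context, pulled in by spring-boot-starter-logging.
        System.out.println("Logback present: "
                + isOnClasspath("ch.qos.logback.classic.LoggerContext"));
        // The Log4j 2 SLF4J factory named in the error message above.
        System.out.println("Log4j SLF4J factory present: "
                + isOnClasspath("org.apache.logging.slf4j.Log4jLoggerFactory"));
    }
}
```

If both lines report true in the environment where the job's main method runs, the two implementations are competing, which is the situation the exclusion above is meant to remove.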

In nc, enter the same words as before:

Hello
Hello
World


As you can see, the Flink job received and processed the three words here as well.

That concludes this article. Thanks for reading!
