- 一、版本和开发工具
- 二、Flink 程序开发步骤
- 三、开发示例
- 四、运行
flink处理分流处理(streaming)和批处理(batch)。本示例来展示flink流处理开发的一般步骤和方法。 一、版本和开发工具
flink版本:1.13.3
开发工具:Intellij IDEA
Java版本:1.8.0_261
1、获得一个执行环境
2、加载或者创建初始化数据
3、指定操作数据的Transaction算子
4、指定计算好的数据的存放位置
5、调用execute()触发程序执行
1、创建项目,填写项目名称,存放路径、包名、版本号等
2、添加依赖,在本机注释掉scope,当发布到flink集群时,需要添加scope,因为集群已经存在这些依赖
8 8 UTF-8 1.13.3 2.12 org.apache.flink flink-java ${flink.version} org.apache.flink flink-streaming-java_${scala.version} ${flink.version}
3、开发需求:通过Socket模拟产生单词,实现每隔1s对最近2s内的数据进行汇总计算
4、java程序结构如下,代码下载地址:Flink开发示例源代码
5、核心代码如下
public static void main(String[] args) throws Exception {
// 获取socket端口号
int port;
try {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
port = parameterTool.getInt("port");
} catch (Exception e) {
System.out.println("没有设置端口号。使用默认端口号9109");
port = 9109;
}
// 获取Flink 运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 获取数据
String hostname = "node01";
String delimiter = "n";
DataStreamSource text = env.socketTextStream(hostname, port, delimiter);
// timeWindow 已过时,建议使用window
DataStream wordCounts = text.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String s, Collector collector) throws Exception {
String[] splits = s.split("\s");
for (String word : splits) {
collector.collect(new WordCount(word, 1L));
}
}
}).keyBy(new StreamWordCoutKeySelector())
.window(SlidingProcessingTimeWindows.of(Time.seconds(2),Time.seconds(1)))
.sum("count");
// 把数据打印到控制台并且设置并行度
wordCounts.print().setParallelism(1);
// 触发执行程序
env.execute("Socket window count");
}
四、运行
1、打开终端,输入命令:nc -l 9109
2、运行SocketWindowWorldCount,会出现异常,直接退出。如下图所示,查看日志有两处错误:
1)是由于依赖中缺少slf4j的依赖,添加依赖slf4j-simple
2)缺少flink执行环境的依赖,添加flink客户端依赖
org.apache.flink flink-clients_${scala.version} ${flink.version} org.slf4j slf4j-simple 1.7.32
3、再次运行,在终端输入单词,可以看到统计的单词个数



