1、环境
- JDK 1.8
- Maven
- Intellij Idea
- Flink1.9.2
2、创建maven项目
导入依赖
注意:这里
Pom.xml中添加
8 8 1.9.2 2.12 org.apache.flink flink-java${flink.version} org.apache.flink flink-streaming-scala_${scala.binary.version}${flink.version} org.apache.flink flink-clients_${scala.binary.version}${flink.version}
使用socket接收数据,创建StreamWordCount_Socket.class,使用socket无界流处理
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
//import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class StreamWordCount_Socket {
//无界流--socket
public static void main(String[] args) throws Exception {
// 创建流处理执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度,默认值 = 当前计算机的CPU逻辑核数(设置成1即单线程处理)
// env.setMaxParallelism(32);
// 从文件中读取数据 有界流处理
//String inputPath = "D:\IdeaProjects\flink-study\src\main\resources\hello.txt";
DataStream inputDataStream = env.socketTextStream("localhost",9000);
// 基于数据流进行转换计算 流处理中数据分组keyBy(相同数据分组) 批处理中分组groupby
DataStream> resultStream = inputDataStream.flatMap(new MyFlatMapper())
.keyBy(item->item.f0)
.sum(1);
resultStream.print();
// 执行任务
env.execute();
}
public static class MyFlatMapper implements FlatMapFunction> {
@Override
public void flatMap(String s, Collector> out) throws Exception {
// 按空格分词
String[] words = s.split(" ");
// 遍历所有word,包成二元组输出
for (String str : words) {
out.collect(new Tuple2<>(str, 1));
}
}
}
}
端口号与Ubuntu端口号一致。
3、安装flink
官网:https://flink.apache.org/downloads.html
安装了1.9.2版本,1.9以上版本bin中无.Bat文件,我的笔记本上无法运行。
运行flink
浏览器进入http://localhost:8081/#/overview。
4、安装Ubuntu 使用nc命令发送socket
本机安装netcat nc命令执行不了,在Windows store中安装Ubuntu子系统,Linux系统中自带nc命令,发送socket。
5、运行效果:
打开Ubuntu输入nc -l -k 9000
9000是端口号,在StreamWordCount_Socket.class中对应输入端口号,idea中运行如下:
6、提交到flink仪表盘:
打包
启动flink
进入http://localhost:8081/#/overview
找到项目target中的jar包:
输入入口和host、port,点击提交
提交成功
如果在提交时报错可以打开flink log文件查看报错信息,
一开始提交显示服务器错误,查看log文件发现报错:
NoClassDefFoundError: org/apache/flink/client/program/StreamContextEnvironment
原因是pom文件中导入依赖版本和flink版本不同,在1.9.2中没有
StreamContextEnvironment.class
原本导入的是1.12.1,重新导入1.9.2的依赖,在1.9.2中使用的是StreamExecutionEnvironment.class
所以要注意不同版本 依赖中可能有不同的类。



