The basic building blocks of a Flink program are streams and transformations (note that the DataSet used in Flink's DataSet API is internally also a stream).
1. Real-time requirement

Every 5 seconds, compute statistics over the most recent 10 seconds of window data.

2. Development environment setup

1. The official site recommends IDEA; IDEA bundles Java and Maven support by default, which makes it convenient to use.
2. This walkthrough uses Flink 1.12.

3. Real-time code

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(1);
        // DataSource operation
        DataStreamSource<String> sourceStream = env.socketTextStream("192.168.153.10", 6666);
        // Implement the flatMap operator with an anonymous inner class
        final SingleOutputStreamOperator<Tuple2<String, Integer>> flatMapStream =
                sourceStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                final String[] words = line.split(" ");
                for (String word : words) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        });
        // keyBy grouping
        final KeyedStream<Tuple2<String, Integer>, String> keyedStream = flatMapStream.keyBy(value -> value.f0);
        // Every 5 seconds, compute the most recent 10 seconds of window data
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> window =
                keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));
        // sum aggregation
        final SingleOutputStreamOperator<Tuple2<String, Integer>> result = window.sum(1);
        // Print the result
        result.print();
        // Execute the program
        env.execute("StreamWordCount");
    }
}
Developing the real-time job requires a connection to a cluster; for cluster setup, see Flink cluster deployment.
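The sliding window above assigns every record to two overlapping windows, since the window size (10s) is twice the slide (5s). A minimal plain-Java sketch of that assignment (no Flink dependency; `assignWindows` is a hypothetical helper mirroring the usual sliding-window start computation, not Flink's actual API):

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowDemo {
    // Which sliding windows does a record with the given timestamp fall into?
    // Returns [start, end) pairs in milliseconds, newest window first.
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window containing this timestamp
        long lastStart = timestamp - (timestamp % slide);
        // Walk backwards by the slide until windows no longer cover the timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[]{start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // A record arriving at t = 12s (size 10s, slide 5s)
        // falls into [10s, 20s) and [5s, 15s)
        for (long[] w : assignWindows(12_000, 10_000, 5_000)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

This is why each word count can appear in up to two consecutive 5-second outputs.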
4. Offline requirement

Count the occurrences of each word in the file content.

5. Offline code

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Create the data source
        final DataSource<String> source = env.fromElements("Flink flink sqoop hadoop", "flume hadoop MapReduce flink");
        // Transformation operations
        final FlatMapOperator<String, Tuple2<String, Integer>> flatMap = source.flatMap(new FlatMapClass());
        final AggregateOperator<Tuple2<String, Integer>> result = flatMap.groupBy(0).sum(1);
        // Print the result
        result.print();
    }

    private static class FlatMapClass implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
            final String[] words = line.split(" ");
            for (String word : words) {
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
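For the two sample lines above, the expected counts can be checked with a plain-Java sketch of the same split-and-count logic (no Flink dependency; `countWords` is a hypothetical helper for illustration only):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountCheck {
    // Count words the same way the Flink job does:
    // split each line on a single space, counting case-sensitively.
    static Map<String, Integer> countWords(String... lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same sample input as the batch job above
        Map<String, Integer> counts =
                countWords("Flink flink sqoop hadoop", "flume hadoop MapReduce flink");
        counts.forEach((w, c) -> System.out.println("(" + w + "," + c + ")"));
    }
}
```

Note that "Flink" and "flink" are counted separately: the grouping is case-sensitive, so only "flink" and "hadoop" reach a count of 2.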