- 1. Transformation operators
- 1.1 map
- 1.2 flatMap
- 1.3 Filter
- 2. Code
- References
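1. Transformation operators

1.1 map

map is a one-to-one operation: every element of the DataStream is transformed into exactly one output element.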
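Flink's `map` follows the same per-element contract as `map` on a plain `java.util.stream.Stream`. The sketch below illustrates that one-to-one contract using only the JDK, with two of the sample lines used later in this post. It is an analogy, not Flink's API; the class name `MapSketch` is hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-JDK analogy for Flink's map (no Flink classes involved):
// one input element -> exactly one output element.
class MapSketch {
    // Same logic as the MapFunction in this post: String -> its length.
    static List<Integer> lengths(List<String> lines) {
        return lines.stream()
                .map(String::length)          // one-to-one
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = List.of("sensor_1 1547718199 35.8", "sensor_10 1547718205 38.1");
        List<Integer> out = lengths(lines);
        System.out.println(out);                          // prints [24, 25]
        System.out.println(out.size() == lines.size());   // prints true
    }
}
```

The output always has as many elements as the input, which is exactly what distinguishes `map` from `flatMap`.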
```java
DataStream<Integer> mapStream = dataStream.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) throws Exception {
        // one input string -> one output: its length
        return value.length();
    }
});
```

1.2 flatMap
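flatMap takes one input and can produce multiple outputs. For example, it can split a string on the "," separator and emit each field as a separate element.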
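The one-to-many behaviour described above can be tried out with `Stream.flatMap` from the JDK, which has the same per-element shape as Flink's `flatMap`. This is a plain-Java analogy, not Flink code; `FlatMapSketch` is a hypothetical name. It also shows how the splitting treats the sample data in this post: a line without a comma yields one element (the whole line), and a field that follows ", " keeps its leading space.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-JDK analogy for Flink's flatMap: one input element -> zero or more outputs.
class FlatMapSketch {
    // Same logic as the FlatMapFunction in this post: split on "," and emit every field.
    static List<String> splitAll(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(",")))  // one-to-many
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = List.of("sensor_1 1547718199 35.8", "sensor_6 1547718201, 15.4");
        List<String> out = splitAll(lines);
        System.out.println(out.size());                 // prints 3 (2 input lines -> 3 outputs)
        out.forEach(f -> System.out.println("[" + f + "]"));
    }
}
```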
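```java
DataStream<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        // one input string -> one output per comma-separated field
        String[] fields = value.split(",");
        for (String field : fields) {
            out.collect(field);
        }
    }
});
```

1.3 Filter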
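filter can be thought of as the WHERE clause of a SQL statement: it filters the data, keeping only the elements for which the predicate returns true.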
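To illustrate the WHERE-clause analogy, here is a plain-JDK sketch (not Flink's API) that applies the same predicate used in this post, `startsWith("sensor_1")`, to the four sample lines. It highlights a detail worth knowing: that predicate also matches `sensor_10`, so it keeps two lines, not one. The stricter variant `idEqualsSensor1` is a hypothetical alternative that compares the first space- or comma-separated token exactly; the same expression could be returned from a Flink `FilterFunction`.

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-JDK analogy for Flink's filter: keep an element only if the predicate returns true.
class FilterSketch {
    // Same predicate as the FilterFunction in this post (prefix match).
    static List<String> startsWithSensor1(List<String> lines) {
        return lines.stream()
                .filter(line -> line.startsWith("sensor_1"))
                .collect(Collectors.toList());
    }

    // Hypothetical stricter predicate: the id token must equal "sensor_1" exactly.
    static List<String> idEqualsSensor1(List<String> lines) {
        return lines.stream()
                .filter(line -> line.split("[ ,]")[0].equals("sensor_1"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "sensor_1 1547718199 35.8",
                "sensor_6 1547718201, 15.4",
                "sensor_7 1547718202, 6.7",
                "sensor_10 1547718205 38.1");
        System.out.println(startsWithSensor1(lines).size()); // prints 2 (sensor_1 and sensor_10)
        System.out.println(idEqualsSensor1(lines).size());   // prints 1 (sensor_1 only)
    }
}
```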
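```java
DataStream<String> filterStream = dataStream.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String value) throws Exception {
        // keep only lines whose id starts with "sensor_1"
        return value.startsWith("sensor_1");
    }
});
```

2. Code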
Data preparation (`sensor.txt`):

```
sensor_1 1547718199 35.8
sensor_6 1547718201, 15.4
sensor_7 1547718202, 6.7
sensor_10 1547718205 38.1
```
Code:

```java
package org.flink.transform;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TransformTest1_base {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from a file (backslashes must be escaped in a Java string literal)
        DataStream<String> inputStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkStudy\\src\\main\\resources\\sensor.txt");

        // 1. map: convert each String into its length
        DataStream<Integer> mapStream = inputStream.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return value.length();
            }
        });

        // 2. flatMap: split each line into fields on commas
        DataStream<String> flatMapStream = inputStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] fields = value.split(",");
                for (String field : fields) {
                    out.collect(field);
                }
            }
        });

        // 3. filter: keep records whose id starts with "sensor_1" (this also matches "sensor_10")
        DataStream<String> filterStream = inputStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("sensor_1");
            }
        });

        // Print the results
        mapStream.print("map");
        flatMapStream.print("flatMap");
        filterStream.print("filter");

        env.execute();
    }
}
```
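Output:

Flink processes data as a stream, handling each record as soon as it arrives instead of waiting for a batch. With parallelism 1, every line read from the file is passed to all three operators, and each operator prints its result as soon as it has processed that line, so the map, flatMap and filter outputs appear interleaved record by record.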
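The record-at-a-time behaviour described above can be imitated in plain Java: each line is pushed through all three transformations and the results are "printed" before the next line is read. This is a sketch under assumptions, not Flink's runtime: the fixed map, flatMap, filter order per line is chosen by the sketch (Flink does not promise it), the `map> ` style prefix imitates what `print("map")` adds at parallelism 1, and `RecordAtATimeSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK imitation of record-at-a-time processing over the three operators.
class RecordAtATimeSketch {
    static List<String> run(List<String> lines) {
        List<String> printed = new ArrayList<>();
        for (String line : lines) {                      // one record arrives
            printed.add("map> " + line.length());        // map: exactly one output
            for (String field : line.split(",")) {       // flatMap: one output per field
                printed.add("flatMap> " + field);
            }
            if (line.startsWith("sensor_1")) {           // filter: zero or one output
                printed.add("filter> " + line);
            }
        }
        return printed;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("sensor_1 1547718199 35.8", "sensor_6 1547718201, 15.4");
        run(lines).forEach(System.out::println);
    }
}
```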
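If I then change the parallelism to 2 (`env.setParallelism(2)`) and run the job again, the output looks different: each printed line now carries the number of the subtask that produced it (for example `map:1>` or `map:2>` instead of `map>`), and the order of the lines is no longer deterministic.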
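The prefix change described above can be summarized in a few lines. The following is a hypothetical re-implementation based on our reading of how Flink's print sink builds its prefix (identifier, then `:` and the 1-based subtask number only when parallelism is greater than 1, then `> `); it is not Flink code, and `PrintPrefixSketch` is an illustrative name.

```java
// Hypothetical re-implementation of the prefix that print(sinkIdentifier) puts before each record.
class PrintPrefixSketch {
    static String prefix(String sinkIdentifier, int subtaskIndex, int parallelism) {
        String p = sinkIdentifier;
        if (parallelism > 1) {
            if (!p.isEmpty()) {
                p += ":";
            }
            p += (subtaskIndex + 1);   // subtask indices are 0-based, the printed number is 1-based
        }
        if (!p.isEmpty()) {
            p += "> ";
        }
        return p;
    }

    public static void main(String[] args) {
        System.out.println(prefix("map", 0, 1) + "24");  // prints map> 24
        System.out.println(prefix("map", 1, 2) + "24");  // prints map:2> 24
        System.out.println(prefix("", 0, 2) + "24");     // prints 1> 24
    }
}
```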
References:

- https://www.bilibili.com/video/BV1qy4y1q728
- https://ashiamd.github.io/docsify-notes/#/study/BigData/Flink/%E5%B0%9A%E7%A1%85%E8%B0%B7Flink%E5%85%A5%E9%97%A8%E5%88%B0%E5%AE%9E%E6%88%98-%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0?id=_521-%e4%bb%8e%e9%9b%86%e5%90%88%e8%af%bb%e5%8f%96%e6%95%b0%e6%8d%ae



