栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink 算子大全

flink 算子大全

算子大全

摘要1.map2.flatMap3.filter4.keyBy5.reduce6.window 和aggregate聚合函数7.windowAll8.window 的apply8.window reduce

摘要

首先不得不提一点,每一个算子都是有状态的,算子的状态也是flink能够从错误中恢复的基础. 算子的执行状态称为状态后端,状态是可以被程序访问,甚至我们可以自己及写代码访问状态.比如广播就利用了这个特性,首先将流广播出去,然后通过状态句柄去访问广播出去的流.
可以说理解算子状态是学习flink的核心. 状态的存储见我的其他的文章.多说一句, flink运行过程中真正有意义的数据就是状态数据,状态数据就是中间结果. 每个算子operation 计算的中间结果就是状态. 本章只讲解常见的算子operation并不讲解状态,这里之所以说出来是为了提醒读者注意了解flink的状态的意义.

1.map

map是对流中的每个T类型元素做处理之后返回新的类型为R元素,然后将R元素组成的流作为新的流往后流动.
下面是scala版本map函数的定义

Creates a new DataStream by applying the given function to every element of this DataStream.(翻译:通过对传入方法中的每个元素做处理,然后返回一个新的流)
def map[R: TypeInformation](fun: T => R): DataStream[R] {...省略详细代码...}

下面说map函数:看上面函数参数的定义fun: T => R,意思是该函数的参数是一个用户传入函数,该函数的参数类型为流中类型为T的元素,经过处理之后返回一个类型为R的元素
例子:ds..map(x=>{
x+1
})

下面是java版本map函数的源码定义:

public  SingleOutputStreamOperator map(MapFunction mapper) {......}

该函数的参数是一个MapFunction mapper 接口的实现,点开该接口源码如下:
@Public
@FunctionalInterface
public interface MapFunction extends Function, Serializable {

    /**
     * The mapping method. Takes an element from the input data set and transforms it into exactly
     * one element.
     *
     * @param value The input value.
     * @return The transformed value
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    O map(T value) throws Exception;
}
java中被@FunctionalInterface注解修饰的接口且该接口只有一个抽象方法,那么表示该接口符合lambda表达式的定义,因此可以简化写成lambda的样式:
下面是例子:
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
                .keyBy(value -> value.f0)
                .map( value -> value.getField(0))
                .print();
2.flatMap

传入一个元素,根据当前传入的单个元素可能会生成一个或者一个以上的元素

java版本
dataStream.flatMap(new FlatMapFunction() {
    @Override
    public void flatMap(String value, Collector out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});

scala版本
dataStream.flatMap { str => str.split(" ") }
3.filter

用自定义的逻辑检测一个元素,如果希望这个元素向下流动就返回true,如果洗碗粉抛弃掉该元素就返回false:

java版本
dataStream.filter(new FilterFunction() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});
scala版本
dataStream.filter { _ != 0 }
4.keyBy

逻辑上将流划分为不相连的分区,具有相同key的流元素都被分配到相同的分区。不同分区的数据会交给不同的task去执行,底层其实使用了hash分区的方式.
既然是根据hash分区,因此如果key的选择是一个对象且这个对象没有实现自己的hashcode方法,那么个的对象是不能作为key的. 另外任何Array也不能作为key
源码定义:

  public  KeyedStream keyBy(KeySelector key) {}

由此可见keyBy方法接收一个KeySelector 的实现类,下面是KeySelector接口的定义

@Public
@FunctionalInterface
public interface KeySelector extends Function, Serializable {
    KEY getKey(IN value) throws Exception;
}

例子:

java版本
  dataStream.keyBy(new KeySelector, String>() {
                    @Override
                    public String getKey(Tuple2 value) throws Exception {
                        return value.getField(0);
                    }
                })
scala 版本
1. ds.keyBy(new KeySelector[(Long,Long),Long] {
    override def getKey(value: (Long, Long)): Long = value._1
  })
scala 最简单的写法如下:
2. ds.keyBy(x=>x._1) 这种写法和上面一样,但是不推荐了,scala版本方法定义明说了推荐方式:  
@deprecated("use [[DataStream.keyBy(KeySelector)]] instead")
 def keyBy(fields: Int*): KeyedStream[T, JavaTuple] = asScalaStream(stream.keyBy(fields: _*))
意思即是推荐scala的第一种写法.


5.reduce

对keyBy处理后的数据流做“滚动”操作。将当前元素与最近的做操作,并发出操作后的新值。新的值与下一个值进行同样的操作,然后发出新的值,依次往后计算,直到最后形成的新的数据流是由发出的新的值组成的. 注意reduce只能用于keyBy 之后的数据流. 对于keyBy数据流,相同的key会交给一个线程处理. 所以如果keyBy数据流有多个key, 那么对于reduce而言会有多个不同的线程去独立处理, 处理的结果是根据key独立的. 下面看scala版本的例子:

object Test{

  def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.fromCollection(List(
        (1L, 3L),
        (1L, 5L),
        (1L,1L),
        (1L,1L),
        (4L, 7L),
        (4L, 3L),
        (1L, 2L)
      )).keyBy(fun = new KeySelector[(Long, Long), Long] {
        override def getKey(value: (Long, Long)): Long = value._1
      }).reduce(new ReduceFunction[(Long, Long)] {
        override def reduce(value1: (Long, Long), value2: (Long, Long)): (Long, Long) = (value1._1,value1._2+value2._2)
      }).setParallelism(1).writeAsText("D:\flink\a.txt")
      // the printed output will be (1,4) and (1,5)

      env.execute("ExampleKeyedState")
    }

}
输出结果如下:
(1,3)
(1,8)
(1,9)
(1,10)
(1,12)
(4,7)
(4,10)

可以看到结果中key为1  key为2是并行存在的两个独立的结果

下面是reduce函数源码请自己看注释:

/**
   * Creates a new [[DataStream]] by reducing the elements of this DataStream
   * using an associative reduce function. An independent aggregate is kept per key.注意: kept per key这三个单词
   */
  def reduce(reducer: ReduceFunction[T]): DataStream[T] = {...}
  下面看看ReduceFunction接口源码:
 @Public
@FunctionalInterface
public interface ReduceFunction extends Function, Serializable {

    /**
     * The core method of ReduceFunction, combining two values into one value of the same type. The
     * reduce function is consecutively applied to all values of a group until only a single value
     * remains.
     *
     * @param value1 The first value to combine.
     * @param value2 The second value to combine.
     * @return The combined value of both input values.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    T reduce(T value1, T value2) throws Exception;
}

6.window 和aggregate聚合函数

window :注意此函数只用于处理keyBy处理后的键值流数据,应用于窗口函数,每个窗口做一次计算,窗口的计算结果是独立的: 换句话说,window函数后面函数执行逻辑是基于key 独立计算的. 也即是窗口在不同的key上独立计算.
aggregate: aggregate函数用于处理当前window的数据,他有三个方法:

    createAccumulator() 用于初始化累加器(用return返回你创建的累加器)ACC add(IN value, ACC accumulator) value是window窗口的下一条数据,accumulator是你在第一个方法创建的累加器, add方法会返回一个新的累加器,格式和第一个方法创建的累加器要保证一样OUT getResult(ACC accumulator); 这个方法在window窗口触发的时候执行,用于从累加器中获取结果.
    关于aggregate请详细看下面的demo:MyAgg
object StreamingJob {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
//    val env = StreamExecutionEnvironment.createRemoteEnvironment("LOCALHOST",8081,"D:\IT\Project\FlinkDemo\target\FlinkDemo-1.0-SNAPSHOT.jar")
    val text = env.socketTextStream("localhost", 9999)

    val counts = text.flatMap { _.toLowerCase.split("\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum(1)

    counts.print()

    env.execute("Window 333 WordCount")
  }
}

为了理解窗口基于key独立计算的逻辑,下面在看一个java版本的代码:

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 测试AggFunction——求各个班级英语成绩平均分,下面是一个基于元素数量计算的窗口,当窗口检测到两个元素到来的时候就会触发计算.CountTrigger.of(2)意思就是当前key对应的窗口
 * 每检测到两个元素就会触发计算
 * 
 *
 */
public class TestAggFunctionOnWindow {
    private static final Logger logger = LoggerFactory.getLogger(TestAggFunctionOnWindow.class);
    public static void main(String[] args) throws Exception {
        // 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 读取数据
        DataStream> input = env.fromElements(ENGLISH);

        // 求各个班级英语成绩平均分
        SingleOutputStreamOperator> ds = input.keyBy(0).window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(2))).aggregate(new MyAgg());
//        ds.print();
        ds.addSink(new PrintSinkFunction<>("这是我的自定义输出:", false));
        env.execute("TestAggFunctionOnWindow");

    }

    public static final Tuple3[] ENGLISH = new Tuple3[] {
            Tuple3.of("一班", "张三", 1L),
            Tuple3.of("一班", "李四", 2L),
            Tuple3.of("一班", "王五", 3L),
            Tuple3.of("二班", "赵六", 4L),
            Tuple3.of("二班", "小七", 5L),
            Tuple3.of("二班", "小八", 6L),
    };

}

class MyAgg implements AggregateFunction, Tuple3, Tuple2> {
    /**
     * 创建累加器保存中间状态
     * Tupel<班级名称,总分数,总人数>
     */
    @Override
    public Tuple3 createAccumulator() {
        return new Tuple3<>("",0L, 0L);
    }

    /**
     * 将元素添加到累加器并返回新的累加器
     *
     * @param value 输入类型
     * @param acc 累加器ACC类型
     *
     * @return 返回新的累加器
     */
    @Override
    public Tuple3 add(Tuple3 value, Tuple3 acc) {
        //acc.f0 班级名称
        //acc.f1 总分数
        //acc.f2 总人数
        //value.f0 表示班级 value.f1 表示姓名 value.f2 表示分数

        return new Tuple3(value.f0,acc.f1 + value.f2, acc.f2 + 1L);
    }


    @Override
    public Tuple2 getResult(Tuple3 acc) {
        return new Tuple2<>(acc.f0,((double) acc.f1) / acc.f2);
    }


    @Override
    public Tuple3 merge(Tuple3 acc1, Tuple3 acc2) {
        System.out.println("这个函数不会被执行,只有sessoin窗口函数才会被触发,请忽略此方法");
        return new Tuple3<>("",1L,1L);
    }
结果如下:
这是我的自定义输出::4> (一班,1.5)
这是我的自定义输出::2> (二班,4.5)

结果分析:
看到没有keyBy 依据班级分成了两个分区,  window函数后面的计算逻辑在分区之间是独立计算的. 过程如下:
第一个分区检测到:Tuple3.of("一班", "张三", 1L),
              Tuple3.of("一班", "李四", 2L),
              因为窗口数量为2就会触发索引结果为:(一班,1.5)
第二个分区检测到:Tuple3.of("二班", "赵六", 4L),
              Tuple3.of("二班", "小七", 5L),
              同理触发窗口计算结果为:(二班,4.5)
有人可能会注意到我在打印结果的时候没有用:ds.print()
而是用了:ds.addSink(new PrintSinkFunction<>("这是我的自定义输出:", false));
如果你看到了这里请点挂机print()看源码就会看到:
    @PublicEvolving
    public DataStreamSink print() {
        PrintSinkFunction printFunction = new PrintSinkFunction<>();
        return addSink(printFunction).name("Print to Std. Out");
    }
所以说print()方法,底层调用的还是addSink,上面代码用了new PrintSinkFunction<>(); 通过看源码你会看到:打印输出我们可以自定义前缀的,这样方便我们调试.
              

7.windowAll

在keyby后数据跟据指定的key被切. 相同的key会被分配到同一个窗口任务中(可理解为独立线程), window后面的清洗逻辑是在独立线程中分别运行的
而调用windowAll之前不需要调用keyBy函数,windowall则把所有的key都聚合起来所以windowall的并行度只能为1,而window可以有多个并行度。
上面说的东西非常重要,如果看不懂的话请停下来.

8.window 的apply

先看源码:

 public  SingleOutputStreamOperator apply(WindowFunction function) {
        TypeInformation resultType = getWindowFunctionReturnType(function, getInputType());

        return apply(function, resultType);
    }
    下面是WindowFunction 接口:
    
/**
 * base interface for functions that are evaluated over keyed (grouped) windows.
 *
 * @param  The type of the input value.  //流数据元素类型
 * @param  The type of the output value.//处理完后输出元素的类型
 * @param  The type of the key.//key 的类型
 * @param  The type of {@code Window} that this window function can be applied on.//window 的类型, 因为window有很多实现类
 */
@Public
public interface WindowFunction extends Function, Serializable {

    /**
     * evaluates the window and outputs none or several elements.
     *
     * @param key The key for which this window is evaluated.
     * @param window The window that is being evaluated.
     * @param input The elements in the window being evaluated.
     * @param out A collector for emitting elements.
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    void apply(KEY key, W window, Iterable input, Collector out) throws Exception;
}

apply用于在keyBy, window之后,用于对分区之后的每个key对应的独立处理线程中的每个元素做处理.

下面是一个demo,用于对每个window窗口中:

    数据中班级拼接一个班级.人数乘以十.

apply什么时候执行?

执行的时候应当是窗口被触发运算的时候

代码:

package com.pg.flink;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class WindowApply {
    private static final Logger logger = LoggerFactory.getLogger(TestAggFunctionOnWindow.class);
    public static void main(String[] args) throws Exception {
        logger.info("程序开始运行....");
        // 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 读取数据
        DataStream> input = env.fromElements(ENGLISH);

        // 求各个班级英语成绩平均分
        //KeyedStream, String> keyedStreams= input.keyBy((KeySelector, String>) value -> value.f0);
        KeyedStream, String> keyedStreams= input.keyBy(new MyKeySelector());
        WindowedStream, String, GlobalWindow> ws =  keyedStreams.countWindow(2);
        SingleOutputStreamOperator> ds =  ws.apply(new MyWindowFunction());
        ds.addSink(new PrintSinkFunction<>("这是我的自定义输出:", false));
        env.execute("TestAggFunctionOnWindow");

    }

    public static final Tuple3[] ENGLISH = new Tuple3[] {
            Tuple3.of("一班", "张三", 1L),
            Tuple3.of("一班", "李四", 2L),
            Tuple3.of("一班", "王五", 3L),
            Tuple3.of("二班", "赵六", 4L),
            Tuple3.of("二班", "小七", 5L),
            Tuple3.of("二班", "小八", 6L),
    };
    public static class MyWindowFunction implements WindowFunction, Tuple3, String, GlobalWindow>{

        @Override
        public void apply(String s, GlobalWindow window, Iterable> input, Collector> out) throws Exception {
            for (Tuple3 e: input) {
                out.collect(new Tuple3<>(e.f0+s,e.f1,e.f2*10));
            }
        }
    }
    public static class MyKeySelector implements KeySelector, String>{

        @Override
        public String getKey(Tuple3 value) throws Exception {
            return value.f0;
        }
    }
}

上面代码构造了一个计数窗口基于班级名称做分区,下面数据就两个班级,因此keyBy之后会分成两个独立的窗口处理线程, 二者独立运行. 窗口触发的条件是当前窗口有两个数据的时候.
当窗口触发之后apply用于处理当前窗口的数据. 代码中我们每个班级有三条数据,而窗口的触发是:当窗口遇到两条数据的时候被触发.
代码中keyBy基于班级名称做分流,于是(基于下面的数据)会产生两个独立的窗口处理线程

        窗口处理线程一:
        Tuple3.of("一班", "张三", 1L),
        Tuple3.of("一班", "李四", 2L),
        Tuple3.of("一班", "王五", 3L),
        当窗口触发计算的时候(检测到两条数据):调用apply
        Tuple3.of("一班", "张三", 1L),
        Tuple3.of("一班", "李四", 2L),
        变成:
        Tuple3.of("一班一班", "张三", 10L),
        Tuple3.of("一班一班", "李四", 20L),
        而Tuple3.of("一班", "王五", 3L)被抛弃
   
   窗口处理线程一二:
        Tuple3.of("二班", "赵六", 4L),
        Tuple3.of("二班", "小七", 5L),
        Tuple3.of("二班", "小八", 6L),
    当窗口触发计算的时候(检测到两条数据):调用apply
    同理结果为:
        Tuple3.of("二班二班", "赵六", 40L),
        Tuple3.of("二班二班", "小七", 50L),
        而Tuple3.of("二班", "小八", 6L),被抛弃

所以最终两个独立的窗口线程的输出结果,也就是程序的最终输出结果:
这是我的自定义输出::2> (二班二班,赵六,40)
这是我的自定义输出::2> (二班二班,小七,50)
这是我的自定义输出::4> (一班一班,张三,10)
这是我的自定义输出::4> (一班一班,李四,20)

注意:当你不用window而是用的windowAll, windowAll意思就是不根据keyBy分区,也就是所有的数据都跑到一个窗口处理,此时调用apply的时候需要用AllWindowFunction而不是WindowFunction ,二者很相似这里不真多windowAll的apply方法多做阐述.

8.window reduce

顾名思义针对window窗口数据(以key切分), 当前元素与下一个元素做逻辑将生成的新元素返回, 新的元素和下一个元素做下一轮逻辑,然后将生成的新的元素返回,依次往后…知道当前window被触发. 窗口中的数据流由每次产生的新元素返回.
demo:

package com.pg.flink;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WindowReduceDemo {

    private static final Logger logger = LoggerFactory.getLogger(TestAggFunctionOnWindow.class);
    public static void main(String[] args) throws Exception {
        logger.info("程序开始运行....");
        // 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 读取数据
        DataStream> input = env.fromElements(ENGLISH);

        // 求各个班级英语成绩平均分
        //KeyedStream, String> keyedStreams= input.keyBy((KeySelector, String>) value -> value.f0);
        KeyedStream, String> keyedStreams= input.keyBy(new WindowReduceDemo.MyKeySelector());
        WindowedStream, String, GlobalWindow> ws =  keyedStreams.countWindow(2);
        SingleOutputStreamOperator> ds =  ws.reduce(new MyReduce());
        ds.addSink(new PrintSinkFunction<>("这是我的自定义输出:", false));
        env.execute("TestAggFunctionOnWindow");

    }

    public static final Tuple3[] ENGLISH = new Tuple3[] {
            Tuple3.of("一班", "张三", 1L),
            Tuple3.of("一班", "李四", 2L),
            Tuple3.of("一班", "王五", 3L),
            Tuple3.of("二班", "赵六", 4L),
            Tuple3.of("二班", "小七", 5L),
            Tuple3.of("二班", "小八", 6L),
    };

    public static class MyReduce implements  ReduceFunction>{

    @Override
    public Tuple3 reduce(Tuple3 value1, Tuple3 value2) throws Exception {
        return Tuple3.of(value1.f0+value2.f0,value1.f1+value2.f1, value1.f2+value2.f2); }
}
    public static class MyKeySelector implements KeySelector, String> {

        @Override
        public String getKey(Tuple3 value) throws Exception {
            return value.f0;
        }
    }
}
//countWindow(2)意思是当前窗口检测到两个元素就会触发计算
结果如下:
这是我的自定义输出::4> (一班一班,张三李四,3)
这是我的自定义输出::2> (二班二班,赵六小七,9)
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/784873.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号