1.keyby的特殊
keyby是基于hashcode进行重分区的,而broadcast和rebanlance是随机重新分区 sum等算子是基于keyedsteam,【普通的数据类型是Dsteam3.Flink流处理的API
flink的处理过程包含source transfer sink3.1创建执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
StreamExecutionEnvironment.createLocalEnvironment(1)
StreamExecutionEnvironment.createRemoteEnvironment("172.168.72.64",4044,"")
环境执行的俩种方式:
第一种是flink自动获取,自动包装
第二种是 手动切换的类型
3.2source几种的例子
readTextFile 读文件的source接入
socketTextStream:从文本流读入的方式
env.fromCollection(List(
SendsoRendsourceTest("ceshi1",1547718199,123),
SendsoRendsourceTest("ceshi2",1287319287,100),
)):从内存读集合的方式
从kafka中读取数据的方式
val properties = new Properties()
properties.setProperty("bootstrap.serers","localhost:9092")
properties.setProperty("group.id","consumer-group")
properties.setProperty("auto.offset.reset","latest")
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
val data: DataStream[String] = env.addSource(new FlinkKafkaConsumer011[String]("ceshi",new SimpleStringSchema(),properties))
data.print("data")
env.execute("source test job")
java版本
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;
import javax.xml.crypto.*;
import java.util.Properties;
public class FlinkSteamingApi {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// StreamExecutionEnvironment.createLocalEnvironment(1)
// StreamExecutionEnvironment.createRemoteEnvironment("172.168.72.64",4044,"")
env.setParallelism(4);
Properties properties = new Properties();
properties.setProperty("","");
DataStreamSource inputData = env.addSource(new FlinkKafkaConsumer011("test", new SimpleStringSchema(), properties));
// SingleOutputStreamOperator mapDataStream =
SingleOutputStreamOperator sum = inputData.flatMap(new FlatMapFunction() {
public void flatMap(String o, Collector collector) throws Exception {
String[] split = o.split("");
for (String data : split) {
collector.collect(new Tuple2(data, 1));
}
}
}).keyBy(0).sum(1);
sum.print();
try {
env.execute("cveshi");
} catch (Exception e) {
e.getMessage();
}
}
}
3.3不同算子执行在不同的slot上的设置
3.3.1slotSharingGroup
slotSharingGroup("1")将slot划分组管理,同一组内可以共享slot
默认情况:slotSharingGroup ("defualt")
在指定上slot组之后,后面的算子默认是和之前指定的slot组
应用场景:类似于窗口计算比较耗时的算子任务,可以单独使用slot组区分资源应用3.3.2disablechaining和startNewChain
disablechaining :算子作用,算子前后任务都不能合并成一个任务执行 startNewChain:算子作用算子断开前面算子3.4自定义source源
import java.util.{Properties, Random}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.kafka.common.serialization.StringDeserializer
object FlinkSteaming {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// StreamExecutionEnvironment.createLocalEnvironment(1)
// StreamExecutionEnvironment.createRemoteEnvironment("172.168.72.64",4044,"")
env.setParallelism(4)
// val data: DataStream[SendsoRendsourceTest] = env.fromCollection(List(
// SendsoRendsourceTest("ceshi1",1547718199,123),
// SendsoRendsourceTest("ceshi2",1287319287,100),
// SendsoRendsourceTest("ceshi2",1287319287,99),
// SendsoRendsourceTest("ceshi1",1547718200,123),
// SendsoRendsourceTest("ceshi1",1547718201,123),
// SendsoRendsourceTest("ceshi1",1547718202,123)
// ))
// data.print("data")
// val data: DataStream[String] = env.readTextFile("E:\java_home_work\IdeaProjects\zbs_learn\zbs_spring\src\main\resources\Tess_data")
// env.socketTextStream("",21)
// v从kafka中读取数据
val properties = new Properties()
properties.setProperty("bootstrap.serers","localhost:9092")
properties.setProperty("group.id","consumer-group")
properties.setProperty("auto.offset.reset","latest")
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
val data: DataStream[String] = env.addSource(new FlinkKafkaConsumer011[String]("ceshi",new SimpleStringSchema(),properties))
data.print("data")
data.flatMap(_.split(" ")).map((_,1)).slotSharingGroup("1").disableChaining()
.filter(_._1.nonEmpty).keyBy(0).sum(1).startNewChain()
data.slotSharingGroup("1")
env.execute("source test job")
}
}
//数据的输入形式是集合
case class SendsoRendsourceTest(id:String,timestap:Double,rmperatur:Double)
//从文件中读取数据
object sourceDemo{
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream5: DataStream[SendsoRendsourceTest] = env.addSource(new MysensorSource())
stream5.print()
env.execute("ceshisource")
}
}
//实现一个自定义的sourceFunction,自动生成测试数据
class MysensorSource extends SourceFunction[SendsoRendsourceTest] {
// 定义一个FLAG,表示数据源是否正常运行
var running:Boolean=true
override def run(ctx: SourceFunction.SourceContext[SendsoRendsourceTest]): Unit = {
val rand = new Random()
var curTemps = 1.to(10).map(
i=>("sensor"+i,60 +rand.nextGaussian()*20)
)
//无限循环,生成循环数据
while (running){
//随机生成微小波动
curTemps.map(
data=> (data._1,data._2+rand.nextGaussian())
)
val cutrs=System.currentTimeMillis()
//包装成样例类,用ctx发送数据
curTemps.foreach(
data=>ctx.collect(SendsoRendsourceTest(data._1,cutrs,data._2))
)
Thread.sleep(1000)
}
}
override def cancel(): Unit = {
running=false
}
}
3.5transform
scource之后,sink之前的算子都是转换算子
基本转换算子
map、flatmap、filter
特殊算子,只在数据传输之前起作用,起到重分区和类型装换
keyby 基于key的hashcode做重分区 datastream -> keystream 下面可以做滚动聚合算子 dataStream 是没有聚合操作,目前所有的聚合操作都是针对keySteam
滚动聚合算子
sum
min和minby区别 min只输出指定字段的最小值,其他字段使用第一个的值,minby是根据最小值的,其他字段也是最小值的其他字段
max和maxby区别 同上
reduce 类型为keyedstream keyby流 根据历史结果+新传入的数据进行计算 所以两个参数是相同的
fold 已删除
aggregate
reduce和aggregate
.reduce(
(a: SendsoRendsourceTest, b: SendsoRendsourceTest) => {
a.rmperatur = a.rmperatur + b.rmperatur
a
}
拆分
split-select 分流 connect 和comap/coflatmap 成对出现,都是针对 splitSteam 或者 connectSteam ,然后通过select 或者comap coflatmap 进行获取 union 合流
data.split(x=>{ //Traversableonce sacla里的复杂类型 seq list等
if(x.isEmpty){
List("high")
}
else
List("low")
}).select("high")
有状态的算子
mapwithstate
函数类:flink提供了所有的udf的函数类 都可以通过继承实现自己的函数类
富函数类:flink提供了所有的udf的函数类 都可以通过继承实现自己的函数类比一般的udf类多加生命周期的函数
open() 初始化的时候,只调用一次
close() 结束的时候,只调用一次
getruntimecontext():获取运行时上下文,例如状态
状态操作多用于 状态编程
getruntimecontext().getIndexOfThisSubtask:获取当前任务的子任务编号
3.6sink源
sink: .writeAsCsv() 输出为csv文件的sink方法
..writeToSocket() 一般测试数据
官方提供的sink源 sourece源
kafka sink/sources
cassandra
kinesis Streams sink/sources
es
hdfs
rabbitmq sink/sources
nifi sink/sources
twitter streaming api source
Bahir
activemq sink/sources
flume
redis
akka
netty
kafka sink 案例分析:
tuple2SingleOutputStreamOperator.addSink(
new FlinkKafkaProducer011>("localhost:9092","sinktest",new mySerializationSchema())
);
class mySerializationSchema implements SerializationSchema> {
@Override
public byte[] serialize(Tuple2 stringIntegerTuple2) {
return stringIntegerTuple2.toString().getBytes();
}
}
redis sink 案例分析:apache bahir的依赖 tuple2SingleOutputStreamOperator.addSink(new RedisSink org.apache.bahir flink-connector-redis_2.111.0 >(new FlinkJedisPoolConfig.Builder().setHost("").setPort(6379).build(), new RedisMapper >() { @Override //定义保存数据到redis的命令 public RedisCommandDescription getCommandDescription() { new RedisCommandDescription(RedisCommand.HSET,"sadasdas"); } @Override public String getKeyFromData(Tuple2 data) { return data.f0; } @Override public String getValueFromData(Tuple2 data) { return data.f1.toString(); } }); env.execute();
自定义sink源: 使用richfunction 使用open方式创建连接然后实现invoke方式发送数据 实现数据插入
tuple2SingleOutputStreamOperator.addSink(new RichSinkFunction>() {
//声明连接和预编译语句
Connection connection = null;
PreparedStatement insertstmt = null;
PreparedStatement updateStmnt = null;
@Override
public void open(Configuration parameters) throws Exception {
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456");
System.out.println(connection);
insertstmt = connection.prepareStatement("insert into ceshi(name,id) values (?,?)");
updateStmnt = connection.prepareStatement("update ceshi set id=? where name=?");
}
//每来一条数据,就调用连接执行sql
@Override
public void invoke(Tuple2 value, Context context) throws Exception {
//直接执行更新语句,如果没有更新就拆入
updateStmnt.setInt(1, value.f1);
updateStmnt.setString(2, value.f0);
updateStmnt.execute();
if (updateStmnt.getUpdateCount() == 0) {
insertstmt.setString(1,value.f0);
insertstmt.setInt(2,value.f1);
insertstmt.execute();
}
}
@Override
public void close() throws Exception {
insertstmt.close();
updateStmnt.close();
connection.close();
}
});
env.execute();
4.窗口
windows:窗口就是将无限流切割为有限流的一种方式,塔会将流数据发送到有限大小的桶中进行分析4.1窗口的类型
时间窗口: 滚动时间窗口-tumbling Winows: 数据根据固定的窗口长度对数据进行划分 时间对齐,窗口长度固定,没有重叠 滑动时间窗口-Sliding Windows:滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成,窗口长度固定,可以有重叠 会话窗口-特殊的一种窗口Session Windows:由一系列事件组合一个指定时间长度的timeout间隙组成,也就是一段时间就么有接受到新数据就会生成一个新的窗口 计数窗口: 滚动计数窗口-tumbling Winows: 数据根据固定的窗口长度对数据进行划分 滑动计数窗口-Sliding Windows:滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成,窗口长度固定,可以有重叠4.2Window ApI
窗口分配器.window()方法 我们可以用window()来定义一个窗口,然后基于这个windows去做一些聚合和其他的处理操作,注意这个window() 方法必须在keyby以后使用 Flink还提供了更加简单的.timeWindow和.countWindow方法,用于定义时间和计数窗口
窗口测试: 在不使用keyby算子的开窗函数,调用windowall方法,这个方法会默认吧数据都会放到一个key里,相当于提前加了group ,建议不是用 splitDataStream.windowAll(); 基于keyby之后的开窗更常见 正常的写法: DataStream> tuple2SingleOutputStreamOperator = splitDataStream.keyBy(0).timeWindow(Time.seconds(2)).sum(1); tuple2SingleOutputStreamOperator.print();
窗口的分配器:globalWindows共同的窗口计数器
tumblingWinows滚动窗口计数器
SlidingWindows滑动窗口计数器
sessionWindows会话窗口计数器
实例: 滚动时间窗口 .timeWindow(Time.seconds(15))
滑动时间窗口 .timeWindow(Time.seconds(15),Time.seconds(5))
会话窗口 .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
滚动计数窗口 .countWindow(5)
滑动计数窗口 .countWindows(10,2)
4.3窗口聚合操作
增量聚合函数
每条数据来之后,保持一个简单的状态 ReduceFunction AggregateFunction 来一个计算一个,保持状态,不输出 .sum .min .max
全窗口函数 ds-aws-ds
全把窗口的所有数据收集起来,等到计算的时候遍历所有数据 ProcessWindowFunction WindowFunction 类似于批处理
DataStream> tuple2SingleOutputStreamOperator = splitDataStream.keyBy(0).timeWindow(Time.seconds(2) ).aggregate(new AggregateFunction , Integer, Tuple2 >() { @Override public Integer createAccumulator() { return 0; } @Override public Integer add(Tuple2 stringIntegerTuple2, Integer integer) { return integer+1; } @Override public Tuple2 getResult(Integer integer) { return null; } @Override public Integer merge(Integer integer, Integer acc1) { return null; } }) ; tuple2SingleOutputStreamOperator.print(); env.execute(); }
.apply(new WindowFunction, Tuple2 , Tuple, TimeWindow>() { @Override public void apply(Tuple tuple, TimeWindow window, Iterable > input, Collector > out) throws Exception { //获取最终时间 long end = window.getEnd(); ArrayList > list = IteratorUtils.toList(input.iterator(); out.collect(list.get(0)); } })
.countWindow(10,2).aggregate(new AggregateFunction, Tuple2 , Tuple2 >() { @Override public Tuple2 createAccumulator() { return new Tuple2(1,1); } @Override public Tuple2 add(Tuple2 stringIntegerTuple2, Tuple2 integerIntegerTuple2) { return new Tuple2< Integer, Integer>(stringIntegerTuple2.f1+integerIntegerTuple2.f0,stringIntegerTuple2.f1+1); } @Override public Tuple2 getResult(Tuple2 integerIntegerTuple2) { return null; } @Override public Tuple2 merge(Tuple2 integerIntegerTuple2, Tuple2 acc1) { return null; } }) ;
注意:刚开窗的滑动步长就是输出的频率4.4窗口的其他方法
process context 是上下文
.trigger() 触发器 定义window关闭,触发计算,输出结果 ·evictor() 移除器 定义移除某些数据的逻辑 .allowedLateness() 允许处理迟到的数据 .sideOutputLateData() 讲迟到的数据放入侧输出流 .getSideOutput获取侧输出流 窗口的迟到的逻辑,现在原窗口的时间输出一次,切窗口不会立即关闭,会等待一个时间的,等待数据的传入,再次计算,再次输出,等到迟到时间到后,才会关闭窗口 迟到数据-》窗口等待-》测输出流写入处理-》侧输出流获取数据
.countWindow(10,2)
// .trigger()
// .evictor()
.allowedLateness(Time.seconds(2))
.sideOutputLateData(new OutputTag<>("123"))
.aggregate(new AggregateFunction, Tuple2, Tuple2>() {
@Override
public Tuple2 createAccumulator() {
return new Tuple2(1,1);
}
@Override
public Tuple2 add(Tuple2 stringIntegerTuple2, Tuple2 integerIntegerTuple2) {
return new Tuple2< Integer, Integer>(stringIntegerTuple2.f1+integerIntegerTuple2.f0,stringIntegerTuple2.f1+1);
}
@Override
public Tuple2 getResult(Tuple2 integerIntegerTuple2) {
return null;
}
@Override
public Tuple2 merge(Tuple2 integerIntegerTuple2, Tuple2 acc1) {
return null;
}
})
注意点:侧输出流的获取只能用 SingleOutPutStreamOperator这个数据类型,DataStream没有getSidwOutput的方法 例如:SingleOutputStreamOperator alertStream = (SingleOutputStreamOperator) alerts;5.时间语义
主要包含三个时间语义:
Event Time 事件创建的时间
ingestion time 数据进入Flink的时间
processing time 执行操作算子的本地系统时间,与机器相关
不同的时间语义有不同的应用的场景 Event time是我们更关心的时间
设置代码的处理时间是处理时间 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
然后设置流的事件时间时.assignTimestampsAndWatermarks5.1乱序时间的影响
Event Time也会带来数据的影响,因为数据的到达是可能是无序,影响数据窗口的操作
例子,当新的窗口数据到时,上一个窗口将要关闭,但是上一窗口执行什么操作,是关闭还是等待,怎么关闭,原来的迟到数据处理会带来一个问题
,窗口先输出结果,然后再迟到一条计算一条,会造成大量的输出窗口计算,
5.2Watermark
遇到一个时间戳达到了窗口的关闭时间,不应该立刻触发窗口的计算,而是等待一段时间, 等迟到的数据来了在关闭窗口
watermark是衡量数据EventTime的机制,可以设定延迟触发
通常结合window的实现,数据流的Watermark用于timestamp小于Watewrmark的数据, 都已经到达了,因此,window的执行也是Watermark触发的
watermark,让窗口延迟触发,allowedLateness 运行时等待迟到数据,singoutput侧输出流处理真正数据的延迟5.3Watermark的特点
watermark是一条特殊的数据记录
watermark必须单点递增,以确保任务的事件时间时钟在向前推进,而不是在后退,watermark于数据的时间戳有关
举例:
以数字代表时间戳
1 5 3 6 8 7
watermark 2 5 8
watermark的时间怎么设置,最大的迟到时间
数据会根据时间分桶,但是窗口的关闭是根据water
0-5的桶
1 w -2
4 w 1
5 w 2
5.4Watermark的传递
涉及到多个分区的数据 Watermark怎么保证共同 广播下游 涉及到多个上游watermark向下传递时, 每一个任务可能会有多个并行的上游任务发送watermark 取上游最小的watermaek, 同时也要跟多个并行的下游任务 去发送他的watermark broadcast5.5Watermark的定义
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor>( Time.seconds(10)) { @Override public long extractTimestamp(Tuple2 element) { return element.f1 *1000L ; } });
实现的类里传的参数是watermark的延迟时间 类中是 取哪个字段是通过extractTimestamp 来设置 取哪个字段
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor>() { @Override public long extractAscendingTimestamp(Tuple2 element) { return element.f1; } });
如果数据是升序的,即不需要设置Watermark的时间 默认是-1毫秒的
assignTimestampsAndWatermarks方法可以传两个function 主要分为两类: AssignerWithPeriodicWatermarks 周期性生成watermark 隔一段时间自动生成 AssignerWithPunctuatedWatermarks 立即生成watermark 基于数据来判断是否生成watermaek5.6窗口开始时间
窗口的开始时间:
窗口的起始点, 源码里是一个取模的操作 使用当前的时间戳-offset
splitDataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(2),Time.seconds(1)));
splitDataStream.keyBy(0).window(SlidingEventTimeWindows.of(Time.seconds(2),Time.seconds(1),Time.seconds(1)));
通过窗口的时间变量控制窗口的开始时间
6状态编程
6.1状态的基本概念
Flink的状态 分为两大类: 算子状态 OPeratior State 键控状态 Keyed State 状态后端 State Backends 我们任务数据流一条一条处理的,可以看做是无状态算子, 类似于 window的 minby sum这种是带状态的算子,会有一个额外的任务来维护这个状态 state会缓存的内存中 状态需要关联 因为本地的状态需要有互通的(序列化反序列化 以及哪些状态储存在哪里 )高效存储 状态一致性等。 被mange state raw state6.2举例示范
连续两个数据差值超过10度
public class FlatmapRichFunctionDev extends RichFlatMapFunction6.3键控状态, Tuple3 > { private Integer num_flag; //定义状态,保存上一次的值 private ValueState state; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); state = getRuntimeContext().getState(new ValueStateDescriptor ("last_temp", Integer.class)); } public FlatmapRichFunctionDev(int num_flag) { this.num_flag = num_flag; } @Override public void flatMap(Tuple2 stringIntegerTuple2, Collector > collector) throws Exception { Integer value = state.value(); if (value != null) { Integer i = Math.abs(stringIntegerTuple2.f1 - value); if(i > num_flag) { collector.collect(new Tuple3<>(stringIntegerTuple2.f0,value,stringIntegerTuple2.f1)) ; } } state.update(stringIntegerTuple2.f1); } }
键控状态是根据输入数据流中定义的建 来维护和访问的 Flink为每一个key维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子中, 这个任务会维护和处理这个key对应的状态 当任务处理一条数据时
键控状态的数据结构:
值状态: Value state 列表状态 list state 映射状态 Map state 聚合状态 Reducing state & Aggregating State
键控装填的声明: myValueState = getRuntimeContext().getState(new ValueStateDescriptor("last_temp", Integer.class));
算子状态:定义一个变量 提交给flink 通过checkpoint 将数据保存和恢复



