Flink自从1.10就喊着要搞流批一体,据说1.14是个里程碑,特意体验下。
变化
DataSet消失
笔者隐约记得,Flink1.8老版本和Spark很像,同样分Stream流处理和DataSet批处理。新版本中:
package com.zhiyong.flinkStudy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.SortPartitionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class FlinkDatasetDemo1 {
public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource data = env.fromElements("hehe", "haha", "哈哈", "哈哈");//老版本是返回DataSet
String[] str1 = {"hehe1", "haha1", "哈哈1", "哈哈1"};
DataSource data1 = env.fromElements(str1);//老版本是返回DataSet
AggregateOperator> result = data.flatMap(new FlatMapFunction1())
.groupBy(0).sum(1);
result.print();
System.out.println("**************************");
SortPartitionOperator> result1 = data1.flatMap(new FlatMapFunction2())
.groupBy(0).sum(1).sortPartition(1, Order.DESCENDING);
result1.print();
}
private static class FlatMapFunction1 implements FlatMapFunction> {
@Override
public void flatMap(String value, Collector> out) throws Exception {
for (String cell : value.split("\s+") ) {
out.collect(Tuple2.of(cell,1));
}
}
}
private static class FlatMapFunction2 implements FlatMapFunction> {
@Override
public void flatMap(String value, Collector> out) throws Exception {
String[] split = value.split("\s+");
for (int i = 0; i < split.length; i++) {
out.collect(new Tuple2<>(split[i],1));
}
}
}
}
执行后:
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.utils.PlanGenerator). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. (hehe,1) (haha,1) (哈哈,2) ************************** (哈哈1,2) (hehe1,1) (haha1,1) Process finished with exit code 0
结果当然是不会有啥变化,但是记忆中的DataSet消失了,变成了DataSource,点进去可以看到:
package org.apache.flink.api.java.operators; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.NonParallelInput; import org.apache.flink.api.common.operators.GenericDataSourcebase; import org.apache.flink.api.common.operators.OperatorInformation; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.SplitDataProperties; import org.apache.flink.configuration.Configuration; @Public public class DataSourceextends Operator > { private final InputFormat inputFormat; private final String dataSourceLocationName; private Configuration parameters; private SplitDataProperties splitDataProperties; // -------------------------------------------------------------------------------------------- public DataSource( ExecutionEnvironment context, InputFormat inputFormat, TypeInformation type, String dataSourceLocationName) { super(context, type); this.dataSourceLocationName = dataSourceLocationName; if (inputFormat == null) { throw new IllegalArgumentException("The input format may not be null."); } this.inputFormat = inputFormat; if (inputFormat instanceof NonParallelInput) { this.parallelism = 1; } } @Internal public InputFormat getInputFormat() { return this.inputFormat; } public DataSource withParameters(Configuration parameters) { this.parameters = parameters; return this; } public Configuration getParameters() { return this.parameters; } @PublicEvolving public SplitDataProperties getSplitDataProperties() { if (this.splitDataProperties == null) { this.splitDataProperties = new SplitDataProperties (this); } return this.splitDataProperties; } // -------------------------------------------------------------------------------------------- protected GenericDataSourcebase translateToDataFlow() { String name = 
this.name != null ? this.name : "at " + dataSourceLocationName + " (" + inputFormat.getClass().getName() + ")"; if (name.length() > 150) { name = name.substring(0, 150); } @SuppressWarnings({"unchecked", "rawtypes"}) GenericDataSourcebase source = new GenericDataSourcebase( this.inputFormat, new OperatorInformation (getType()), name); source.setParallelism(parallelism); if (this.parameters != null) { source.getParameters().addAll(this.parameters); } if (this.splitDataProperties != null) { source.setSplitDataProperties(this.splitDataProperties); } return source; } }
继续往下找:
package org.apache.flink.api.java.operators; import org.apache.flink.annotation.Public; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.operators.util.OperatorValidationUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @Public public abstract class Operator> extends DataSet { }
接着往下找:
package org.apache.flink.api.java;

import org.apache.flink.annotation.Public;
// 省略中间的。。。。。。。。。。。。 (intermediate imports omitted by the author)
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Excerpt of Flink's {@code DataSet}, the root type of the batch API.
 *
 * @param <T> the type of the elements in the data set
 */
@Public
public abstract class DataSet<T> {
    // Class body left empty in this excerpt.
}
新版本已经废弃了直接操作DataSet,使用船新的DataSource来做批处理!!!
可以看到,现在使用的DataSource是Operator的子类,而Operator又是DataSet的子类。
每秒mock一条数据的数据源:
package com.zhiyong.flinkStudy;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;
import java.util.Random;

/**
 * Mock source that emits one random word per second, chosen uniformly from
 * {@code zhiyong0} .. {@code zhiyong19}, until {@link #cancel()} is called.
 */
public class WordCountSource1ps implements SourceFunction<String> {

    /** Number of distinct words the source can emit. */
    private static final int WORD_COUNT = 20;

    /**
     * Loop flag for {@link #run}. It is volatile because {@link #cancel()} may
     * be invoked from a different thread than the one executing {@link #run}.
     */
    private volatile boolean needRun = true;

    /**
     * Emits one word, then sleeps for a second, repeatedly.
     *
     * @param sourceContext context used to emit the generated words
     * @throws Exception if the sleeping thread is interrupted
     */
    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        // One generator for the whole run instead of a new Random per record.
        Random random = new Random();
        while (needRun) {
            // Same distribution as picking a random entry from a list of the 20 words.
            sourceContext.collect("zhiyong" + random.nextInt(WORD_COUNT));
            Thread.sleep(1000);
        }
    }

    /** Asks the emit loop in {@link #run} to stop after its current iteration. */
    @Override
    public void cancel() {
        needRun = false;
    }
}
DataStream程序:
package com.zhiyong.flinkStudy;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
import java.util.Collection;
public class FlinkDataStreamDemo1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);//防止报网络资源不充分的错
SingleOutputStreamOperator> result1 = env.addSource(new WordCountSource1ps())
.flatMap(new FlatMapFunction1())
.keyBy(new KeySelector, Object>() {
@Override
public Object getKey(Tuple2 value) throws Exception {
return value.f0;
}
})
.sum(1);
DataStream> result2 = env.addSource(new WordCountSource1ps())
.flatMap(new FlatMapFunction1())
.keyBy(0)
// 已经过时的方法
.sum(1);
// SingleOutputStreamOperator> result3 = env.addSource(new WordCountSource1ps())
// .flatMap(new FlatMapFunction1())
// .keyBy(new KeySelector, Object>() {
// @Override
// public Object getKey(Tuple2 value) throws Exception {
// return value.f0;
// }
// })
// .window(new WindowAssigner, Window>() {
// @Override
// public Collection assignWindows(Tuple2 element, long timestamp, WindowAssignerContext context) {
// return null;
// }
//
// @Override
// public Trigger, Window> getDefaultTrigger(StreamExecutionEnvironment env) {
// return null;
// }
//
// @Override
// public TypeSerializer getWindowSerializer(ExecutionConfig executionConfig) {
// return null;
// }
//
// @Override
// public boolean isEventTime() {
// return false;
// }
// })
// .sum(1);
SingleOutputStreamOperator> result4 = env.addSource(new WordCountSource1ps())
.flatMap(new FlatMapFunction1())
.keyBy(0)
// keyBy已经过时的方法
.timeWindow(Time.seconds(30))
// timeWindow已经过时的方法
.sum(1);
SingleOutputStreamOperator> result5 = env.addSource(new WordCountSource1ps())
.flatMap(new FlatMapFunction1())
.keyBy(new KeySelector, Object>() {
@Override
public Object getKey(Tuple2 value) throws Exception {
return value.f0;
}
})
.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))).sum(1);
//result1.print();
//result2.print();
//result3.print();
//result4.print();
result5.print();
env.execute("有这句才能执行任务,没有这句会Process finished with exit code 0直接结束");
}
public static class FlatMapFunction1 implements FlatMapFunction> {
@Override
public void flatMap(String value, Collector> out) throws Exception {
for (String cell : value.split("\s+")) {
out.collect(Tuple2.of(cell, 1));
}
}
}
}
result1.print()执行后:
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ClosureCleaner). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 10> (zhiyong16,1) 30> (zhiyong7,1) 32> (zhiyong14,1) 33> (zhiyong3,1) 29> (zhiyong12,1) 2> (zhiyong15,1) 10> (zhiyong16,2) 2> (zhiyong15,2) 10> (zhiyong16,3) 30> (zhiyong7,2) 17> (zhiyong18,1) 30> (zhiyong7,3) 35> (zhiyong19,1) 35> (zhiyong19,2) 4> (zhiyong4,1) 18> (zhiyong5,1) 35> (zhiyong8,1) 18> (zhiyong5,2) 25> (zhiyong11,1) 23> (zhiyong2,1) 23> (zhiyong2,2) 25> (zhiyong11,2) 35> (zhiyong8,2) 18> (zhiyong5,3) 35> (zhiyong19,3) 35> (zhiyong8,3) 33> (zhiyong3,2) 35> (zhiyong19,4) 35> (zhiyong8,4) 35> (zhiyong8,5) 4> (zhiyong0,1) 23> (zhiyong2,3) 32> (zhiyong14,2) 10> (zhiyong10,1) 25> (zhiyong11,3) 35> (zhiyong17,1) 35> (zhiyong17,2) 32> (zhiyong14,3) Process finished with exit code 130
SingleOutputStreamOperator这种新类:
官方介绍:
SingleOutputStreamOperator represents a user defined transformation applied on a DataStream with one predefined output type. Type parameters: <T> – The type of the elements in this stream.
显然这是一种DataStream的继承类。
如果遇到了报错:
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ClosureCleaner). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:258) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47) at akka.dispatch.OnComplete.internal(Future.scala:300) at akka.dispatch.OnComplete.internal(Future.scala:297) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23) at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532) at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29) at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63) at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81) at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100) at 
akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233) at org.apache.flink.runtime.scheduler.Schedulerbase.updateTaskExecutionState(Schedulerbase.java:684) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444) at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314) at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at akka.actor.Actor.aroundReceive(Actor.scala:537) at akka.actor.Actor.aroundReceive$(Actor.scala:535) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) at akka.actor.ActorCell.invoke(ActorCell.scala:548) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) at akka.dispatch.Mailbox.run(Mailbox.scala:231) at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ... 4 more Caused by: java.io.IOException: Insufficient number of network buffers: required 37, but only 3 available. The total number of network buffers is currently set to 2048 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'. 
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:386) at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:364) at org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:279) at org.apache.flink.runtime.io.network.partition.ResultPartition.setup(ResultPartition.java:151) at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.setup(BufferWritingResultPartition.java:95) at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:969) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:664) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at java.lang.Thread.run(Thread.java:750) Process finished with exit code 1
这是因为网络资源不充分,最简单的方式就是设置并行度来降低网络要求:
// Run everything with parallelism 1 so the job requests fewer network buffers.
env.setParallelism(1);
之后result2.print()可以正常输出:
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ClosureCleaner). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. (zhiyong3,1) (zhiyong0,1) (zhiyong14,1) (zhiyong3,2) (zhiyong18,1) (zhiyong1,1) (zhiyong13,1) (zhiyong19,1) (zhiyong13,2) (zhiyong4,1) (zhiyong3,3) (zhiyong9,1) (zhiyong0,2) (zhiyong12,1) (zhiyong10,1) (zhiyong6,1) (zhiyong19,2) (zhiyong18,2) (zhiyong15,1) (zhiyong6,2) (zhiyong4,2) (zhiyong16,1) (zhiyong15,2) (zhiyong6,3) (zhiyong10,2) (zhiyong4,3) Process finished with exit code 130
继承类直接手动强转为父类,调用父类的方法一般不会有啥毛病。
再来试试DataStream的窗口:
SingleOutputStreamOperator> result4 = env.addSource(new WordCountSource1ps()) .flatMap(new FlatMapFunction1()) .keyBy(0) // keyBy已经过时的方法 .timeWindow(Time.seconds(30)) // timeWindow已经过时的方法 .sum(1);
虽然还能用,但是已经是过时的方法,点进去看timeWindow:
/**
 * Windows this keyed stream into tumbling time windows of the given size.
 *
 * <p>Delegates to {@code window(...)}: a tumbling processing-time window when
 * the environment's time characteristic is {@code ProcessingTime}, otherwise a
 * tumbling event-time window.
 *
 * @param size the size of the window
 * @deprecated call {@code window(TumblingProcessingTimeWindows.of(size))} or
 *     {@code window(TumblingEventTimeWindows.of(size))} directly
 */
@Deprecated
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
    if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
        return window(TumblingProcessingTimeWindows.of(size));
    } else {
        return window(TumblingEventTimeWindows.of(size));
    }
}
显然是不推荐继续使用timeWindow算子了。
result3的window算子直接new的原生WindowAssigner对象用起来显然是有点复杂,源码也写了可以使用.window(TumblingEventTimeWindows.of(size))或者.window(TumblingProcessingTimeWindows.of(size)),即使用滚动的事件时间窗口或者滚动的处理时间窗口。
点到WindowAssigner看到:
package org.apache.flink.streaming.api.windowing.assigners;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;

import java.io.Serializable;
import java.util.Collection;

/**
 * Excerpt of Flink's {@code WindowAssigner}, which assigns windows to elements.
 *
 * @param <T> the type of elements this assigner can assign windows to
 * @param <W> the type of {@link Window} this assigner produces
 */
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Returns the collection of windows the given element belongs to. */
    public abstract Collection<W> assignWindows(
            T element, long timestamp, WindowAssignerContext context);

    /** Returns the trigger used when none is set explicitly. */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

    /** Returns a serializer for the windows produced by this assigner. */
    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

    /** Returns whether elements are assigned to windows based on event time. */
    public abstract boolean isEventTime();

    /** Context handed to {@link #assignWindows}; exposes the current processing time. */
    public abstract static class WindowAssignerContext {
        public abstract long getCurrentProcessingTime();
    }
}
父类WindowAssigner有子类:
除了2种滚动窗口,当然还有2种滑动窗口。
简单使用下滑动窗口:
SingleOutputStreamOperator> result5 = env.addSource(new WordCountSource1ps()) .flatMap(new FlatMapFunction1()) .keyBy(new KeySelector , Object>() { @Override public Object getKey(Tuple2 value) throws Exception { return value.f0; } }) .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))).sum(1);
输出:
(zhiyong17,1) (zhiyong17,1) (zhiyong7,1) (zhiyong15,1) (zhiyong19,1) (zhiyong4,2) (zhiyong4,2) (zhiyong0,1) (zhiyong15,1) (zhiyong7,1) (zhiyong9,2) (zhiyong18,1) (zhiyong19,1) (zhiyong8,1) Process finished with exit code 130
算子API变化了很多,过时的老API目前也还能凑合着用,以后肯定是要慢慢习惯新API的,老API搞不好哪个版本就不能用了。
DSL(Table API)更新
构造执行环境的设置对象时,发现嘴强王者的BlinkPlanner居然作废了!!!
// Builds streaming-mode settings while still calling the old planner selector.
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
// useBlinkPlanner() is deprecated
.inStreamingMode()
.build();
点进去发现源码写着:
// Deprecated selector for the old planner: it no longer configures anything and
// always throws a TableException telling the caller to use the default planner.
@Deprecated
public Builder useOldPlanner() {
throw new TableException(
"The old planner has been removed in Flink 1.14. "
+ "Please upgrade your table program to use the default "
+ "planner (previously called the 'blink' planner).");
}
// Deprecated no-op: returns this builder unchanged.
@Deprecated
public Builder useBlinkPlanner() {
return this;
}
好家伙,这2个Planner都要废弃了。
package com.zhiyong.flinkStudy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.expressions.$;//可以使用$("变量名")
/**
 * Table API streaming demo that still uses the deprecated bridge methods
 * ({@code fromDataStream(stream, String)}, {@code explain(Table)},
 * {@code toAppendStream}): keeps only the words containing "5" and prints them.
 */
public class FlinkTableApiDemo1 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Flink 1.14 no longer needs a planner to be selected.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        DataStreamSource<String> data = env.addSource(new WordCountSource1ps());

        Table table1 = tableEnv.fromDataStream(data, "word"); // deprecated
        Table table1_1 = table1.where($("word").like("%5%"));
        System.out.println("tableEnv.explain(table1_1) = " + tableEnv.explain(table1_1)); // deprecated

        tableEnv.toAppendStream(table1_1, Row.class).print("table1_1"); // deprecated
        System.out.println("env.getExecutionPlan() = " + env.getExecutionPlan());

        env.execute();
    }
}
执行后:
tableEnv.explain(table1_1) = == Abstract Syntax Tree ==
LogicalFilter(condition=[LIKE($0, _UTF-16LE'%5%')])
+- LogicalTableScan(table=[[Unregistered_DataStream_1]])
== Optimized Physical Plan ==
Calc(select=[word], where=[LIKE(word, _UTF-16LE'%5%')])
+- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[word])
== Optimized Execution Plan ==
Calc(select=[word], where=[LIKE(word, _UTF-16LE'%5%')])
+- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[word])
env.getExecutionPlan() = {
"nodes" : [ {
"id" : 1,
"type" : "Source: Custom Source",
"pact" : "Data Source",
"contents" : "Source: Custom Source",
"parallelism" : 1
}, {
"id" : 4,
"type" : "SourceConversion(table=[Unregistered_DataStream_1], fields=[word])",
"pact" : "Operator",
"contents" : "SourceConversion(table=[Unregistered_DataStream_1], fields=[word])",
"parallelism" : 1,
"predecessors" : [ {
"id" : 1,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 5,
"type" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%5%')])",
"pact" : "Operator",
"contents" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%5%')])",
"parallelism" : 1,
"predecessors" : [ {
"id" : 4,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 6,
"type" : "SinkConversionToRow",
"pact" : "Operator",
"contents" : "SinkConversionToRow",
"parallelism" : 1,
"predecessors" : [ {
"id" : 5,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 7,
"type" : "Sink: Print to Std. Out",
"pact" : "Data Sink",
"contents" : "Sink: Print to Std. Out",
"parallelism" : 36,
"predecessors" : [ {
"id" : 6,
"ship_strategy" : "REBALANCE",
"side" : "second"
} ]
} ]
}
table1_1:8> +I[zhiyong5]
table1_1:9> +I[zhiyong5]
table1_1:10> +I[zhiyong5]
table1_1:11> +I[zhiyong15]
table1_1:12> +I[zhiyong15]
Process finished with exit code 130
浏览器:
https://flink.apache.org/visualizer/
将上方打印出的JSON字符串粘贴到网站的文本框,点Draw:
可以看到DAG图:
虽然可以正常使用Table API,但是过时方法太多了:
例如过时方法fromDataStream:
@DeprecatedTable fromDataStream(DataStream dataStream, String fields);
例如过时方法explain:
// Deprecated method that takes the table to explain and returns a String.
@Deprecated String explain(Table table);
还有过时方法toAppendStream:
@DeprecatedDataStream toAppendStream(Table table, Class clazz);
根据源码API替换为新方法后:
package com.zhiyong.flinkStudy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.apache.flink.table.api.expressions.$;//可以使用$("变量名")
public class FlinkTableApiDemo1 {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Flink1.14不需要设置Planner
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
DataStreamSource data = env.addSource(new WordCountSource1ps());
System.out.println("***********新方法**************");
ArrayList strings = new ArrayList<>();
strings.add("f0");//必须写f0
List dataTypes = new ArrayList();
dataTypes.add(DataTypes.STRING());
Schema schema = Schema.newBuilder()
.fromFields(strings, dataTypes)
.build();
List columns = schema.getColumns();
for (Schema.UnresolvedColumn column : columns) {
System.out.println("column = " + column);
}
Table table2 = tableEnv.fromDataStream(data, schema);
Table table2_1 = table2.where($("f0").like("%5%"));//必须写f0
System.out.println("table2_1.explain() = " + table2_1.explain(ExplainDetail.JSON_EXECUTION_PLAN));
tableEnv.toDataStream(table2_1,Row.class).print("table2_1");
System.out.println("env.getExecutionPlan() = " + env.getExecutionPlan());
env.execute();
}
}
字段名称必须写【f0】,还没来得及扒源码仔细研究为何是这样。不这么写会报错:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to find a field named 'word' in the physical data type derived from the given type information for schema declaration. Make sure that the type information is not a generic raw type. Currently available fields are: [f0] at org.apache.flink.table.catalog.SchemaTranslator.patchDataTypeFromColumn(SchemaTranslator.java:327) at org.apache.flink.table.catalog.SchemaTranslator.patchDataTypeFromDeclaredSchema(SchemaTranslator.java:314) at org.apache.flink.table.catalog.SchemaTranslator.createConsumingResult(SchemaTranslator.java:213) at org.apache.flink.table.catalog.SchemaTranslator.createConsumingResult(SchemaTranslator.java:158) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.fromStreamInternal(StreamTableEnvironmentImpl.java:294) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.java:232) at com.zhiyong.flinkStudy.FlinkTableApiDemo1.main(FlinkTableApiDemo1.java:65) Process finished with exit code 1
DSL(Table API)进行批处理
之前进行了流处理,接下来试试批处理。
由于批处理已经不直接使用DataSet,而是使用DataSource,故如下算子已经消失:
// In older Flink versions data1 is a DataSet instance; this API creates a Table from it.
tableEnv.fromDataSet(data1);
// In older Flink versions table1 is a Table instance; this API converts it back to a DataSet.
tableEnv.toDataSet(table1);
整个DSL方式如果按照如下方式构建TableEnvironment:
// Batch-mode TableEnvironment built from EnvironmentSettings alone.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inBatchMode()
        .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
将会很鸡肋,这样产生的tableEnv实例对象可用方法很少。
但是可以使用如下方式:
package com.zhiyong.flinkStudy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.expressions.$;
/**
 * Processes bounded (batch-style) input through the streaming Table API:
 * once from in-memory values and once from a bounded DataStream, each
 * filtered with a LIKE predicate and converted back to a DataStream.
 */
public class FlinkTableApiDemo2 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env);

        String[] str1 = {"hehe1", "haha1", "哈哈1", "哈哈1"};

        // Table built directly from the values; keep the rows containing "h".
        Table table1 = streamTableEnv.fromValues(str1);
        Table table1_1 = table1.where($("f0").like("%h%"));
        DataStream<Row> batchTable1 = streamTableEnv.toDataStream(table1_1);
        batchTable1.print();

        System.out.println("*************************");

        // Table built from a bounded DataStream; keep the rows containing "哈".
        DataStreamSource<String> dataStream2 = env.fromElements(str1);
        Table table2 = streamTableEnv.fromDataStream(dataStream2);
        Table table2_1 = table2.where($("f0").like("%哈%"));
        DataStream<Row> batchTable2 = streamTableEnv.toDataStream(table2_1);
        batchTable2.print();

        env.execute();
    }
}
执行后:
log4j:WARN No appenders could be found for logger (org.apache.flink.table.module.ModuleManager). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. ************************* 20> +I[哈哈1] 21> +I[哈哈1] 20> +I[hehe1] 21> +I[haha1] Process finished with exit code 0
可以发现,Flink1.14.3中,已经可以直接使用流的方式处理批,而不像Flink1.8老版本那样还区分stream和batch。虽然现在还保留了batch的Env及API,但是已经废弃的差不多了,以后可能再也用不上了。事实证明,在Flink1.14.3中,DSL方式的Table API层面已经可以不用做区分,统一转换为DataStream即可。而DataStream也可以不区分是stream环境的Table还是batch环境的Table。
使用DataStream实现流批一体
package com.zhiyong.flinkStudy;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Reads an unbounded mock source and a bounded text file in the same
 * streaming job and prints both, each with its own prefix.
 */
public class FlinkSqlApiDemo1 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Created but not used further in this demo.
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env);

        // Unbounded input: one mock word per second.
        DataStreamSource<String> data1 = env.addSource(new WordCountSource1ps());

        // Bounded input: the lines of a local text file.
        String inputPath = "E:/study/flink/data/test1";
        DataStreamSource<String> data2 = env.readTextFile(inputPath);

        data1.print("data1");
        data2.print("data2");

        env.execute();
    }
}
执行后:
data1> zhiyong19 data2> 好 data2> 喜欢 data2> 数码宝贝 data2> 宝宝 宝贝 data2> 宝贝 好 喜欢 data2> 123 data2> 123 data2> 123 data2> 哈哈 haha data2> hehe 呵呵 呵呵 呵呵 呵呵 data2> hehe data1> zhiyong17 data1> zhiyong7 data1> zhiyong7 data1> zhiyong5 data1> zhiyong11 data1> zhiyong18 data1> zhiyong14 data1> zhiyong13 data1> zhiyong5 data1> zhiyong8 Process finished with exit code 130
可以看出,Flink1.14.3直接使用DataStream即可。不管是批还是流,直接当作流来处理。
使用DSL(Table API)实现流批一体
package com.zhiyong.flinkStudy;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.expressions.$;
/**
 * Demo: turns a custom source and a text file into {@link Table}s, applies the same Table API
 * filter to both, and prints the results as {@code DataStream<Row>}.
 */
public class FlinkSqlApiDemo1 {
    /**
     * Builds both filter pipelines and runs them as one job.
     *
     * @param args unused
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env);

        // WordCountSource1ps is defined elsewhere in the project.
        // NOTE(review): assumed to emit String records — confirm against its declaration.
        DataStreamSource<String> data1 = env.addSource(new WordCountSource1ps());

        String inputPath = "E:/study/flink/data/test1";
        DataStreamSource<String> data2 = env.readTextFile(inputPath);

        Table streamTable = streamTableEnv.fromDataStream(data1);
        Table batchTable = streamTableEnv.fromDataStream(data2);

        // Identical filter on both tables: keep rows whose f0 column contains "2".
        Table streamTable1 = streamTable.where($("f0").like("%2%"));
        Table batchTable1 = batchTable.where($("f0").like("%2%"));

        DataStream<Row> s1 = streamTableEnv.toDataStream(streamTable1);
        DataStream<Row> s2 = streamTableEnv.toDataStream(batchTable1);
        s1.print();
        s2.print();

        env.execute();
    }
}
执行后:
+I[123] +I[123] +I[123] +I[zhiyong2] +I[zhiyong12] +I[zhiyong12] +I[zhiyong12] +I[zhiyong12] +I[zhiyong12] Process finished with exit code 130
这样我们在使用Flink时,只要运算逻辑一致,就可以使用同一套算子包,不用刻意区分流和批。Flink1.8老版本还需要写2套程序,至少从Flink1.14.3开始,不需要了。代码复用性提高,意味着dev、debug及之后的op工作量大大减少!这一点目前应该是Spark望尘莫及的。
使用SQL实现流批一体
由于SQL是Table的更高层封装,更适合不需要关心平台组件底层实现的业务开发者【也就是俗称的SQL Boy】使用,既然Table层面已经实现了流批一体,那么SQL层面必然也可以实现。
package com.zhiyong.flinkStudy;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.expressions.$;
/**
 * Demo: filters a custom source and a text file with the Table API, then runs the same SQL
 * query ({@code UPPER(f0)}) over both filtered tables and prints the results.
 */
public class FlinkSqlApiDemo1 {
    /**
     * Builds both SQL pipelines and runs them as one job.
     *
     * @param args unused
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env);

        // WordCountSource1ps is defined elsewhere in the project.
        // NOTE(review): assumed to emit String records — confirm against its declaration.
        DataStreamSource<String> data1 = env.addSource(new WordCountSource1ps());

        String inputPath = "E:/study/flink/data/test1";
        DataStreamSource<String> data2 = env.readTextFile(inputPath);

        Table streamTable = streamTableEnv.fromDataStream(data1);
        Table batchTable = streamTableEnv.fromDataStream(data2);

        // Identical filter on both tables: keep rows whose f0 column contains "2".
        Table streamTable1 = streamTable.where($("f0").like("%2%"));
        Table batchTable1 = batchTable.where($("f0").like("%2%"));

        // Concatenating a Table into the SQL text goes through Table#toString(); per the Flink
        // Table API docs this registers the table under a generated name and yields that name,
        // which is what makes the FROM clause resolvable.
        Table t1 = streamTableEnv.sqlQuery("SeLeCt UPPER(f0) frOm " + streamTable1);
        Table t2 = streamTableEnv.sqlQuery("SeLeCt UPPER(f0) frOm " + batchTable1);

        DataStream<Row> s1 = streamTableEnv.toDataStream(t1);
        DataStream<Row> s2 = streamTableEnv.toDataStream(t2);
        s1.print();
        s2.print();

        env.execute();
    }
}
执行后:
+I[123] +I[123] +I[123] +I[ZHIYONG2] +I[ZHIYONG2] +I[ZHIYONG2] +I[ZHIYONG12] +I[ZHIYONG12] +I[ZHIYONG12] +I[ZHIYONG12] +I[ZHIYONG12] +I[ZHIYONG2] +I[ZHIYONG12] Process finished with exit code 130
同样证明,Flink1.14.3中使用同一套API即可实现SQL方式的流批一体。有了SQL层面的流批一体,写业务代码的SQL Boy们就更无需关心底层实现了。技术的发展,总是让业务人员的技术水平越来越低。。。不过这不是坏事。
总结
在Flink1.14.3中,不管是顶层的SQL、次顶层的DSL还是中层的DataStream都可以实现流批一体。SQL调用DSL,DSL调用DataStream,SQL和DSL调用后都是Table对象,而Flink1.14.3中Table和DataStream又可以无缝切换,使用起来灰常方便。较Flink1.8,API大幅变化,可能对SQL Boy们来讲没什么影响,但是对平台及组件二开人员还是造成了一定阻碍,需要再投入时间和精力研习变化。但是从统一了流批API,减少开发、测试、运维工作量来说,付出的代价是值得的。
对懂底层的平台及组件二开人员来说,Flink1.14是个当之无愧的里程碑。至于SQL Boy们,懂也好,不懂也罢。DSL用户不喜欢几千行的SQL,SQL Boy们不喜欢从上到下顺序执行。黑底白斑和白底黑斑的斑马们,可能一时半会儿也不能理解彼此。天之道,不争而善胜。



