- 案例说明
- flink 1.10 版本 的outputSelector 实现
- flink 1.12 版本 OutputTag 实现
flink 1.10 版本的 OutputSelector 实现
利用 Flink 的 source 功能实现一个自定义的实时数据源。
达到的效果是:将实时的商品数据按 id 的奇偶进行分流,分成 even 和 odd 两个流,再对这两个流进行 join,条件是名称相同,最后把 join 的结果输出。
public class MyStreamingSource implements SourceFunctionflink 1.12 版本 OutputTag 实现- { private boolean isRunning = true; public void run(SourceContext
- ctx) throws Exception { while(isRunning){ Item item = generateItem(); ctx.collect(item); //每秒产生一条数据 Thread.sleep(1000); } } @Override public void cancel() { isRunning = false; } //随机产生一条商品数据 private Item generateItem(){ int i = new Random().nextInt(100); ArrayList
list = new ArrayList(); list.add("HAT"); list.add("TIE"); list.add("SHOE"); Item item = new Item(); item.setName(list.get(new Random().nextInt(3))); item.setId(i); return item; } } class StreamingDemo { public static void main(String[] args) throws Exception { EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); SingleOutputStreamOperator - source = bsEnv.addSource(new MyStreamingSource()).map(new MapFunction
- () { @Override public Item map(Item item) throws Exception { return item; } }); DataStream
- evenSelect = source.split(new OutputSelector
- () { @Override public Iterable
select(Item value) { List output = new ArrayList<>(); if (value.getId() % 2 == 0) { output.add("even"); } else { output.add("odd"); } return output; } }).select("even"); DataStream - oddSelect = source.split(new OutputSelector
- () { @Override public Iterable
select(Item value) { List output = new ArrayList<>(); if (value.getId() % 2 == 0) { output.add("even"); } else { output.add("odd"); } return output; } }).select("odd"); bsTableEnv.createTemporaryView("evenTable", evenSelect, "name,id"); bsTableEnv.createTemporaryView("oddTable", oddSelect, "name,id"); Table queryTable = bsTableEnv.sqlQuery("select a.id,a.name,b.id,b.name from evenTable as a join oddTable as b on a.name = b.name"); queryTable.printSchema(); bsTableEnv.toRetractStream(queryTable, TypeInformation.of(new TypeHint >(){})).print(); bsEnv.execute("streaming sql job"); } }
public class MyStreamingSource implements SourceFunction- { private boolean isRunning = true; public void run(SourceContext
- ctx) throws Exception { while(isRunning){ Item item = generateItem(); // System.out.println(item); ctx.collect(item); //每秒产生一条数据 Thread.sleep(1000); } } @Override public void cancel() { isRunning = false; } //随机产生一条商品数据 private Item generateItem(){ int i = new Random().nextInt(100); ArrayList
list = new ArrayList(); list.add("HAT"); list.add("TIE"); list.add("SHOE"); Item item = new Item(); item.setName(list.get(new Random().nextInt(3))); item.setId(i); return item; } } class StreamingDemo { public static void main(String[] args) throws Exception { EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); //首先定义两个sideoutput来保存切分出来的数据 OutputTag - outputTag1 = new OutputTag
- ("even") {};//保存偶数 OutputTag
- outputTag2 = new OutputTag
- ("odd") {};//保存奇数 SingleOutputStreamOperator
- source = bsEnv.addSource(new MyStreamingSource()).map(new MapFunction
- () { @Override public Item map(Item item) throws Exception { return item; } }); SingleOutputStreamOperator
- result= source.process(new ProcessFunction
- () { @Override public void processElement(Item value, Context ctx, Collector
- out) throws Exception { if (value.getId() % 2 == 0) { ctx.output(outputTag1, value); } else { ctx.output(outputTag2, value); } } }); DataStream
- so1 = result.getSideOutput(outputTag1); DataStream
- so2 = result.getSideOutput(outputTag2); bsTableEnv.createTemporaryView("evenTable", so1); bsTableEnv.createTemporaryView("oddTable", so2); Table queryTable = bsTableEnv.sqlQuery("select a.id,a.name,b.id,b.name from evenTable as a join oddTable as b on a.name = b.name"); bsTableEnv.toRetractStream(queryTable, TypeInformation.of(new TypeHint
>(){})).print(); bsEnv.execute("streaming sql job"); } }



