目录：一、有界流 WordCount；二、无界流 WordCount

一、有界流 WordCount

package com.shinho.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class BoundryWordCount {
public static void main(String[] args) throws Exception {
//1创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource lineDS = env.readTextFile("input/words.txt");
SingleOutputStreamOperator> wordAndOne = lineDS.flatMap((String line, Collector> out) -> {
String[] words = line.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
}).returns(Types.TUPLE(Types.STRING, Types.LONG));
//分组
KeyedStream, Tuple> keyBy = wordAndOne.keyBy(0);
//求和
SingleOutputStreamOperator> sum = keyBy.sum(1);
//
sum.print();
//启动执行
env.execute();
}
}
控制台输出结果
1> (xx,1) 7> (kaikai,1) 3> (hello,1) 6> (word,1) 2> (gez,1) 7> (count,1) 3> (hello,2) 3> (hello,3) 3> (hello,4) 6> (word,2)
每行前面的数字（如 `3>`）是输出该结果的并行子任务编号。子任务个数取决于并行度，本地运行时默认等于电脑的 CPU 核数。keyBy 会把同一个单词始终分配到同一个子任务上，因此同一个单词的词频只会在这个子任务上累加。例如上面 `hello` 的计数 1→4 全部由子任务 3 输出。
二、无界流 WordCount（监听 socket 事件）

先在服务器上安装 netcat，并在 7777 端口上启动监听，之后在该终端输入的每一行文字都会发送给 Flink 程序：

yum install -y nc
nc -lk 7777
package com.shinho.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class NoBoundryWordCount {
public static void main(String[] args) throws Exception {
// 创建流式环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.读取文本流
DataStreamSource lineDS = env.socketTextStream("192.168.10.132", 7777);
SingleOutputStreamOperator> wordAndOne = lineDS.flatMap((String line, Collector> out) -> {
String[] words = line.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
}).returns(Types.TUPLE(Types.STRING, Types.LONG));
//分组
KeyedStream, Tuple> keyBy = wordAndOne.keyBy(0);
//求和
SingleOutputStreamOperator> sum = keyBy.sum(1);
//
sum.print();
//启动执行
env.execute();
}
}



