package com.hmi1024.flink.example;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class BatchWordCount {
public static void main(String[] args) throws Exception {
//初始化flink批处理的运行环境(获取到当前环境,如果本地运行获取local环境)
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//指定文件路径,获取文件数据
final DataSource lines = env.readTextFile("./data/input/wordcount.txt");
//对获取到的数据进行空格拆分
//map与flatmap的区别
//String:传入值类型
//String:返回值类型
final FlatMapOperator words = lines.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String line, Collector out) throws Exception {
//将每行字符串进行空格拆分
final String[] dataArray = line.split(" ");
//循环遍历字符串数组
for (String word : dataArray) {
//需要使用out进行返回数据
out.collect(word);
}
}
});
final MapOperator> wordAndOne = words.map(new MapFunction>() {
@Override
public Tuple2 map(String value) throws Exception {
return Tuple2.of(value, 1);
}
});
//对相同的单词进行分组操作
final UnsortedGrouping> grouped = wordAndOne.groupBy(0);
//对分组后的数据进行累加操作
final AggregateOperator> summed = grouped.sum(1);
//打印输出(测试)
summed.print();
//todo 8)启动作业,递交任务
//在批处理开发中以下方法会触发作业的递交操作,故无需 env.execute()
//'execute()', 'count()', 'collect()', or 'print()'.
//env.execute();
}
}
package com.hmi1024.flink.example;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class StreamWordCount {
public static void main(String[] args) throws Exception {
//初始化flink流处理的运行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度
env.setParallelism(2);
//指定socket数据源,获取数据
final DataStreamSource socketTextStream = env.socketTextStream("node1", 9999);
//对获取到的数据进行空格拆分
final SingleOutputStreamOperator words = socketTextStream.flatMap(
new FlatMapFunction() {
@Override
public void flatMap(String line, Collector out) throws Exception {
//将每行字符串进行空格拆分
final String[] dataArray = line.split(" ");
//循环遍历字符串数组
for (String word : dataArray) {
//需要使用out进行返回数据
out.collect(word);
}
}
});
//对拆分的单词进行计数,每个单词记一次数
final SingleOutputStreamOperator> wordAndOne = words.map(
new MapFunction>() {
@Override
public Tuple2 map(String value) throws Exception {
return Tuple2.of(value, 1);
}
});
//对相同的单词进行分组操作
final KeyedStream, Tuple> keyedStream = wordAndOne.keyBy(0);
//对分组后的数据进行累加操作
final SingleOutputStreamOperator> summed = keyedStream.sum(1);
//打印输出(测试)
summed.print();
//启动作业,递交任务
env.execute();
}
}