- 创建工程
- 直接用maven 创建工程
- flink 提供的创建工程的方式
- 添加依赖
- DataSet wordcount
- DataStream wordCount
- flinkSql wordcount
mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.10.0
通过 Maven 工程的三要素(GroupId、ArtifactId、Version),可以使用 Maven 命令的方式创建工程。
Flink 提供的创建工程的方式:curl https://flink.apache.org/q/quickstart.sh | bash -s 1.10.0
其中 1.10.0 指的是flink 的版本号。
添加依赖(DataSet wordcount):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
package org.myorg.quickstart;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import java.util.Locale;
public class BatchJob {
public static void main(String[] args) throws Exception {
// set up the batch execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet text = env.fromElements(
"Flink Spark Storm",
"Flink Flink Flink",
"Spark Spark Spark",
"Kafka Kafka Kafka",
"Kafka Kafka Kafka"
);
// 通过Flink 内置的转换函数进行计算
DataSet> counts = text.flatMap(new LineSplitter())
.groupBy(0)
.sum(1);
counts.printToErr();
}
public static final class LineSplitter implements FlatMapFunction> {
@Override
public void flatMap(String value, Collector> out) throws Exception {
//将文本分割
String[] tokens = value.toLowerCase().split("\W+");
for (String token : tokens) {
if (token.length() > 0 ) {
out.collect(new Tuple2(token,1));
}
}
}
}
}
DataStream wordCount
package org.myorg.quickstart;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
 * Streaming (DataStream API) word count: reads newline-delimited text from a
 * local socket, splits it into words, and counts words over a sliding window
 * of 5 seconds that advances every 1 second.
 */
public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Listen on a local socket; records are delimited by newline.
        DataStream<String> text = env.socketTextStream("127.0.0.1", 9000, "\n");

        // Split incoming lines into (word, 1) records, key by word, window,
        // and reduce the counts within each window.
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                        // "\\s" splits on single whitespace characters.
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                // Sliding window: 5-second size, 1-second slide.
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount value1, WordWithCount value2) throws Exception {
                        return new WordWithCount(value1.word, value1.count + value2.count);
                    }
                });

        // Print with a single output task so result lines are not interleaved.
        windowCounts.print().setParallelism(1);

        // A streaming job only starts running when execute() is called.
        env.execute("Flink Streaming Java API Skeleton");
    }

    /** POJO holding a word and its accumulated count. */
    public static class WordWithCount {
        public String word;
        public Long count;

        // Flink POJO serialization requires a public no-arg constructor.
        public WordWithCount() {
        }

        public WordWithCount(String word, Long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + ":" + count;
        }
    }
}
flinkSql wordcount
package org.myorg.quickstart;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import java.util.ArrayList;
public class FlinkSql {
public static void main(String[] args) throws Exception {
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(fbEnv);
String words = "hello flink hello balckjoker";
String[] split = words.split("\W+");
ArrayList list = new ArrayList();
for (String word : split) {
WordWithCount wordWithCount = new WordWithCount(word,1L);
list.add(wordWithCount);
}
DataSet inpput = fbEnv.fromCollection(list);
Table table = batchTableEnvironment.fromDataSet(inpput);
table.printSchema();
// 注册一个表
batchTableEnvironment.createTemporaryView("wordCount",table);
Table table1 = batchTableEnvironment.sqlQuery("select word as word, sum(frequency) as frequency from wordCount group by word");
DataSet ds3 = batchTableEnvironment.toDataSet(table1,WordWithCount.class);
ds3.printToErr();
}
public static class WordWithCount {
public String word;
public Long frequency;
public WordWithCount() {
}
public WordWithCount(String word, Long frequency) {
this.word = word;
this.frequency = frequency;
}
@Override
public String toString() {
return word + ":" + frequency;
}
}
}
运行结果
root |-- frequency: BIGINT |-- word: STRING WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/D:/Dev/repository/org/apache/flink/flink-core/1.12.0/flink-core-1.12.0.jar) to field java.lang.String.value WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release flink:1 balckjoker:1 hello:2 Process finished with exit code 0



