1. 搭建 maven 工程 FlinkDemo
1.1 pom 文件
4.0.0
com.offcn
FlinkDemo
1.0-SNAPSHOT
org.apache.flink
flink-scala_2.11
1.7.0
org.apache.flink
flink-streaming-scala_2.11
1.7.0
net.alchim31.maven
scala-maven-plugin
3.4.6
testCompile
compile
org.apache.maven.plugins
maven-assembly-plugin
3.0.0
jar-with-dependencies
make-assembly
package
single
2.批处理 wordcount
package chapter1
import org.apache.flink.api.scala.{AggregateDataSet, DataSet, ExecutionEnvironment, createTypeInformation}
// Batch WordCount: counts word frequencies in a text file using Flink's DataSet API.
object WordCount {
  def main(args: Array[String]): Unit = {
    // Entry point for Flink batch programs (picks local or cluster environment automatically).
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    // Load the source file. Backslashes must be escaped in a Scala string literal;
    // the original "D:\a资料offcn\..." contained invalid escape sequences and would not compile.
    val file: DataSet[String] = env.readTextFile("D:\\a资料offcn\\5.第五阶段\\Day08\\SparkDay01\\资料\\data\\words.txt")
    // Split each line into words on single spaces.
    val splitFile: DataSet[String] = file.flatMap(_.split(" "))
    // Pair each word with an initial count of 1.
    val wordAndOne: DataSet[(String, Int)] = splitFile.map((_, 1))
    // Group by the word (tuple field 0) and sum the counts (tuple field 1).
    val wordAndCount: AggregateDataSet[(String, Int)] = wordAndOne.groupBy(0).sum(1)
    // print() is a sink that also triggers execution for batch jobs, so no explicit execute() is needed here.
    wordAndCount.print()
  }
}
结果:
(you,4)
(her,4)
(me,4)
(hello,12)
3. 流处理 wordcount
4.0.0 com.offcn FlinkDemo1.0-SNAPSHOT org.apache.flink flink-scala_2.111.7.0 org.apache.flink flink-streaming-scala_2.111.7.0 net.alchim31.maven scala-maven-plugin3.4.6 testCompile compile org.apache.maven.plugins maven-assembly-plugin3.0.0 jar-with-dependencies make-assembly package single
2.批处理 wordcount
package chapter1
import org.apache.flink.api.scala.{AggregateDataSet, DataSet, ExecutionEnvironment, createTypeInformation}
// Batch WordCount: reads a text file, splits it into words, and counts each word's occurrences.
object WordCount {
  def main(args: Array[String]): Unit = {
    // Entry point for Flink batch programs.
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    // Load the source file. Windows path backslashes must be escaped ("\\") in a
    // Scala string literal; the unescaped original ("D:\a...") is a compile error.
    val file: DataSet[String] = env.readTextFile("D:\\a资料offcn\\5.第五阶段\\Day08\\SparkDay01\\资料\\data\\words.txt")
    // Tokenize each line on single spaces.
    val splitFile: DataSet[String] = file.flatMap(_.split(" "))
    // Map every word to a (word, 1) pair.
    val wordAndOne: DataSet[(String, Int)] = splitFile.map((_, 1))
    // Group by tuple field 0 (the word) and sum field 1 (the count).
    val wordAndCount: AggregateDataSet[(String, Int)] = wordAndOne.groupBy(0).sum(1)
    // print() acts as a sink and triggers batch execution; explicit execute() is not required.
    wordAndCount.print()
  }
}
结果:
(you,4)
(her,4)
(me,4)
(hello,12)
3. 流处理 wordcount
首先在虚拟机执行:
执行代码:
package chapter1
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
// Streaming WordCount: reads lines from a socket and maintains a running count per word.
object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // Entry point for Flink streaming programs.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Receive raw text lines from the socket at node01:8888.
    val lines: DataStream[String] = env.socketTextStream("node01", 8888)
    // Tokenize on spaces, pair each word with 1, key by the word (tuple field 0),
    // and keep a rolling sum of the counts (tuple field 1).
    val counts: DataStream[(String, Int)] =
      lines
        .flatMap(_.split(" "))
        .map((_, 1))
        .keyBy(0)
        .sum(1)
    // Emit every updated (word, count) pair to stdout.
    counts.print()
    // Streaming jobs only start running when execute() is called; the job then runs until cancelled.
    env.execute()
  }
}
再返回虚拟机传输数据:
IDEA中返回的结果:
execute用法:
批处理:将数据下沉sink到指定地点需要调用execute方法;
流处理:必须调用execute方法。
4.Flink结构图



