package org.example.scala
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
object Flink_2021_0323_1443 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1) // 设置并行度
doPlan02(env)
env.execute("Flink_2021_0323_1443")
}
def doPlan01(env: StreamExecutionEnvironment): Unit = {
val file = "G:\workspace01\flink\src\main\resources\test.txt"
val dataStream: DataStream[String] = env.readTextFile(file)
val splitStream: DataStream[(String, Int)] = dataStream
.flatMap(_.toUpperCase.split(" "))
.map((_, 1))
.keyBy(0)
.sum(1)
splitStream.print()
}
def doPlan02(env: StreamExecutionEnvironment): Unit = {
val socketStream: DataStream[String] = env.socketTextStream("192.168.195.178", 9999)
val filterStream:DataStream[String]=socketStream.filter(_.contains('a')) // 过滤条件
filterStream.print()
}
}
声明:本文档仅是自己学习总结,其中有些知识点可能存在错误,若是学友偶然搜到参考,望斟酌后再使用,以免给您带来困扰,若是发现错误也希望您指出更正,在此提前感谢!! 总结过程中要是有些地方借鉴了各路大神成果,您觉得侵犯了您的知识产权,对您有所冒犯,烦请通知鄙人,鄙人将会尽快修正! 邮箱地址:390835164@qq.com



