前言:
由于Structured Streaming是基于SparkSQL,且数据抽象是DataSet,Dataframe,因此创建SparkSession即可
代码:
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Dataframe, Dataset, SparkSession}
object StructuredFromSocket {
def main(args: Array[String]): Unit = {
//创建环境
//由于Structured Streaming是基于SparkSQL,且数据抽象是DataSet,Dataframe,因此创建SparkSession即可
val session:SparkSession=SparkSession.builder().master("local[*]").appName("xiaobai")
//设置分区
.config("spark.sql.shuffle.partitions","4").getOrCreate()
val sc:SparkContext=session.sparkContext
sc.setLogLevel("WARN")
//隐式转换
import session.implicits._
val df:Dataframe = session.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
//把Dataframe数据类型转换为带泛型的DataSet
val ds:Dataset[String]=df.as[String]
val result=ds.flatMap(_.split(" ")).groupBy("value").count().orderBy('count.desc)
//写在控制台,由于是数据流,不能使用show输出到控制台
result.writeStream.format("console").outputMode("complete").start().awaitTermination()
session.stop()
}
}
两次输入
输出



