Spark 中 DStream、RDD、DataFrame 之间的转换,以及实现 DStream 中 updateStateByKey 的功能
Spark 中 DStream、RDD、DataFrame 之间的转换
虚拟机端:执行 nc -lk 8888 用于测试。
代码在 IDEA 中运行,从虚拟机上 nc -lk 8888 命令所在的命令行接收数据。
package sparkstreaming
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Demonstrates moving between DStream, RDD and DataFrame in Spark Streaming.
 *
 * Reads comma-separated text lines from a socket (host "master", port 8888)
 * in 5-second batches and, for every batch:
 *  - converts the batch RDD into a DataFrame and prints a per-word count;
 *  - maps the batch into a (word, 1) DStream via `transform` and prints it.
 */
object Demo3DSToRDD {

  /**
   * Program entry point.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      // The key previously contained a double dot ("shuffle..partitions"),
      // which is not a Spark setting, so the value was never applied.
      .config("spark.sql.shuffle.partitions", 2)
      .master("local[2]")
      .appName("RDD")
      .getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.functions._

    val sc: SparkContext = spark.sparkContext
    val ssc: StreamingContext = new StreamingContext(sc, Durations.seconds(5))

    // Read the input lines from the socket.
    val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)

    linesDS.foreachRDD(rdd => {
      println("正在执行foreachRDD")

      // Plain RDD word count. RDD transformations are lazy, so this chain does
      // no work unless an action (e.g. the commented-out foreach) is enabled.
      rdd.flatMap(_.split(","))
        .map((_, 1))
        .reduceByKey(_ + _)
        //.foreach(println)

      // Convert the batch RDD into a single-column DataFrame.
      val df: DataFrame = rdd.toDF("lines")

      // Register a temp view so the batch could also be queried with SQL;
      // the aggregation below uses the DataFrame DSL.
      df.createOrReplaceTempView("lines")

      df.select(explode(split($"lines", ",")) as "word")
        .groupBy($"word")
        // The alias belongs on the aggregate, not on its argument; otherwise
        // the result column is not named "c".
        .agg(count($"word") as "c")
        .show()
    })

    // RDD -> DStream: transform applies an RDD-to-RDD function to every batch.
    val kvDS: DStream[(String, Int)] = linesDS.transform(rdd => {
      val kvRDD: RDD[(String, Int)] = rdd
        .flatMap(_.split(","))
        .map((_, 1))
      kvRDD
    })
    // A DStream is only computed when an output operation is registered on it;
    // without this the transform above would never run.
    kvDS.print()

    try {
      ssc.start()
      ssc.awaitTermination()
    } finally {
      // Release the streaming and Spark contexts even if the job fails.
      ssc.stop()
    }
  }
}
实现 DStream 中 updateStateByKey 的功能
package sparkstreaming
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Durations, StreamingContext}

/**
 * Re-implements the effect of DStream `updateStateByKey` with DataFrames.
 *
 * Reads comma-separated text lines from a socket (host "master", port 8888)
 * in 5-second batches, counts the words of each batch and merges those counts
 * into a running total that is persisted as CSV under
 * `SparkLearning/src/main/data/stream_count`.
 */
object Demo4DstreamonRDD {

  /**
   * Program entry point.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      // The key previously contained a double dot ("shuffle..partitions"),
      // which is not a Spark setting, so the value was never applied.
      .config("spark.sql.shuffle.partitions", 2)
      .master("local[2]")
      .appName("RDD")
      .getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.functions._

    val sc: SparkContext = spark.sparkContext
    val ssc: StreamingContext = new StreamingContext(sc, Durations.seconds(5))

    // Directory holding the accumulated counts, and the staging directory the
    // merged result is written to before it replaces the former.
    val statePath = "SparkLearning/src/main/data/stream_count"
    val tempPath = "SparkLearning/src/main/data/stream_count_temp"

    // Read the input lines from the socket.
    val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)

    linesDS.foreachRDD(rdd => {
      // An empty batch cannot change the totals, so skip the read/merge/rewrite.
      if (!rdd.isEmpty()) {
        val df: DataFrame = rdd.toDF("lines")

        // Word counts of the current batch only.
        val countDF: DataFrame = df
          .select(explode(split($"lines", ",")) as "word")
          .groupBy($"word")
          .agg(count($"word") as "c")

        // Use Spark's Hadoop configuration so the existence check looks at the
        // same file system the DataFrame reader and writer use.
        val hadoopConf: Configuration = rdd.sparkContext.hadoopConfiguration
        val fs: FileSystem = FileSystem.get(hadoopConf)
        val stateDir = new Path(statePath)

        if (fs.exists(stateDir)) {
          // Totals accumulated by the previous batches.
          val beforeDF: DataFrame = spark
            .read
            .format("csv")
            .schema("word string,c long")
            .load(statePath)

          // Merge the previous totals with the current batch.
          val newDF: DataFrame = beforeDF
            .union(countDF)
            .groupBy($"word")
            .agg(sum($"c") as "c")

          // Write to the staging directory first: newDF still reads lazily
          // from statePath, so it must not be overwritten in place.
          // The format must be CSV because the next batch reads the state as
          // CSV; writing JSON here made later batches mis-parse it.
          newDF
            .write
            .mode(SaveMode.Overwrite)
            .format("csv")
            .save(tempPath)

          // Swap the staging directory in for the old state directory.
          val swapped = fs.delete(stateDir, true) && fs.rename(new Path(tempPath), stateDir)
          if (!swapped) {
            throw new java.io.IOException(
              s"Failed to replace $statePath with $tempPath")
          }
        } else {
          // First non-empty batch: its counts become the initial state.
          countDF
            .write
            .mode(SaveMode.Overwrite)
            .format("csv")
            .save(statePath)
        }
      }
    })

    try {
      ssc.start()
      ssc.awaitTermination()
    } finally {
      // Release the streaming and Spark contexts even if the job fails.
      ssc.stop()
    }
  }
}



