SparkStreaming与SparkSQL集成分析数据并将结果存入MySQL
一、前提说明
- 安装了Flume
- 本案例实现流程图:
- 本案例实现的功能是:实现wordcount功能,并将每次的分析结果保存到数据库中
二、实现步骤
- 在MySQL创建realtime表(即代码中insert语句写入的表),只有两个字段:key和value;代码中的"top"只是Spark SQL的临时视图名
- 在pom.xml中确保已经添加了MySQL数据库的驱动
- 编写如下代码
import java.net.InetAddress
import java.sql.DriverManager
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
// Word-count record: one word (`key`) and its occurrence count (`value`).
// Used as the row schema when the RDD is converted to a DataFrame via toDF().
// `final`: case classes should not be extended (idiomatic Scala).
final case class Count(key: String, value: Int)
object FlumeDemoMySQL {

  /**
   * Entry point: polls word data from a Flume sink every 5 seconds,
   * aggregates per-word counts with Spark SQL, and writes each batch's
   * result into MySQL over JDBC.
   *
   * Fixes vs. the original: the JDBC connection and prepared statement are
   * now closed in `finally` blocks, so a failure while writing a partition
   * no longer leaks them.
   */
  def main(args: Array[String]): Unit = {
    // StreamingContext wraps a SparkContext; local[2] provides one thread
    // for the receiver and one for batch processing.
    val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Pull-mode Flume stream: Spark polls the Flume sink at niit01:8888.
    // NOTE(review): host/port must match the Flume agent's sink config
    // (originally documented as the VM host-only adapter address).
    val flumeDStream: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createPollingStream(ssc, "niit01", 8888)

    // Decode each Flume event body as a string and split it into words.
    val words: DStream[String] = flumeDStream.flatMap { event =>
      new String(event.event.getBody.array()).split(" ")
    }

    // Pair each word with an initial count of 1.
    val pairs = words.map((_, 1))

    // Reuses the SparkContext already created by the StreamingContext.
    val spark = SparkSession.builder().getOrCreate()
    // Enables .toDF() on RDDs of case classes.
    import spark.implicits._

    // Wrap each (word, 1) pair in Count so toDF() produces named columns.
    val counts: DStream[Count] = pairs.transform(_.map(p => Count(p._1, p._2)))

    counts.foreachRDD { rdd =>
      val dataframe = rdd.toDF()
      dataframe.createOrReplaceTempView("top")
      // NOTE(review): each batch inserts its own aggregation, so rows
      // accumulate in MySQL across batches; use an upsert (e.g. ON
      // DUPLICATE KEY UPDATE) if running per-key totals are wanted.
      spark.sql("select key,sum(value) as total from top group by key order by total desc")
        .foreachPartition { rows =>
          // One connection/statement per partition; try/finally guarantees
          // release even if a row fails (the original leaked on error).
          val connection = DriverManager.getConnection(
            "jdbc:mysql://127.0.0.1:3306/niit?characterEncoding=UTF-8", "root", "123456")
          try {
            val preparedStatement =
              connection.prepareStatement("insert into realtime values(?,?)")
            try {
              rows.foreach { r =>
                val key = r.get(0).toString
                val value = r.get(1).toString.toInt
                println(r.toString())
                println("key: " + key + ", value: " + value)
                // Debug trace: which host and thread handled this partition.
                val address = InetAddress.getLocalHost.getHostAddress
                val hostName = InetAddress.getLocalHost.getHostName
                val threadId = Thread.currentThread().getId
                val threadName = Thread.currentThread().getName
                println("HostAddress: " + address + ", HostName: " + hostName +
                  ",threadId: " + threadId + ",threadName: " + threadName)
                preparedStatement.setString(1, key) // key is already a String
                preparedStatement.setInt(2, value)  // value is already an Int
                preparedStatement.executeUpdate()
              }
            } finally {
              preparedStatement.close()
            }
          } finally {
            connection.close()
          }
        }
    }

    // Start the streaming computation and block until it is stopped.
    ssc.start()
    ssc.awaitTermination()
    // Only reached after the streaming context terminates.
    spark.stop()
  }
}
- 启动程序
- 配置Flume,配置文件内容可参考:Flume之Pull模式
- 编写测试数据到某个文件,保存文件后,将该文件复制到Flume配置文件中所指定的路径
- 查看IDEA程序控制台的结果