使用scala代码实现
package test
import org.apache.avro.LogicalTypes.date
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
object UVPV {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf =new SparkConf()
.setAppName("TrackLogAnalyseSpark")
.setMaster("local[2]")
val sc =new SparkContext(sparkConf)
sc.setLogLevel("WARN")
val trackRDD =sc.textFile("",4)
println(s"${trackRDD.count()}")
val filterRDD:RDD[(String,String,String)]=trackRDD
.filter(_.split(" ").length>35)
.map((line=>{
val arr=line.split(" ")
(arr(17).substring(0,10),arr(1),arr(5))
}))
filterRDD.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
val pv =filterRDD
.map{case(date,url,guid)=>(date,url)}
.filter(_._2.trim.length>0)
.map(x=>(x._1,1))
.reduceByKey(_ +_)
pv.foreachPartition(iter=>iter.foreach(println))
val uv:RDD[(String,Int)]=filterRDD
.map{case(date,url,guid)=>(date,guid)}
.filter(_._2.trim.length>0)
.distinct()
.map(x=>(x._1,1))
.reduceByKey(_+_)
uv.foreachPartition(iter=>iter.foreach(println))
println("----------union----------")
val unionRDD :RDD[(String,Int)]=pv.union(uv)
unionRDD.coalesce(1).foreachPartition(iter=>iter.foreach(println))
println("----------join----------")
val joinRDD:RDD[(String,(Int,Int))]=pv.join(uv)
joinRDD.coalesce(1).foreachPartition(f =iter => {
Class.forName("com.mysqk.jdbc.Driver")
joinRDD.foreach{
case(data,(pv1,uv1))=>{
println(s"date=$date,pv=$pv1,uv=$uv1")
}
}
val url="jdbc:mysql://192.168.9.105/"
val username="hivedb"
val password="hivedb"
var conn: Connection = null
try{
conn=DriverManager.getConnection(url,username,password)
val pst: PreparedStatement=conn.prepareStatement("INSERT INTO tb_pvuv_result(data,pv,uv) VALUE(?,?,?)")
iter.foreach {
case (data, (pv, uv)) => {
println(s"date=$date,pv=$pv,uv=$uv")
pst.setString(1, data)
pst.setInt(2, pv)
pst.setInt(3, uv)
pst.executeUpdate()
}
}
}catch{
case e:Exception =>e.printStackTrace()
}finally {
if (conn!=null) conn.close()
}
})
filterRDD.unpersist()
sc.stop()
}
}


![[大数据技术与应用省赛学习记录十一]——模块四(统计该系统的UV与PV存入Mysql中) [大数据技术与应用省赛学习记录十一]——模块四(统计该系统的UV与PV存入Mysql中)](http://www.mshxw.com/aiimages/31/585023.png)
