栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

[大数据技术与应用省赛学习记录十一]——模块四(统计该系统的UV与PV存入Mysql中)

[大数据技术与应用省赛学习记录十一]——模块四(统计该系统的UV与PV存入Mysql中)

使用 Scala 代码实现:

package test
import org.apache.avro.LogicalTypes.date

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark job that derives per-day PV and UV counts from a space-delimited
 * track log and inserts one `(date, pv, uv)` row per day into MySQL.
 *
 * PV is the number of records per day whose second field (index 1) is
 * non-blank; UV is the number of distinct non-blank values of the sixth field
 * (index 5) per day. The day key is the first 10 characters of field 17.
 */
object UVPV {

  /**
   * Entry point.
   *
   * @param args optional; `args(0)` is the input path of the track log. When
   *             absent, the path falls back to the empty string that was
   *             previously hard-coded (which Spark rejects), so a path should
   *             be supplied.
   */
  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf()
      .setAppName("TrackLogAnalyseSpark")
      .setMaster("local[2]")

    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    // Ensure the context is stopped even when a stage fails.
    try {
      val inputPath = args.headOption.getOrElse("")
      val trackRDD = sc.textFile(inputPath, 4)
      println(s"${trackRDD.count()}")

      // Split each line once; keep only records that have more than 35 fields
      // and a field 17 long enough to take a 10-character day prefix from.
      val filterRDD: RDD[(String, String, String)] = trackRDD
        .map(_.split(" "))
        .filter(fields => fields.length > 35 && fields(17).length >= 10)
        .map(fields => (fields(17).substring(0, 10), fields(1), fields(5)))
      filterRDD.persist(StorageLevel.MEMORY_AND_DISK_SER_2)

      try {
        // PV: count of records per day with a non-blank url field.
        val pv: RDD[(String, Int)] = filterRDD
          .map { case (day, url, _) => (day, url) }
          .filter(_._2.trim.length > 0)
          .map(x => (x._1, 1))
          .reduceByKey(_ + _)
        pv.foreachPartition(iter => iter.foreach(println))

        // UV: count of distinct non-blank guid values per day.
        val uv: RDD[(String, Int)] = filterRDD
          .map { case (day, _, guid) => (day, guid) }
          .filter(_._2.trim.length > 0)
          .distinct()
          .map(x => (x._1, 1))
          .reduceByKey(_ + _)
        uv.foreachPartition(iter => iter.foreach(println))

        println("----------union----------")
        val unionRDD: RDD[(String, Int)] = pv.union(uv)
        unionRDD.coalesce(1).foreachPartition(iter => iter.foreach(println))

        println("----------join----------")
        val joinRDD: RDD[(String, (Int, Int))] = pv.join(uv)
        // One partition => one JDBC connection for the whole result set.
        // RDD operations must not be invoked from inside this closure.
        joinRDD.coalesce(1).foreachPartition(rows => savePartition(rows))
      } finally {
        filterRDD.unpersist()
      }
    } finally {
      sc.stop()
    }
  }

  /**
   * Writes every `(date, (pv, uv))` row of one partition to the
   * `tb_pvuv_result` table over a single JDBC connection.
   *
   * SQL/connection failures are reported with a stack trace and otherwise
   * swallowed (best-effort, as before); fatal errors propagate.
   *
   * @param rows the joined per-day counts of one partition
   */
  private def savePartition(rows: Iterator[(String, (Int, Int))]): Unit = {
    // Nothing to write: avoid opening a connection for an empty partition.
    if (rows.hasNext) {
      Class.forName("com.mysql.jdbc.Driver")

      // NOTE(review): the URL names no database, so the unqualified table
      // below relies on a server-side default schema — confirm.
      val url = "jdbc:mysql://192.168.9.105/"
      val username = "hivedb"
      val password = "hivedb"

      try {
        val conn: Connection = DriverManager.getConnection(url, username, password)
        try {
          // NOTE(review): the column is spelled `data`; kept as-is because the
          // table schema is not visible here — confirm it is not meant to be `date`.
          val pst: PreparedStatement =
            conn.prepareStatement("INSERT INTO tb_pvuv_result(data,pv,uv) VALUE(?,?,?)")
          try {
            rows.foreach { case (day, (pvCount, uvCount)) =>
              println(s"date=$day,pv=$pvCount,uv=$uvCount")
              pst.setString(1, day)
              pst.setInt(2, pvCount)
              pst.setInt(3, uvCount)
              pst.executeUpdate()
            }
          } finally {
            pst.close()
          }
        } finally {
          conn.close()
        }
      } catch {
        case scala.util.control.NonFatal(e) => e.printStackTrace()
      }
    }
  }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/585023.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号