栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2021.12.15Spark数据分析及处理

2021.12.15Spark数据分析及处理

目录

用例1:数据清洗EtlDemo

 JdbcUtils连接Mysql

 用例2:用户留存分析Retention 

 用例3:活跃用户分析Active


 

用例1:数据清洗EtlDemo

用例1:数据清洗
读入日志文件并转化为RDD[Row]类型                            
按照Tab切割数据
过滤掉字段数量少于8个的
对数据进行清洗
按照第一列和第二列对数据进行去重
过滤掉状态码非200
过滤掉event_time为空的数据
将url按照”&”以及”=”切割
保存数据
将数据写入mysql表中


日志拆分字段:
event_time
url
method
status
sip
user_uip
action_prepend
action_client
package cn.kgc.etl

import org.apache.spark.rdd.RDD
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataframe, Dataset, Row, SparkSession}

object EtlDemo {
  def main(args: Array[String]): Unit = {

    

    val conf: SparkConf = new SparkConf().setAppName("etlDemo").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val sc:SparkContext=spark.sparkContext

    import spark.implicits._

    val rdd: RDD[Row] = sc.textFile("in/test.log").map(x => x.split("t"))
      .filter(x => x.length == 8).map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7)))


    val logs_schame: StructType = StructType(
      Array(
        StructField("event_time", StringType),
        StructField("url", StringType),
        StructField("method", StringType),
        StructField("status", StringType),
        StructField("sip", StringType),
        StructField("user_uip", StringType),
        StructField("action_prepend", StringType),
        StructField("action_client", StringType)
      )
    )
    val logDF: Dataframe = spark.createDataframe(rdd,logs_schame)

    val ds1: Dataset[Row] = logDF.dropDuplicates("event_time","url")
//    println(ds1.count())
    val ds2: Dataset[Row] = logDF.dropDuplicates("event_time")
//    println(ds2.count())
    val ds3: Dataset[Row] = logDF.dropDuplicates("url")
//    println(ds3.count())
    println("------------------------------------------------")

    val filterLogs: Dataset[Row] = logDF.dropDuplicates("event_time", "url")
      .filter(x => x(3) == "200")
      .filter(x => StringUtils.isNotEmpty(x(0).toString))
//    println(filterLogs.count())

    println("---------------------------------")

    val full_logs_rdd: RDD[Row] = filterLogs.map(row => {
      //      val str: String = row.getString(1)
      //      val string: String = row(1).toString
      //      println(str)
      //      str
      val str: String = row.getAs[String]("url")
      val paramsArray: Array[String] = str.split("\?")
      var paramsMap: Map[String, String] = null
      if (paramsArray.length == 2) {
        val tuples: Array[(String, String)] = paramsArray(1).split("&")
          .map(x => x.split("="))
          .filter(x => x.length == 2)
          .map(x => (x(0), x(1)))
        //        tuples.foreach(println)
//        var paramsMap: Map[String, String] = tuples.toMap
                paramsMap=tuples.toMap
      }
      (
        row.getAs[String]("event_time"),
        paramsMap.getOrElse[String]("userSID", ""),
        paramsMap.getOrElse[String]("userUID", ""),
        paramsMap.getOrElse[String]("actionBegin", ""),
        paramsMap.getOrElse[String]("actionEnd", ""),
        paramsMap.getOrElse[String]("actionType", ""),
        paramsMap.getOrElse[String]("actionName", ""),
        paramsMap.getOrElse[String]("actionValue", ""),
        paramsMap.getOrElse[String]("actionTest", ""),
        paramsMap.getOrElse[String]("ifEquipment", ""),
        row.getAs[String]("method"),
        row.getAs[String]("status"),
        row.getAs[String]("sip"),
        row.getAs[String]("user_uip"),
        row.getAs[String]("action_prepend"),
        row.getAs[String]("action_client")
      )
    }).toDF().rdd

    //    ds.printSchema()
    //    ds.show()
        val full_logs_schame: StructType = StructType(
          Array(
            StructField("event_time", StringType),
            StructField("userSID", StringType),
            StructField("userUID", StringType),
            StructField("actionBegin", StringType),
            StructField("actionEnd", StringType),
            StructField("actionType", StringType),
            StructField("actionName", StringType),
            StructField("actionValue", StringType),
            StructField("actionTest", StringType),
            StructField("ifEquipment", StringType),
            StructField("method", StringType),
            StructField("status", StringType),
            StructField("sip", StringType),
            StructField("user_uip", StringType),
            StructField("action_prepend", StringType),
            StructField("action_client", StringType)
          )
        )

    val full_logDF: Dataframe = spark.createDataframe(full_logs_rdd,full_logs_schame)
//    full_logDF.printSchema()
//    full_logDF.show()

    println("将full_logDF写入到数据库")
    JdbcUtils.dataframeToMysql(full_logDF,JdbcUtils.TABLE_FULL_ACCESS_LOGS,0)
    println("将full_logDF写入数据库完成")






  }

}

 JdbcUtils连接Mysql

package cn.kgc.etl

import java.util.Properties

import org.apache.spark.sql.{Dataframe, SaveMode, SparkSession}

object JdbcUtils {
  //create database etldemo
  val url="jdbc:mysql://192.168.111.131:3306/etldemo"
  val driver="com.mysql.jdbc.Driver"
  val usr="root"
  val pwd="root"
  val TABLE_FULL_ACCESS_LOGS:String="full_access_logs"
  val TABLE_DAY_RETENTION :String="day_retention"
  val TABLE_DAY_ACTIVE :String="day_active"


  private val properties=new Properties()
  properties.setProperty("user",usr)
  properties.setProperty("password",pwd)
  properties.setProperty("driver",driver)

  

  def dataframeToMysql(df:Dataframe,table:String,op:Int=1):Unit={
    if(op==1)
    df.write.mode(SaveMode.Append).jdbc(url,table,properties)
    else
      df.write.mode(SaveMode.Overwrite).jdbc(url,table,properties)
  }

//  def getDataframeMysqlByTableName
  
  def getDFByTableName(spark:SparkSession,table:String):Dataframe={
    val frame: Dataframe = spark.read.jdbc(url,table,properties)
    frame
  }

}

 用例2:用户留存分析Retention 

计算用户的次日留存率
求当天新增用户总数n
求当天新增的用户ID与次日登录的用户ID的交集,得出新增用户次日登录总数m (次日留存数)
m/n*100%
计算用户的次周留存率
package cn.kgc.etl

import java.text.SimpleDateFormat

import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.types.{DecimalType, DoubleType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataframe, SparkSession}
//留存率
object Retention {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setAppName("etlDemo").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val sc:SparkContext=spark.sparkContext

    import spark.implicits._
    import org.apache.spark.sql.functions._


    val logs: Dataframe = JdbcUtils.getDFByTableName(spark,JdbcUtils.TABLE_FULL_ACCESS_LOGS).cache()
//    logs.printSchema()
//    logs.show()

    val registered: Dataframe = logs.filter($"actionName" === "Registered")
      .withColumnRenamed("event_time", "register_time")
      .select("userUID", "register_time")

    val signin: Dataframe = logs.filter($"actionName" === "Signin")
      .withColumnRenamed("event_time", "signin_time")
      .select("userUID", "signin_time")


    val joined: Dataframe = registered.join(signin,Seq("userUID"),"left")
    joined.show(false)


    val spdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
//    val spdf2: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
//    println(spdf.parse(""))

    val stod: UserDefinedFunction = spark.udf.register("stod", (event_time: String) => {
      if (StringUtils.isEmpty(event_time))
        0
      else
        spdf.parse(event_time).getTime
    })

    val joined2: Dataframe = joined.withColumn("register_time", stod($"register_time"))
      .withColumn("signin_time", stod($"signin_time"))

    //求出注册后 注册日期后一天登陆的人数 355
    val signumDF: Dataframe = joined2.filter($"register_time" + 86400000 === $"signin_time")
      .groupBy("register_time")
      .agg(countDistinct("userUID").as("signinNum"))
    signumDF.printSchema()
    signumDF.show()

    //求出 注册日当天 新注册用户 381
    val registernumDF: Dataframe = joined2.groupBy("register_time")
      .agg(countDistinct($"userUID").as("registerNum"))
    registernumDF.printSchema()
    registernumDF.show()

    val registerJoinDF: Dataframe = signumDF.join(registernumDF,"register_time")  //必须使用inner join
    registerJoinDF.printSchema()
    registerJoinDF.show()

    val frame: Dataframe = registerJoinDF.select(from_unixtime($"register_time"/1000,"yyyy-MM-dd").as("register_time"),
      ($"signinNum"/$"registerNum".cast(DoubleType)).cast(DecimalType(10,2)).as("percent"))

    println("写入数据库")
    JdbcUtils.dataframeToMysql(frame,JdbcUtils.TABLE_DAY_RETENTION,0)
    println("写入数据库成功")






//    import org.apache.spark.sql.functions._
//    joined.withColumn("register_time",
//      date_format($"register_time","yyyy-MM-dd"))
//      .withColumn("register_time2",date_add($"register_time",1))






  }

}

 用例3:活跃用户分析Active

package cn.kgc.etl

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataframe, Dataset, SparkSession}

object Active {
  def main(args: Array[String]): Unit = {

      val conf: SparkConf = new SparkConf().setAppName("etlDemo").setMaster("local[*]")
      val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
      val sc:SparkContext=spark.sparkContext

      import spark.implicits._


    val logs: Dataframe = JdbcUtils.getDFByTableName(spark,JdbcUtils.TABLE_FULL_ACCESS_LOGS).cache()
//        logs.printSchema()
//        logs.show()

    

    val ds2: Dataset[(String, String)] = logs.filter($"actionName" === "StartLearn" || $"actionName" === "BuyCourse")
      .map(x => {
        (
          x.getAs[String]("userUID"),
          x.getAs[String]("event_time").substring(0, 10)
        )
      })

    import org.apache.spark.sql.functions._
    val frame:Dataframe=ds2.withColumnRenamed("_1","userid")
      .withColumnRenamed("_2","date")
      .groupBy("date")
      .agg(countDistinct("userid").as("activeNum"))
    println("写入数据库")
    JdbcUtils.dataframeToMysql(frame,JdbcUtils.TABLE_DAY_ACTIVE)
    println("写入数据库完成")


  }

}

 

 

 

 

 

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/671054.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号