栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark数据分析及处理

Spark数据分析及处理

使用 Spark 完成下列日志分析项目需求:日志数据清洗、用户留存分析、活跃用户分析。
import org.apache.commons.lang.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql._

import scala.reflect.io.Path


/**
 * ETL job: cleans the raw access log, explodes the URL query string into
 * named columns and persists the result for the downstream Retention /
 * Active jobs.
 */
object EtlDemo {

  /**
   * Persists a DataFrame to `ph` in Spark's default (parquet) format.
   *
   * @param df the DataFrame to write
   * @param ph target path
   * @param op save-mode selector: 1 (default) appends, any other value overwrites
   */
  def dataframeToPath(df: DataFrame, ph: String, op: Int = 1): Unit = {
    val mode = if (op == 1) SaveMode.Append else SaveMode.Overwrite
    df.write.mode(mode).save(ph)
  }

  /** Loads a DataFrame previously saved at `ph` (default parquet format). */
  def getDFByPath(sparkSession: SparkSession, ph: String): DataFrame =
    sparkSession.read.load(ph)

  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("eltDemo").master("local[*]").getOrCreate()
    val sc: SparkContext = sparkSession.sparkContext

    import sparkSession.implicits._

    // Raw log lines are tab-separated; keep only well-formed 8-field records.
    // FIX: the original split on the literal letter "t" instead of a tab.
    val rowRDD: RDD[Row] = sc.textFile("in/test.log")
      .map(_.split("\t"))
      .filter(_.length == 8)
      .map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7)))

    val logs_schema = StructType(
      Array(
        StructField("event_time", StringType),
        StructField("url", StringType),
        StructField("method", StringType),
        StructField("status", StringType),
        StructField("sip", StringType),
        StructField("user_uip", StringType),
        StructField("action_prepend", StringType),
        StructField("action_client", StringType)
      )
    )

    val logDF: DataFrame = sparkSession.createDataFrame(rowRDD, logs_schema)

    // De-duplicate on (event_time, url) and keep only successful (HTTP 200)
    // requests that carry a non-empty timestamp.
    val filterLogs: Dataset[Row] = logDF.dropDuplicates("event_time", "url")
      .filter(_.getAs[String]("status") == "200")
      .filter(row => StringUtils.isNotEmpty(row.getAs[String]("event_time")))

    // Explode the URL query string into named columns.
    val full_logs_rdd: RDD[Row] = filterLogs.map(row => {
      val url: String = row.getAs[String]("url")
      // FIX: "?" is a regex metacharacter and must be escaped as "\\?";
      // the original "\?" is an invalid string escape.
      val paramsArray: Array[String] = url.split("\\?")
      // FIX: default to an empty map instead of null so the getOrElse calls
      // below cannot throw a NullPointerException on URLs without a query.
      val paramsMap: Map[String, String] =
        if (paramsArray.length == 2)
          paramsArray(1).split("&").map(_.split("="))
            .filter(_.length == 2).map(kv => (kv(0), kv(1))).toMap
        else
          Map.empty[String, String]
      (
        row.getAs[String]("event_time"),
        paramsMap.getOrElse("userSID", ""),
        paramsMap.getOrElse("userUID", ""),
        paramsMap.getOrElse("actionBegin", ""),
        paramsMap.getOrElse("actionEnd", ""),
        paramsMap.getOrElse("actionType", ""),
        paramsMap.getOrElse("actionName", ""),
        paramsMap.getOrElse("actionValue", ""),
        paramsMap.getOrElse("actionTest", ""),
        paramsMap.getOrElse("ifEquipment", ""),
        row.getAs[String]("method"),
        row.getAs[String]("status"),
        row.getAs[String]("sip"),
        row.getAs[String]("user_uip"),
        row.getAs[String]("action_prepend"),
        row.getAs[String]("action_client")
      )
    }).toDF().rdd

    val full_log_schema = StructType(
      Array(
        StructField("event_time", StringType),
        StructField("userSID", StringType),
        StructField("userUID", StringType),
        StructField("actionBegin", StringType),
        StructField("actionEnd", StringType),
        StructField("actionType", StringType),
        StructField("actionName", StringType),
        StructField("actionValue", StringType),
        StructField("actionTest", StringType),
        StructField("ifEquipment", StringType),
        StructField("method", StringType),
        StructField("status", StringType),
        StructField("sip", StringType),
        StructField("user_uip", StringType),
        StructField("action_prepend", StringType),
        StructField("action_client", StringType)
      )
    )

    val full_logDF: DataFrame = sparkSession.createDataFrame(full_logs_rdd, full_log_schema)

    // One-off persistence step, deliberately left disabled by the author
    // (Append mode would duplicate rows on every run). Enable on first run.
//    dataframeToPath(full_logDF, "in/save", 1)

    // Sanity check: read the cleaned data back from disk.
    getDFByPath(sparkSession, "in/save").show()
  }
}
import java.text.SimpleDateFormat

import org.apache.commons.lang.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.types.{DecimalType, DoubleType}
// FIX: the Spark SQL type is spelled DataFrame, not Dataframe.
import org.apache.spark.sql.{DataFrame, SparkSession}


/**
 * Day-1 retention: of the users who registered on day D, the fraction that
 * signed in on day D+1. Reads the cleaned log written by [[EtlDemo]].
 */
object Retention {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("elt.retention").master("local[*]").getOrCreate()

    import sparkSession.implicits._

    val logs: DataFrame = EtlDemo.getDFByPath(sparkSession, "in/save").cache()

    // Registration events: (userUID, register_time)
    val registered: DataFrame = logs.filter($"actionName" === "Registered")
      .withColumnRenamed("event_time", "register_time")
      .select("userUID", "register_time")

    // Sign-in events: (userUID, signin_time)
    val signin: DataFrame = logs.filter($"actionName" === "Signin")
      .withColumnRenamed("event_time", "signin_time")
      .select("userUID", "signin_time")

    // Left join keeps users who registered but never signed in again.
    val joined: DataFrame = registered.join(signin, Seq("userUID"), "left")

    import org.apache.spark.sql.functions._

    // UDF: truncate an event-time string to its day and return epoch millis
    // (0L for empty values). FIX: the SimpleDateFormat is created inside the
    // closure because SimpleDateFormat is not thread-safe and must not be
    // shared across executor threads; both branches now yield Long.
    val stod: UserDefinedFunction = sparkSession.udf.register("stod", (event_time: String) => {
      if (StringUtils.isEmpty(event_time)) 0L
      else new SimpleDateFormat("yyyy-MM-dd").parse(event_time).getTime
    })

    // Replace the string timestamps with day-truncated epoch millis.
    val joined2: DataFrame = joined
      .withColumn("register_time", stod($"register_time"))
      .withColumn("signin_time", stod($"signin_time"))

    // Users who signed in exactly one day (86 400 000 ms) after registering.
    val signNumDF: DataFrame = joined2
      .filter($"register_time" + 86400000L === $"signin_time")
      .groupBy("register_time")
      .agg(countDistinct("userUID").as("signinNum"))

    // Distinct users newly registered per day.
    val registerNumDF: DataFrame = joined2.groupBy("register_time")
      .agg(countDistinct($"userUID").as("registerNum"))

    val registerJoinSignDF: DataFrame = signNumDF.join(registerNumDF, Seq("register_time"))

    // percent = next-day sign-ins / same-day registrations, 2 decimal places.
    val day_retention: DataFrame = registerJoinSignDF.select(
      from_unixtime($"register_time" / 1000, "yyyy-MM-dd").as("register_time"),
      ($"signinNum" / $"registerNum".cast(DoubleType))
        .cast(DecimalType(10, 2)).as("percent"))
    day_retention.show()

    EtlDemo.dataframeToPath(day_retention, "in/save2", 0)
  }
}
import org.apache.spark.SparkContext
// FIX: the Spark SQL type is spelled DataFrame, not Dataframe.
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}


/**
 * Daily active users: distinct users per day who started learning or bought
 * a course. Reads the cleaned log written by [[EtlDemo]].
 */
object Active {
  def main(args: Array[String]): Unit = {
    // FIX: appName was copy-pasted from the Retention job ("elt.retention").
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("elt.active").master("local[*]").getOrCreate()

    import sparkSession.implicits._

    val logs: DataFrame = EtlDemo.getDFByPath(sparkSession, "in/save").cache()
    logs.show(100)

    // (userUID, day) pairs for the two "active" action types; event_time is
    // truncated to its leading "yyyy-MM-dd" prefix.
    val userDay: Dataset[(String, String)] = logs
      .filter($"actionName" === "StartLearn" || $"actionName" === "ByCourse")
      .map(row => (
        row.getAs[String]("userUID"),
        row.getAs[String]("event_time").substring(0, 10)
      ))
    userDay.show()

    import org.apache.spark.sql.functions._

    // Count distinct active users per day.
    val activeDF: DataFrame = userDay
      .withColumnRenamed("_1", "userid")
      .withColumnRenamed("_2", "date")
      .groupBy("date")
      .agg(countDistinct("userid").as("activeNum"))
    activeDF.show()
    activeDF.printSchema()

    EtlDemo.dataframeToPath(activeDF, "in/save3", 0)
  }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/671368.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号