栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark用户留存分析:计算用户的次日留存率(次周留存率同理)

Spark用户留存分析:计算用户的次日留存率(次周留存率同理)

日志数据,下面是一行日志信息,已经经过数据清洗写入到

full_access_logs表中

2018-09-04T20:27:31+08:00   http://datacenter.bdqn.cn/logs/user?actionBegin=1536150451617&actionClient=Mozilla%2F5.0+%28Windows+NT+6.1%3B+WOW64%29+AppleWebKit%2F537.36+%28KHTML%2C+like+Gecko%29+Chrome%2F63.0.3239.132+Safari%2F537.36&actionEnd=1536150451705&actionName=viewQuestionAnalysis&actionTest=0&actionType=3&actionValue=272878&clientType=001_bdqn&examType=001&ifEquipment=web&questionId=32415&skillIdCount=0&userSID=EDEC6A9CF8220BE663A22BDD13E428E7.exam-tomcat-node3.exam-tomcat-node3&userUID=272878&userUIP=117.152.82.106  GET    200    192.168.168.63 -  -  Apache-HttpClient/4.1.2 (java 1.5)

 

package zk.bn.etlpractice

import java.text.SimpleDateFormat
import java.util.Properties

import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.types.{DecimalType, DoubleType}
import org.apache.spark.sql.{Dataframe, SaveMode, SparkSession}

连接数据库,读取数据库中的表,可单独写成一个类,为了方便我就写一起了,工作中应该用面向对象的方法单独写成一个类
object JdbcUtils2 {
  //mysql地址
  val URL="jdbc:mysql://LinuxIP/YourDatabase"
  //驱动
  val DRIVER="com.mysql.jdbc.Driver"
  //用户名
  val USER="**"
  //密码
  val PWD="**"
  //要读取数据库中的表
  val TABLE_RETENTION_TABLE="full_access_logs"
  //要写入数据库中的表
  val TABLE_DAY_RETENTION="day_retention"
  //配置文件放入Properties中
  private val properties = new Properties()
  properties.setProperty("user",USER)
  properties.setProperty("password",PWD)
  properties.setProperty("driver",DRIVER)

  //将dataframe写入到数据中的方法
  
  def  dfToMysql(df:Dataframe,table:String,op:Int=1):Unit={
    //默认op=1追加
    if (op==1){
      df.write.mode(SaveMode.Append).jdbc(URL,table,properties)
    }
      //op等于其他数字等于覆盖
    else {
      df.write.mode(SaveMode.Overwrite).jdbc(URL,table,properties)
    }
  }

  //从数据库中读取某个表
  
  def get_DF_By_Tb_Name(spark:SparkSession,table:String):Dataframe={
    spark.read.jdbc(URL,table,properties)
  }
}

object Retention1 {

  def main(args: Array[String]): Unit = {
    //获得spark 对象
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("retention1").getOrCreate()
    //导入隐式类
    import spark.implicits._
    //导入函数
    import org.apache.spark.sql.functions._
    //调用读取数据库表的方法,获得一个dataframe对象
    val logsDF: Dataframe = JdbcUtils2.get_DF_By_Tb_Name(spark,JdbcUtils2.TABLE_RETENTION_TABLE).cache()
    //过滤出来注册过的数据
    val filterRigister: Dataframe = logsDF.filter($"actionName" === "Registered")
      .withColumnRenamed("event_time", "register_time")
      .select($"userUID", $"register_time")
    //过滤出登陆过的数据
    val filterSignin: Dataframe = logsDF.filter($"actionName" === "Signin")
      .withColumnRenamed("event_time", "signin_time")
      .select($"userUID", $"signin_time")
    //过滤后的两表关联
    val joined: Dataframe = filterRigister.join(filterSignin,Seq("userUID"),"left")
    //因为要比较时间,所以要用自定义函数将时间转换为时间戳
    //1用java的方法
    val spdf = new SimpleDateFormat("yyyy-MM-dd")
    //自定义函数
    val stod: UserDefinedFunction = spark.udf.register("stod", (event_time: String) => {
      if (StringUtils.isEmpty(event_time)) {
        0
      } //判断是不是空
      else {
        spdf.parse(event_time).getTime
      }
    })
    //获得带有时间戳的数据
    val joined2: Dataframe = joined.withColumn("register_time", stod($"register_time"))
      .withColumn("signin_time", stod($"signin_time"))
    //2用spark自身带的函数也可以解决
//    joined.withColumn("register_time",date_format($"register_time","yyyy-MM-dd"))
//      .withColumn("register_time2",date_add($"register_time",1))
    //比较注册过的用户是否登陆过
    val signinNum: Dataframe = joined2.filter($"register_time" + 86400000 === $"signin_time")
      .groupBy("register_time")//以时间分组
      .agg(countDistinct("userUID").as("signinNum"))//获得当天的登录人数
      .select("register_time", "signinNum")
    //获得注册过的人数
    val registerNum: Dataframe = joined2
      .groupBy("register_time")//以时间分组
      .agg(countDistinct("userUID").as("registerNum"))//获得注册过的人数
      .select("register_time", "registerNum")
    //两个表相关联
    val reJoinSi: Dataframe = signinNum.join(registerNum,Seq("register_time"))
    //计算留存率 先将时间戳的毫秒数转换为秒,用from_unixtime函数将时间戳转换为日期格式
        //DecimalType(10,2),10位数保留两位小数
    val frame: Dataframe = reJoinSi.select(from_unixtime($"register_time" / 1000, "yyyy-MM-dd").as("day"),
      ($"signinNum" / $"registerNum").cast(DoubleType).cast(DecimalType(10, 2)).as("percent")
    )
    //写入数据库
    JdbcUtils2.dfToMysql(frame,JdbcUtils2.TABLE_DAY_RETENTION)
    println("over")
  }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/673578.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号