日志数据:下面是一行日志信息示例,已经过数据清洗并写入到 full_access_logs 表中。
2018-09-04T20:27:31+08:00 http://datacenter.bdqn.cn/logs/user?actionBegin=1536150451617&actionClient=Mozilla%2F5.0+%28Windows+NT+6.1%3B+WOW64%29+AppleWebKit%2F537.36+%28KHTML%2C+like+Gecko%29+Chrome%2F63.0.3239.132+Safari%2F537.36&actionEnd=1536150451705&actionName=viewQuestionAnalysis&actionTest=0&actionType=3&actionValue=272878&clientType=001_bdqn&examType=001&ifEquipment=web&questionId=32415&skillIdCount=0&userSID=EDEC6A9CF8220BE663A22BDD13E428E7.exam-tomcat-node3.exam-tomcat-node3&userUID=272878&userUIP=117.152.82.106 GET 200 192.168.168.63 - - Apache-HttpClient/4.1.2 (java 1.5)
package zk.bn.etlpractice

import java.text.SimpleDateFormat
import java.util.Properties

import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.types.{DecimalType, DoubleType}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
连接数据库、读写数据库表的工具。本可以单独写成一个类,这里为了方便写在了一起;实际工作中应按面向对象的方式将其拆分为独立的类。
object JdbcUtils2 {
  // MySQL JDBC connection URL.
  val URL = "jdbc:mysql://LinuxIP/YourDatabase"
  // JDBC driver class name.
  val DRIVER = "com.mysql.jdbc.Driver"
  // Database user name (placeholder — replace with real credentials).
  val USER = "**"
  // Database password (placeholder).
  val PWD = "**"
  // Source table to read: the cleaned access logs.
  val TABLE_RETENTION_TABLE = "full_access_logs"
  // Target table to write: daily retention results.
  val TABLE_DAY_RETENTION = "day_retention"

  // JDBC connection properties shared by every read/write call below.
  private val properties = new Properties()
  properties.setProperty("user", USER)
  properties.setProperty("password", PWD)
  properties.setProperty("driver", DRIVER)

  /**
   * Writes a DataFrame to the given MySQL table.
   *
   * @param df    the DataFrame to persist
   * @param table target table name
   * @param op    write mode: 1 (default) appends rows, any other value
   *              overwrites the whole table
   */
  def dfToMysql(df: DataFrame, table: String, op: Int = 1): Unit = {
    // Pick the save mode once instead of duplicating the write call per branch.
    val mode = if (op == 1) SaveMode.Append else SaveMode.Overwrite
    df.write.mode(mode).jdbc(URL, table, properties)
  }

  /**
   * Reads the given table from MySQL into a DataFrame.
   *
   * @param spark the active SparkSession
   * @param table table name to load
   * @return      the table contents as a DataFrame
   */
  def get_DF_By_Tb_Name(spark: SparkSession, table: String): DataFrame =
    spark.read.jdbc(URL, table, properties)
}
object Retention1 {
  /**
   * Computes next-day user retention from the cleaned access logs and
   * writes one row per day (day, percent) to the `day_retention` table.
   */
  def main(args: Array[String]): Unit = {
    // Build the SparkSession (local mode, all cores — development setup).
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("retention1")
      .getOrCreate()
    // Column-syntax implicits ($"...") and built-in SQL functions.
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // Load the cleaned access logs; cached because it is filtered twice below.
    val logsDF: DataFrame = JdbcUtils2
      .get_DF_By_Tb_Name(spark, JdbcUtils2.TABLE_RETENTION_TABLE)
      .cache()

    // Registration events: one row per userUID with the registration time.
    val filterRigister: DataFrame = logsDF.filter($"actionName" === "Registered")
      .withColumnRenamed("event_time", "register_time")
      .select($"userUID", $"register_time")

    // Sign-in events: one row per userUID with the sign-in time.
    val filterSignin: DataFrame = logsDF.filter($"actionName" === "Signin")
      .withColumnRenamed("event_time", "signin_time")
      .select($"userUID", $"signin_time")

    // Left join keeps every registered user even if they never signed in
    // (their signin_time will be null).
    val joined: DataFrame = filterRigister.join(filterSignin, Seq("userUID"), "left")

    // UDF: parse the "yyyy-MM-dd..." prefix of an event time into epoch millis;
    // null/empty input maps to 0L.
    // NOTE(review): SimpleDateFormat is not thread-safe; this is only safe
    // because each Spark task deserializes its own copy of the closure.
    val spdf = new SimpleDateFormat("yyyy-MM-dd")
    val stod: UserDefinedFunction = spark.udf.register("stod", (event_time: String) => {
      if (StringUtils.isEmpty(event_time)) {
        0L // explicit Long so both branches share one type (was Int 0, widened implicitly)
      } else {
        spdf.parse(event_time).getTime
      }
    })
    // Replace the string timestamps with day-truncated epoch millis.
    val joined2: DataFrame = joined
      .withColumn("register_time", stod($"register_time"))
      .withColumn("signin_time", stod($"signin_time"))
    // Alternative without a UDF: Spark's own date functions, e.g.
    // joined.withColumn("register_time", date_format($"register_time", "yyyy-MM-dd"))
    //       .withColumn("register_time2", date_add($"register_time", 1))

    // Users who signed in exactly one day (86 400 000 ms) after registering,
    // counted distinctly per registration day.
    val signinNum: DataFrame = joined2
      .filter($"register_time" + 86400000L === $"signin_time")
      .groupBy("register_time")
      .agg(countDistinct("userUID").as("signinNum"))
      .select("register_time", "signinNum")

    // Distinct registrations per day.
    val registerNum: DataFrame = joined2
      .groupBy("register_time")
      .agg(countDistinct("userUID").as("registerNum"))
      .select("register_time", "registerNum")

    // Pair each day's sign-in count with its registration count.
    val reJoinSi: DataFrame = signinNum.join(registerNum, Seq("register_time"))

    // Retention = signinNum / registerNum. Millis are divided by 1000 because
    // from_unixtime expects seconds; DecimalType(10, 2) keeps two decimals.
    val frame: DataFrame = reJoinSi.select(
      from_unixtime($"register_time" / 1000, "yyyy-MM-dd").as("day"),
      ($"signinNum" / $"registerNum").cast(DoubleType).cast(DecimalType(10, 2)).as("percent")
    )

    // Persist the daily retention table (append mode by default).
    JdbcUtils2.dfToMysql(frame, JdbcUtils2.TABLE_DAY_RETENTION)
    println("over")
    spark.stop() // release local Spark resources (was missing in the original)
  }
}



