Contents
Use case 1: data cleansing (EtlDemo)
JdbcUtils: connecting to MySQL
Use case 2: user retention analysis (Retention)
Use case 3: active user analysis (Active)
Use case 1: data cleansing (EtlDemo)
Steps:
- Read the log file and convert it into an RDD[Row]
- Split each line on the Tab character
- Keep only records that split into exactly 8 fields
- Clean the data:
  - deduplicate on the first and second columns (event_time and url)
  - filter out records whose status code is not 200
  - filter out records with an empty event_time
- Split the url on "&" and "=" (see the sketch below)
- Save the result by writing it into a MySQL table

Log fields after splitting: event_time, url, method, status, sip, user_uip, action_prepend, action_client
package cn.kgc.etl
import org.apache.spark.rdd.RDD
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object EtlDemo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("etlDemo").setMaster("local[*]")
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc:SparkContext=spark.sparkContext
import spark.implicits._
val rdd: RDD[Row] = sc.textFile("in/test.log").map(x => x.split("\t"))
.filter(x => x.length == 8).map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7)))
val logs_schema: StructType = StructType(
Array(
StructField("event_time", StringType),
StructField("url", StringType),
StructField("method", StringType),
StructField("status", StringType),
StructField("sip", StringType),
StructField("user_uip", StringType),
StructField("action_prepend", StringType),
StructField("action_client", StringType)
)
)
val logDF: DataFrame = spark.createDataFrame(rdd, logs_schema)
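// exploratory check: compare deduplication counts over (event_time, url), event_time alone, and url alone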
val ds1: Dataset[Row] = logDF.dropDuplicates("event_time","url")
// println(ds1.count())
val ds2: Dataset[Row] = logDF.dropDuplicates("event_time")
// println(ds2.count())
val ds3: Dataset[Row] = logDF.dropDuplicates("url")
// println(ds3.count())
println("------------------------------------------------")
val filterLogs: Dataset[Row] = logDF.dropDuplicates("event_time", "url")
.filter(row => row.getAs[String]("status") == "200")
.filter(row => StringUtils.isNotEmpty(row.getAs[String]("event_time")))
// println(filterLogs.count())
println("---------------------------------")
val full_logs_rdd: RDD[Row] = filterLogs.map(row => {
// split the url into base and query string, then collect the "k=v" query parameters
val str: String = row.getAs[String]("url")
val paramsArray: Array[String] = str.split("\\?")
// start from Map.empty rather than null so urls without a query string do not NPE below
var paramsMap: Map[String, String] = Map.empty
if (paramsArray.length == 2) {
val tuples: Array[(String, String)] = paramsArray(1).split("&")
.map(x => x.split("="))
.filter(x => x.length == 2)
.map(x => (x(0), x(1)))
paramsMap = tuples.toMap
}
(
row.getAs[String]("event_time"),
paramsMap.getOrElse[String]("userSID", ""),
paramsMap.getOrElse[String]("userUID", ""),
paramsMap.getOrElse[String]("actionBegin", ""),
paramsMap.getOrElse[String]("actionEnd", ""),
paramsMap.getOrElse[String]("actionType", ""),
paramsMap.getOrElse[String]("actionName", ""),
paramsMap.getOrElse[String]("actionValue", ""),
paramsMap.getOrElse[String]("actionTest", ""),
paramsMap.getOrElse[String]("ifEquipment", ""),
row.getAs[String]("method"),
row.getAs[String]("status"),
row.getAs[String]("sip"),
row.getAs[String]("user_uip"),
row.getAs[String]("action_prepend"),
row.getAs[String]("action_client")
)
}).toDF().rdd
val full_logs_schema: StructType = StructType(
Array(
StructField("event_time", StringType),
StructField("userSID", StringType),
StructField("userUID", StringType),
StructField("actionBegin", StringType),
StructField("actionEnd", StringType),
StructField("actionType", StringType),
StructField("actionName", StringType),
StructField("actionValue", StringType),
StructField("actionTest", StringType),
StructField("ifEquipment", StringType),
StructField("method", StringType),
StructField("status", StringType),
StructField("sip", StringType),
StructField("user_uip", StringType),
StructField("action_prepend", StringType),
StructField("action_client", StringType)
)
)
val full_logDF: DataFrame = spark.createDataFrame(full_logs_rdd, full_logs_schema)
// full_logDF.printSchema()
// full_logDF.show()
println("将full_logDF写入到数据库")
JdbcUtils.dataframeToMysql(full_logDF,JdbcUtils.TABLE_FULL_ACCESS_LOGS,0)
println("将full_logDF写入数据库完成")
}
}
JdbcUtils: connecting to MySQL
package cn.kgc.etl
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object JdbcUtils {
//create database etldemo
val url="jdbc:mysql://192.168.111.131:3306/etldemo"
val driver="com.mysql.jdbc.Driver"
val usr="root"
val pwd="root"
val TABLE_FULL_ACCESS_LOGS:String="full_access_logs"
val TABLE_DAY_RETENTION :String="day_retention"
val TABLE_DAY_ACTIVE :String="day_active"
private val properties=new Properties()
properties.setProperty("user",usr)
properties.setProperty("password",pwd)
properties.setProperty("driver",driver)
// op == 1 appends to the target table; any other value overwrites it
def dataframeToMysql(df: DataFrame, table: String, op: Int = 1): Unit = {
if (op == 1)
df.write.mode(SaveMode.Append).jdbc(url, table, properties)
else
df.write.mode(SaveMode.Overwrite).jdbc(url, table, properties)
}
// read a MySQL table back into a DataFrame
def getDFByTableName(spark: SparkSession, table: String): DataFrame = {
val frame: DataFrame = spark.read.jdbc(url, table, properties)
frame
}
}
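Since the driver class com.mysql.jdbc.Driver belongs to the MySQL Connector/J 5.x series, the connector jar has to be on the classpath. A minimal build.sbt sketch (the version numbers are assumptions; match them to your own Spark and MySQL setup):

// build.sbt -- versions are assumptions, not taken from the original project
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.5",
  "mysql" % "mysql-connector-java" % "5.1.49" // provides com.mysql.jdbc.Driver
)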
Use case 2: user retention analysis (Retention)
Compute the next-day retention rate:
- Count the number of users who newly registered that day, n
- Intersect that day's new user IDs with the next day's sign-in user IDs to get m, the number of new users who signed in again the next day (the next-day retained count)
- Next-day retention rate = m / n * 100% (see the sketch below)
The next-week retention rate is computed the same way, with a seven-day offset.
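As a plain-Scala illustration of the formula (the user IDs are made up):

// n = 4 users registered on day 1
val newUsersDay1 = Set("u1", "u2", "u3", "u4")
// users who signed in on day 2
val signinsDay2 = Set("u2", "u4", "u9")
// m = 2 retained users: u2 and u4
val retained = newUsersDay1.intersect(signinsDay2)
// next-day retention rate = 2 / 4 * 100 = 50.0%
val retentionRate = retained.size.toDouble / newUsersDay1.size * 100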
package cn.kgc.etl
import java.text.SimpleDateFormat
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.types.{DecimalType, DoubleType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}
// retention rate
object Retention {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("retention").setMaster("local[*]")
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc:SparkContext=spark.sparkContext
import spark.implicits._
import org.apache.spark.sql.functions._
val logs: DataFrame = JdbcUtils.getDFByTableName(spark, JdbcUtils.TABLE_FULL_ACCESS_LOGS).cache()
// logs.printSchema()
// logs.show()
// registration events, keyed by user: (userUID, register_time)
val registered: DataFrame = logs.filter($"actionName" === "Registered")
.withColumnRenamed("event_time", "register_time")
.select("userUID", "register_time")
// sign-in events, keyed by user: (userUID, signin_time)
val signin: DataFrame = logs.filter($"actionName" === "Signin")
.withColumnRenamed("event_time", "signin_time")
.select("userUID", "signin_time")
// left join keeps users who registered but never signed in again
val joined: DataFrame = registered.join(signin, Seq("userUID"), "left")
joined.show(false)
val spdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
// UDF: parse the leading "yyyy-MM-dd" of a timestamp string into epoch millis; empty strings map to 0
val stod: UserDefinedFunction = spark.udf.register("stod", (event_time: String) => {
if (StringUtils.isEmpty(event_time))
0L
else
spdf.parse(event_time).getTime
})
val joined2: DataFrame = joined.withColumn("register_time", stod($"register_time"))
.withColumn("signin_time", stod($"signin_time"))
// count users who signed in exactly one day (86400000 ms) after registering: 355 in the sample data
val signumDF: DataFrame = joined2.filter($"register_time" + 86400000 === $"signin_time")
.groupBy("register_time")
.agg(countDistinct("userUID").as("signinNum"))
signumDF.printSchema()
signumDF.show()
// count users who newly registered on each day: 381 in the sample data
val registernumDF: DataFrame = joined2.groupBy("register_time")
.agg(countDistinct($"userUID").as("registerNum"))
registernumDF.printSchema()
registernumDF.show()
val registerJoinDF: DataFrame = signumDF.join(registernumDF, "register_time") // must be an inner join, so only dates present on both sides remain
registerJoinDF.printSchema()
registerJoinDF.show()
val frame: DataFrame = registerJoinDF.select(from_unixtime($"register_time" / 1000, "yyyy-MM-dd").as("register_time"),
($"signinNum" / $"registerNum".cast(DoubleType)).cast(DecimalType(10, 2)).as("percent"))
println("写入数据库")
JdbcUtils.dataframeToMysql(frame,JdbcUtils.TABLE_DAY_RETENTION,0)
println("写入数据库成功")
}
}
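The next-week retention rate mentioned above is not implemented in this job. A minimal variant (a sketch reusing joined2 and the imports from the code above, not the original author's code) only changes the offset in the join condition to seven days:

// retain users who sign in exactly 7 days (in millis) after registering
val weekSignumDF: DataFrame = joined2.filter($"register_time" + 7L * 86400000L === $"signin_time")
.groupBy("register_time")
.agg(countDistinct("userUID").as("weekSigninNum"))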
Use case 3: active user analysis (Active)
package cn.kgc.etl
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
object Active {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("active").setMaster("local[*]")
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc:SparkContext=spark.sparkContext
import spark.implicits._
val logs: DataFrame = JdbcUtils.getDFByTableName(spark, JdbcUtils.TABLE_FULL_ACCESS_LOGS).cache()
// logs.printSchema()
// logs.show()
// keep only learning and purchase events, projected to (userUID, day) pairs
val ds2: Dataset[(String, String)] = logs.filter($"actionName" === "StartLearn" || $"actionName" === "BuyCourse")
.map(x => {
(
x.getAs[String]("userUID"),
x.getAs[String]("event_time").substring(0, 10)
)
})
import org.apache.spark.sql.functions._
val frame: DataFrame = ds2.withColumnRenamed("_1", "userid")
.withColumnRenamed("_2", "date")
.groupBy("date")
.agg(countDistinct("userid").as("activeNum"))
println("写入数据库")
JdbcUtils.dataframeToMysql(frame,JdbcUtils.TABLE_DAY_ACTIVE)
println("写入数据库完成")
}
}
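As an optional sanity check (a sketch, not part of the original job; it reuses the spark session from main), the freshly written table can be read back with the same helper and inspected:

val active: DataFrame = JdbcUtils.getDFByTableName(spark, JdbcUtils.TABLE_DAY_ACTIVE)
active.orderBy("date").show(false)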