// Spark log-analysis project requirements: log data cleansing, user retention analysis, and active-user analysis.
import org.apache.commons.lang.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql._
import scala.reflect.io.Path
object EtlDemo {

  /**
   * Persists a DataFrame to the given path (Spark's default Parquet format).
   *
   * @param df the DataFrame to write
   * @param ph output path
   * @param op 1 (default) appends to any existing data; any other value overwrites it
   */
  def dataframeToPath(df: DataFrame, ph: String, op: Int = 1): Unit = {
    val mode = if (op == 1) SaveMode.Append else SaveMode.Overwrite
    df.write.mode(mode).save(ph)
  }

  /** Reads a DataFrame back from the given path (Spark's default Parquet format). */
  def getDFByPath(sparkSession: SparkSession, ph: String): DataFrame =
    sparkSession.read.load(ph)

  /**
   * ETL entry point: cleans the raw access log, expands each URL's query
   * string into named columns, and saves the result for downstream analysis.
   */
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("eltDemo")
      .master("local[*]")
      .getOrCreate()
    val sc: SparkContext = sparkSession.sparkContext
    import sparkSession.implicits._

    // The raw log is tab-separated with exactly 8 fields per valid record.
    // BUG FIX: the original split on the literal letter "t" instead of the
    // tab character "\t", which mangled every record.
    val rowRDD: RDD[Row] = sc.textFile("in/test.log")
      .map(_.split("\t"))
      .filter(_.length == 8)
      .map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7)))

    val logs_schema = StructType(
      Array(
        StructField("event_time", StringType),
        StructField("url", StringType),
        StructField("method", StringType),
        StructField("status", StringType),
        StructField("sip", StringType),
        StructField("user_uip", StringType),
        StructField("action_prepend", StringType),
        StructField("action_client", StringType)
      )
    )
    val logDF: DataFrame = sparkSession.createDataFrame(rowRDD, logs_schema)
    // logDF.printSchema()
    // logDF.show()
    // println(logDF.count()) //19163
    // val ds2: Dataset[Row] = logDF.dropDuplicates("event_time") // de-dup on a single column
    // println(ds2.count()) //922

    // Cleansing: de-duplicate on (event_time, url), keep only HTTP 200
    // responses with a non-empty event time.
    val filterLogs: Dataset[Row] = logDF.dropDuplicates("event_time", "url")
      .filter(row => row(3) == "200")
      .filter(row => StringUtils.isNotEmpty(row(0).toString))

    // Expand the URL query string ("...?k1=v1&k2=v2") into named columns.
    val full_logs_rdd: RDD[Row] = filterLogs.map(row => {
      val url: String = row.getAs[String]("url")
      // BUG FIX: "?" is a regex metacharacter (and "\?" is an illegal string
      // escape); it must be written "\\?" for the regex-based split.
      val parts: Array[String] = url.split("\\?")
      // BUG FIX: default to an empty map instead of null so URLs without a
      // query string no longer throw a NullPointerException below.
      val paramsMap: Map[String, String] =
        if (parts.length == 2)
          parts(1).split("&")
            .map(_.split("="))
            .filter(_.length == 2)
            .map(kv => (kv(0), kv(1)))
            .toMap
        else
          Map.empty[String, String]
      (
        row.getAs[String]("event_time"),
        paramsMap.getOrElse("userSID", ""),
        paramsMap.getOrElse("userUID", ""),
        paramsMap.getOrElse("actionBegin", ""),
        paramsMap.getOrElse("actionEnd", ""),
        paramsMap.getOrElse("actionType", ""),
        paramsMap.getOrElse("actionName", ""),
        paramsMap.getOrElse("actionValue", ""),
        paramsMap.getOrElse("actionTest", ""),
        paramsMap.getOrElse("ifEquipment", ""),
        row.getAs[String]("method"),
        row.getAs[String]("status"),
        row.getAs[String]("sip"),
        row.getAs[String]("user_uip"),
        row.getAs[String]("action_prepend"),
        row.getAs[String]("action_client")
      )
    }).toDF().rdd
    // (removed a dead no-op reference to full_logs_rdd that evaluated nothing)

    val full_log_schema = StructType(
      Array(
        StructField("event_time", StringType),
        StructField("userSID", StringType),
        StructField("userUID", StringType),
        StructField("actionBegin", StringType),
        StructField("actionEnd", StringType),
        StructField("actionType", StringType),
        StructField("actionName", StringType),
        StructField("actionValue", StringType),
        StructField("actionTest", StringType),
        StructField("ifEquipment", StringType),
        StructField("method", StringType),
        StructField("status", StringType),
        StructField("sip", StringType),
        StructField("user_uip", StringType),
        StructField("action_prepend", StringType),
        StructField("action_client", StringType)
      )
    )
    val full_logDF: DataFrame = sparkSession.createDataFrame(full_logs_rdd, full_log_schema)
    // full_logDF.printSchema()
    // full_logDF.show()
    // println("writing full_logDF to local storage")
    // dataframeToPath(full_logDF, "in/save", 1)
    // println("finished writing full_logDF")
    getDFByPath(sparkSession, "in/save").show()
  }
}
import java.text.SimpleDateFormat

import org.apache.commons.lang.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.types.{DecimalType, DoubleType}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Next-day retention analysis: of the users who registered on day D, how many
 * signed in again on day D + 1. Reads the cleansed log written by EtlDemo.
 */
object Retention {

  /** Milliseconds in one day, used to shift a registration date forward. */
  private val OneDayMillis: Long = 24L * 60 * 60 * 1000

  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("elt.retention")
      .master("local[*]")
      .getOrCreate()
    val sc: SparkContext = sparkSession.sparkContext
    import sparkSession.implicits._

    val logs: DataFrame = EtlDemo.getDFByPath(sparkSession, "in/save").cache()

    // Registration events: (userUID, register_time)
    val registered: DataFrame = logs.filter($"actionName" === "Registered")
      .withColumnRenamed("event_time", "register_time")
      .select("userUID", "register_time")

    // Sign-in events: (userUID, signin_time)
    val singin: DataFrame = logs.filter($"actionName" === "Signin")
      .withColumnRenamed("event_time", "signin_time")
      .select("userUID", "signin_time")

    // Left join keeps users who registered but never signed in again.
    // Columns: userUID, register_time, signin_time
    val joined: DataFrame = registered.join(singin, Seq("userUID"), "left")
    // joined.printSchema()
    // joined.show(false)

    import org.apache.spark.sql.functions._

    // NOTE(review): SimpleDateFormat is not thread-safe, but each Spark task
    // deserializes its own copy of this closure, so per-task use is safe.
    val spdf = new SimpleDateFormat("yyyy-MM-dd")
    // Parse a "yyyy-MM-dd..." string to epoch millis at day precision; empty -> 0.
    // BUG FIX: the empty branch returned Int 0 while the parse branch returns
    // Long, so the lambda's result type inferred to AnyVal and the UDF could
    // not be given a proper column type; 0L keeps the result uniformly Long.
    val stod: UserDefinedFunction = sparkSession.udf.register("stod", (event_time: String) => {
      if (StringUtils.isEmpty(event_time)) 0L
      else spdf.parse(event_time).getTime
    })

    // Normalize both time columns to epoch millis at day granularity.
    val joined2: DataFrame = joined
      .withColumn("register_time", stod($"register_time"))
      .withColumn("signin_time", stod($"signin_time"))

    // Users who signed in exactly one day after registering. // 355
    val signumDF: DataFrame = joined2
      .filter($"register_time" + OneDayMillis === $"signin_time")
      .groupBy("register_time")
      .agg(countDistinct("userUID").as("signinNum"))
    signumDF.printSchema()
    signumDF.show()
    println("-------------------")

    // Newly registered users per day. // 381
    val registernumDF: DataFrame = joined2.groupBy("register_time")
      .agg(countDistinct($"userUID").as("registerNum"))
    registernumDF.show()
    registernumDF.printSchema()

    val registerJoinSignDF: DataFrame = signumDF.join(registernumDF, Seq("register_time"))
    registerJoinSignDF.printSchema()
    registerJoinSignDF.show()

    // Retention rate = next-day sign-ins / same-day registrations, 2 decimals.
    val day_retention: DataFrame = registerJoinSignDF.select(
      from_unixtime($"register_time" / 1000, "yyyy-MM-dd").as("register_time"),
      ($"signinNum" / $"registerNum".cast(DoubleType))
        .cast(DecimalType(10, 2)).as("percent"))
    day_retention.show()

    EtlDemo.dataframeToPath(day_retention, "in/save2", 0)
  }
}
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * Daily active-user analysis: a user counts as active on a day if they
 * triggered a "StartLearn" or "ByCourse" event. Reads the cleansed log
 * written by EtlDemo.
 */
object Active {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("elt.retention")
      .master("local[*]")
      .getOrCreate()
    val sc: SparkContext = sparkSession.sparkContext
    import sparkSession.implicits._

    val logs: DataFrame = EtlDemo.getDFByPath(sparkSession, "in/save").cache()
    logs.show(100)

    // Keep only activity events and reduce each to (userUID, day).
    // NOTE(review): assumes event_time starts with a "yyyy-MM-dd" prefix of
    // at least 10 characters — confirm against the cleansed data.
    val ds2: Dataset[(String, String)] = logs
      .filter($"actionName" === "StartLearn" || $"actionName" === "ByCourse")
      .map(row => (
        row.getAs[String]("userUID"),
        row.getAs[String]("event_time").substring(0, 10)
      ))
    ds2.show()

    import org.apache.spark.sql.functions._

    // Count distinct active users per day.
    val frame: DataFrame = ds2
      .withColumnRenamed("_1", "userid")
      .withColumnRenamed("_2", "date")
      .groupBy("date")
      .agg(countDistinct("userid").as("activeNum"))
    frame.show()
    frame.printSchema()

    EtlDemo.dataframeToPath(frame, "in/save3", 0)
  }
}