This post analyzes collected tracking (埋点) data. The raw events carry no session ID, so one has to be derived: for a given user, consecutive actions are recorded under one unique session ID as long as the idle time between them stays within a chosen interval (set according to business needs, 20 minutes here); once the user is idle longer than that interval, the next action is recorded under a new session ID.
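Before the full Spark job, here is a minimal standalone sketch of just that gap rule; the object name, the user id and the example timestamps are only illustrative and not part of the production code below.
object SessionRuleDemo {
//assign a session id to each (already sorted) event timestamp of one user:
//a new session starts whenever the gap to the previous event exceeds gapMs
def assignSessions(userId: String, sortedTs: Seq[Long], gapMs: Long = 20 * 60 * 1000L): Seq[(Long, String)] = {
var sessionId = ""
sortedTs.zipWithIndex.map { case (ts, i) =>
if (i == 0 || ts - sortedTs(i - 1) > gapMs) sessionId = s"${userId}_$ts"
(ts, sessionId)
}
}
def main(args: Array[String]): Unit = {
//three events: the third comes 25 minutes after the second, so it opens a new session
val ts = Seq(0L, 5 * 60 * 1000L, 30 * 60 * 1000L)
assignSessions("u1", ts).foreach(println)
}
}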
Technologies used:
1. Spark SQL
2. Spark reading/writing ClickHouse
3. Scala
Business implementation:
1. pom dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.2.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.thoughtworks.paranamer</groupId>
        <artifactId>paranamer</artifactId>
        <version>2.8</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.12.3</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.12.3</version>
    </dependency>
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2.3</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>
2. Operating ClickHouse
Because the tracking data is pushed straight to Kafka and then ingested through a ClickHouse Kafka engine table, the job first needs to read the data back out of ClickHouse.
a. Reading from ClickHouse
//create SparkConf
val sparkConf: SparkConf = new SparkConf().setAppName("logSession").setMaster("local[*]")
//create SparkSession
val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
val querySql =
s"""(select
|login_id,event,properties,distinct_id,timestamp,dt,time from ods_user_log) tmp""".stripMargin
//read from ClickHouse
val dataDf = sparkSession
.read
.format("jdbc")
.option("url", "jdbc:clickhouse://ip:8123")
.option("fetchsize", "500000")
.option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
.option("user", "ck")
.option("password", "1234")
.option("dbtable",querySql )
.load()
The available JDBC read options are documented at https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
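For large source tables a single JDBC connection can become the bottleneck. Spark's standard JDBC options partitionColumn / lowerBound / upperBound / numPartitions split the read across several parallel queries; a minimal sketch, assuming timestamp is a numeric column suitable for range partitioning (the bounds below are placeholders):
val partitionedDf = sparkSession
.read
.format("jdbc")
.option("url", "jdbc:clickhouse://ip:8123")
.option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
.option("user", "ck")
.option("password", "1234")
.option("dbtable", querySql)
//split the read into 8 parallel range queries on the timestamp column
.option("partitionColumn", "timestamp")
.option("lowerBound", "1609430400000")
.option("upperBound", "1640966400000")
.option("numPartitions", "8")
.load()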
b. Writing to ClickHouse
This step requires the corresponding target table to already exist in ClickHouse; the data is again written from the DataFrame through the JDBC interface.
userLogDF
.write
.format("jdbc")
.mode(SaveMode.Append)
.option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
.option("url", "jdbc:clickhouse://ip:8123")
.option("batchsize", "50000")
.option("user", "ck")
.option("password", "1234")
.option("isolationLevel", "NONE")
.option("dbtable","tablename" )
.save()
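Since the target table has to exist beforehand, here is a minimal sketch of creating it through the same clickhouse-jdbc driver; the table name dwd_user_log_session, the column set (mirroring the UserLog case class further below) and the MergeTree settings are assumptions to be adapted to the real schema:
import java.sql.DriverManager
//hypothetical target table DDL, created once via plain JDBC
val ddl =
"""create table if not exists dev_ods.dwd_user_log_session
|(
| tenant_id String, login_id String, shop_id String,
| shop_group_1_id String, shop_group_2_id String, s_channel_id String,
| session_id String, properties String, distinct_id String,
| time DateTime, isexit Int32, view_rk Int32, step_dur Int64,
| timestamp Int64, event String, dt Date
|) engine = MergeTree()
|partition by dt
|order by (distinct_id, timestamp)""".stripMargin
Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
val conn = DriverManager.getConnection("jdbc:clickhouse://ip:8123", "ck", "1234")
try {
conn.createStatement().execute(ddl)
} finally {
conn.close()
}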
3. Session cutting implementation
Besides cutting sessions, the processing also determines the exit page and ranks the page-view order.
import java.sql.{Date, Timestamp}
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import scala.collection.mutable
object UserLogETL {
def main(args: Array[String]): Unit = {
//command-line arguments that bound the query window: args(0) = end date, args(1) = start date (18:00:00 is appended to the start date)
val endTime: String = args(0)
val startTime: String = args(1) + " " + "18:00:00"
//create SparkConf
val sparkConf: SparkConf = new SparkConf().setAppName("logSession").setMaster("local[*]")
//create SparkSession
val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
val querySql =
s"""(select
|login_id,event,properties,distinct_id,timestamp,dt,time from dev_ods.ods_user_log
|where time > toDateTime('$startTime') and dt between toDate('$startTime') and toDate('$endTime')) tmp""".stripMargin
//read from ClickHouse
val dataDf = sparkSession
.read
.format("jdbc")
.option("url", "jdbc:clickhouse://ip:8123")
.option("fetchsize", "500000")
.option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
.option("user", "ck")
.option("password", "1234")
.option("dbtable",querySql )
.load()
//convert the rows into an RDD of JSON strings
val jsonStrRdd: RDD[String] = dataDf.toJSON.rdd
//parse every row into a JSONObject, drop rows without a tenant id, then group by user
val groupByuserRDD = jsonStrRdd.map(str => {
JSON.parseObject(str)
}).filter(jsonstr => {
jsonstr.getJSONObject("properties").getString("tenantid") != null
})
.groupBy(_.getString("distinct_id"))
val sessionLog: RDD[(String, mutable.Buffer[JSONObject])] = groupByuserRDD.mapValues(iter => {
//sort the events within the group by timestamp
val sortList: List[JSONObject] = iter.toList.sortWith(
(left, right) => {
left.getLongValue("timestamp") < right.getLongValue("timestamp")
}
)
val bufferArray: mutable.Buffer[JSONObject] = sortList.toBuffer
//session id cutting
//initial session id: user id plus timestamp of the first event
var sessionId: String = bufferArray.head.getString("distinct_id") + "_" + bufferArray.head.getString("timestamp")
//assign a session id to every event
for (i <- bufferArray.indices) {
if (i > 0) {
if (bufferArray(i).getLong("timestamp") - bufferArray(i - 1).getLong("timestamp") > 20 * 60 * 1000) {
sessionId = bufferArray(i).getString("distinct_id") + "_" + bufferArray(i).getString("timestamp")
}
}
bufferArray(i).put("session_id", sessionId)
bufferArray(i).put("tenant_id",bufferArray(i).getJSonObject("properties").getString("tenantid"))
bufferArray(i).put("shop_group_2_id",bufferArray(i).getJSonObject("properties").getString("shop_group_2_id"))
bufferArray(i).put("shop_id",bufferArray(i).getJSonObject("properties").getString("shop_id"))
bufferArray(i).put("shop_group_1_id",bufferArray(i).getJSonObject("properties").getString("shop_group_1_id"))
bufferArray(i).put("s_channel_id",bufferArray(i).getJSonObject("properties").getString("s_channel_id"))
}
bufferArray
})
import sparkSession.implicits._
//page-view duration and view order per session
val jsonObjectRDD: RDD[JSONObject] = sessionLog.map(_._2).flatMap(_.toList)
val stpDurRDD: RDD[UserLog] = jsonObjectRDD.
groupBy(_.getString("session_id")).mapValues(iter => {
//sort the events within the session by timestamp
val sortList: List[JSONObject] = iter.toList.sortWith(
(left, right) => {
left.getLongValue("timestamp") < right.getLongValue("timestamp")
})
val bufferArray: Array[JSONObject] = sortList.toArray
//view rank (browse order) within the session
var viewRk: Int = 0
//index of the previous page-view event
var lastViewIndex: Int = 0
for (i <- 0 until bufferArray.length) {
if (viewRk == 0 && "pageShow".equals(bufferArray(i).getString("event")) || (viewRk == 0 && "detailPageView".equals(bufferArray(i).getString("event")))) {
viewRk += 1
bufferArray(i).put("view_rk", viewRk)
//if this is the last event of the session, mark it as the exit page
if (i==bufferArray.length-1) {
bufferArray(i).put("step_dur",0)
bufferArray(i).put("isexit",1)
}else{
lastViewIndex = i
}
} else if (viewRk != 0 && ("pageShow".equals(bufferArray(i).getString("event")) || "detailPageView".equals(bufferArray(i).getString("event")))) {
viewRk += 1
bufferArray(i).put("view_rk", viewRk)
bufferArray(lastViewIndex).put("step_dur",bufferArray(i).getLongValue("timestamp")-bufferArray(lastViewIndex).getLongValue("timestamp"))
if (i==bufferArray.length-1){
bufferArray(i).put("step_dur",0)
bufferArray(i).put("isexit",1)
}else{
lastViewIndex = i
}
} else {
if (i==bufferArray.length-1){
bufferArray(lastViewIndex).put("step_dur",0)
}
}
}
bufferArray
}).map(_._2).flatMap(_.toList)
.map(jsonobject => {
val log: UserLog = JSON.parseObject(jsonobject.toString(), classOf[UserLog])
log
})
.filter(userlog =>{
endTime.equals(userlog.dt.toString)
})
//write the result back to ClickHouse
val userLogDF: DataFrame = stpDurRDD.toDF()
userLogDF.show()
userLogDF
.write
.format("jdbc")
.mode(SaveMode.Append)
.option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
.option("url", "jdbc:clickhouse://ip:8123")
.option("batchsize", "50000")
.option("user", "ck")
.option("password", "1234")
.option("isolationLevel", "NONE")
.option("dbtable","tablename" )
.save()
println(userLogDF.count())
//close the Spark session
sparkSession.close()
}
}
case class UserLog(tenant_id:String,
login_id:String,
shop_id:String,
shop_group_1_id:String,
shop_group_2_id:String,
s_channel_id:String,
session_id:String,
properties:String,
distinct_id:String,
time:Timestamp,
isexit:Int,
view_rk:Int,
step_dur:Long,
timestamp:Long,
event:String,
dt:Date
)
The overall idea of the session cutting is: first group the events by user, then use the mapValues operator to sort each group by the event timestamp, and finally compare adjacent events pairwise. Whenever the gap between two events exceeds the configured interval (20 min in this code), a new sessionId is generated from the user id plus the current timestamp, and all subsequent events are assigned that new sessionId until the next cut.
On the difference between JSONObject's getLong and getLongValue methods used in the code, reading the fastjson source:
public Long getLong(String key) {
Object value = get(key);
return castToLong(value);
}
public long getLongValue(String key) {
Object value = get(key);
Long longVal = castToLong(value);
if (longVal == null) {
return 0L;
}
return longVal.longValue();
}
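In short, getLong returns a boxed Long that is null when the key is missing, while getLongValue always returns a primitive long and falls back to 0L. A tiny standalone sketch of the difference (not part of the ETL job; object and key names are illustrative):
import com.alibaba.fastjson.JSONObject
object GetLongDemo {
def main(args: Array[String]): Unit = {
val obj = new JSONObject()
obj.put("timestamp", 1609430400000L)
println(obj.getLong("timestamp")) //1609430400000
println(obj.getLong("missing")) //null - unsafe to use directly in arithmetic
println(obj.getLongValue("missing")) //0 - safe default for arithmetic and sorting
}
}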
Appendix: session cutting of tracking data with SQL
Cutting sessions directly in SQL
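The original appendix only links to a SQL version. As a rough illustration of the same 20-minute rule expressed with window functions (not the referenced article's exact SQL), the following Spark SQL sketch assumes the log DataFrame read above has been registered as a temp view named ods_user_log, with the same column names as the Scala code:
dataDf.createOrReplaceTempView("ods_user_log")
val sessionDf = sparkSession.sql(
"""select distinct_id, `timestamp`, event,
| concat(distinct_id, '_',
| cast(min(`timestamp`) over (partition by distinct_id, session_seq) as string)) as session_id
|from (
| select distinct_id, `timestamp`, event,
| sum(is_new_session) over (partition by distinct_id order by `timestamp`) as session_seq
| from (
| select distinct_id, `timestamp`, event,
| case when lag(`timestamp`) over (partition by distinct_id order by `timestamp`) is null
| or `timestamp` - lag(`timestamp`) over (partition by distinct_id order by `timestamp`) > 20 * 60 * 1000
| then 1 else 0 end as is_new_session
| from ods_user_log
| ) t1
|) t2""".stripMargin)
sessionDf.show()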
Reference: 1. Spark reading and writing ClickHouse