栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

用户行为数据会话(session)切割及spark读写clickhouse完整实现

用户行为数据会话(session)切割及spark读写clickhouse完整实现

业务背景:

        对采集到的埋点数据进行分析,因为埋点原始数据没有对应会话ID(sessionID),即对于一个人的一次访问超过一定时间间隔(根据业务需要进行设定,本次定为20min)没有操作则记录为一个不重复的id,超过该时间间隔后再次操作就记成另一个sessionid。

使用技术:

1.sparkSQL

2.spark读写clickhouse

3.scala

业务实现: 1.pow依赖
   
        
        
            org.apache.spark
            spark-sql_2.12
            3.2.0
            provided
        

        
        
            org.apache.spark
            spark-core_2.12
            3.2.0
        

        
            com.thoughtworks.paranamer
            paranamer
            2.8
        


        
        
            com.fasterxml.jackson.core
            jackson-core
            2.12.3
        

        
            com.fasterxml.jackson.core
            jackson-databind
            2.12.3
        


        
        
            ru.yandex.clickhouse
            clickhouse-jdbc
            0.2.3
        

        
            com.alibaba
            fastjson
            1.2.62
        
    
2.操作clickhouse

        因为埋点数据直接推送到kafka,后再使用clickhouse的kafka引擎表进行获取,所以需要先读取clickhouse中的数据

a.读取clickhouse

  //创建sparkconf
     val sparkConf: SparkConf = new SparkConf().setAppName("logSession").setMaster("local[*]")

    //创建sparksession
    val sparkSession:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    val querySql =
      s"""(select
         |login_id,event,properties,login_id,distinct_id,timestamp,dt,time from ods_user_log) tmp""".stripMargin

    //读取clickhouse
    val dataDf = sparkSession
      .read
      .format("jdbc")
      .option("url", "jdbc:clickhouse://ip:8123")
      .option("fetchsize", "500000")
      .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
      .option("user", "ck")
      .option("password", "1234")
      .option("dbtable",querySql )
      .load()

读取参数设置https://spark.apache.org/docs/latest/sql-data-sources-jdbc.htmlhttps://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

b.写入clickhouse

这一个步骤需要你在clickhouse简历好对应的表,也是通过dataframe数据类型将数据写入clickhouse中

    userLogDF
      .write
      .format("jdbc")
      .mode(SaveMode.Append)
      .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
      .option("url", "jdbc:clickhouse://ip:8123")
      .option("batchsize", "50000")
      .option("user", "ck")
      .option("password", "1234")
      .option("isolationLevel", "NONE")
      .option("dbtable","tablename" )
      .save()
3.会话切割实现

数据的处理过程中在实现会话切割的过程中也进行了对退出页面的判断和浏览顺序的排序;

import java.sql.{Date, Timestamp}
import com.alibaba.fastjson.{JSON, JSONObject}
import com.fasterxml.jackson.databind.KeyDeserializer.None
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataframe, SaveMode, SparkSession}
import scala.collection.mutable



object UserLogETL {

  def main(args: Array[String]): Unit = {

//传入参数实现查询条件的过滤
    var endTime:String = args(0)
    var startTime:String = args(1) + " "+"18:00:00"

    //创建sparkconf
     val sparkConf: SparkConf = new SparkConf().setAppName("logSession").setMaster("local[*]")

    //创建sparksession
    val sparkSession:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    val querySql =
      s"""(select
         |login_id,event,properties,login_id,distinct_id,timestamp,dt,time from dev_ods.ods_user_log
         |where time >toDateTime('$startTime')  and dt between toDate('$startTime')and toDate('$endTime') ) tmp""".stripMargin

    //读取clickhouse
    val dataDf = sparkSession
      .read
      .format("jdbc")
      .option("url", "jdbc:clickhouse://ip:8123")
      .option("fetchsize", "500000")
      .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
      .option("user", "ck")
      .option("password", "1234")
      .option("dbtable",querySql )
      .load()


    //将读取到的数据转为Json数据的df
    val jsonStrRdd: RDD[String] = dataDf.toJSON.rdd

    //将数据转为JSONobject,并且按照用户分组
    val groupByuserRDD = jsonStrRdd.map(str => {
      JSON.parseObject(str)
    }).filter(f = jsonstr => {
      jsonstr.getJSonObject("properties").getString("tenantid") != null
    })
      .groupBy(_.getString("distinct_id"))

    val sessionLog: RDD[(String, mutable.Buffer[JSONObject])] = groupByuserRDD.mapValues(iter => {
      //对组内值排序
      val sortList: List[JSONObject] = iter.toList.sortWith(
        (left, right) => {
          left.getLongValue("timestamp") < right.getLongValue("timestamp")
        }
      )

      val bufferArray: mutable.Buffer[JSONObject] = sortList.toArray.toBuffer

      //sessionId切割
      //定义sessionId
      var sessionId: String = bufferArray.head.getString("distinct_id") + "_" + bufferArray.head.getString("timestamp")
      //写入sessionId
      for (i <- bufferArray.indices) {
        if (i > 0) {
          if (bufferArray(i).getLong("timestamp") - bufferArray(i - 1).getLong("timestamp") > 20 * 60 * 1000) {
            sessionId = bufferArray(i).getString("distinct_id") + "_" + bufferArray(i).getString("timestamp")
          }

        }
        bufferArray(i).put("session_id", sessionId)
        bufferArray(i).put("tenant_id",bufferArray(i).getJSonObject("properties").getString("tenantid"))
        bufferArray(i).put("shop_group_2_id",bufferArray(i).getJSonObject("properties").getString("shop_group_2_id"))
        bufferArray(i).put("shop_id",bufferArray(i).getJSonObject("properties").getString("shop_id"))
        bufferArray(i).put("shop_group_1_id",bufferArray(i).getJSonObject("properties").getString("shop_group_1_id"))
        bufferArray(i).put("s_channel_id",bufferArray(i).getJSonObject("properties").getString("s_channel_id"))
      }
      bufferArray
    })
    import sparkSession.implicits._

    //浏览时长,浏览顺序
    val jsonObjectRDD: RDD[JSONObject] = sessionLog.map(_._2).flatMap(_.toList)
    val stpDurRDD: RDD[UserLog] = jsonObjectRDD.
      groupBy(_.getString("session_id")).mapValues(iter => {
      //对组内值排序
      val sortList: List[JSONObject] = iter.toList.sortWith(
        (left, right) => {
          left.getLongValue("timestamp") < right.getLongValue("timestamp")
        })
      val bufferArray: Array[JSONObject] = sortList.toArray
      //定义是浏览顺序
      var viewRk: Int = 0
      //定义上一个时间
      var lastViewIndex:Integer = 0

      for (i <- 0 until bufferArray.length) {
        if (viewRk == 0 && "pageShow".equals(bufferArray(i).getString("event")) || (viewRk == 0 && "detailPageView".equals(bufferArray(i).getString("event")))) {

          viewRk += 1
          bufferArray(i).put("view_rk", viewRk)
          //判断是否为最后一个
          if (i==bufferArray.length-1) {
            bufferArray(i).put("step_dur",0)
            bufferArray(i).put("isexit",1)
          }else{
            lastViewIndex = i
          }
        } else if (viewRk != 0 && "pageShow".equals(bufferArray(i).getString("event")) || (viewRk != 0 && "detailPageView".equals(bufferArray(i).getString("event")))) {
          viewRk += 1
          bufferArray(i).put("view_rk", viewRk)
          bufferArray(lastViewIndex).put("step_dur",bufferArray(i).getLongValue("timestamp")-bufferArray(lastViewIndex).getLongValue("timestamp"))
          if (i==bufferArray.length-1){
            bufferArray(i).put("step_dur",0)
            bufferArray(i).put("isexit",1)
          }else{
            lastViewIndex = i
          }
        } else {
          if (i==bufferArray.length-1){
            bufferArray(lastViewIndex).put("step_dur",0)
          }
        }
      }
      bufferArray
    }).map(_._2).flatMap(_.toList)
      .map(jsonobject => {
        val log: UserLog = JSON.parseObject(jsonobject.toString(), classOf[UserLog])
        log
      })
      .filter(userlog =>{
      endTime.equals(userlog.dt.toString)
    })

    //写入clickhouse

    val userLogDF: Dataframe = stpDurRDD.toDF()
    userLogDF.show()

    userLogDF
      .write
      .format("jdbc")
      .mode(SaveMode.Append)
      .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
      .option("url", "jdbc:clickhouse://ip:8123")
      .option("batchsize", "50000")
      .option("user", "ck")
      .option("password", "1234")
      .option("isolationLevel", "NONE")
      .option("dbtable","tablename" )
      .save()

    println(userLogDF.count())

    //关闭会话
    sparkSession.close()


  }
}
  case class UserLog(tenant_id:String,
                     login_id:String,
                     shop_id:String,
                     shop_group_1_id:String,
                     shop_group_2_id:String,
                     s_channel_id:String,
                     session_id:String,
                     properties:String,
                     distinct_id:String,
                     time:Timestamp,
                     isexit:Int,
                     view_rk:Int,
                     step_dur:Long,
                     timestamp:Long,
                     event:String,
                     dt:Date,
                    )

        整体会话切割的主要实现思路就是先根据用户进行分组,后再使用mapvalues算子对同一个分组中的值根据事件发生的时间排序,而后进行两两比较如果两个的时间大于指定的时间间隔(代码定义为20min),则通过时间戳加用户id生成一个新的sessionId,在对后来的数据赋值新生成的sessionid。

        对于代码使用中的JSONobject两个getLong和getLongValue方法的区别源码解读:

 public Long getLong(String key) {
        Object value = get(key);

        return castToLong(value);
    }

    public long getLongValue(String key) {
        Object value = get(key);

        Long longVal = castToLong(value);
        if (longVal == null) {
            return 0L;
        }
附录:SQL实现埋点数据的会话切割

通过SQL切割session会话

参考文章:

1.spark读写clickhouse


对大数据,程序语言有感兴趣的也可以来关注>> 迪答 <<公众号

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/679849.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号