栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark:日志文件数据清洗

Spark:日志文件数据清洗

日志数据,下面是一行日志信息
2018-09-04T20:27:31+08:00   http://datacenter.bdqn.cn/logs/user?actionBegin=1536150451617&actionClient=Mozilla%2F5.0+%28Windows+NT+6.1%3B+WOW64%29+AppleWebKit%2F537.36+%28KHTML%2C+like+Gecko%29+Chrome%2F63.0.3239.132+Safari%2F537.36&actionEnd=1536150451705&actionName=viewQuestionAnalysis&actionTest=0&actionType=3&actionValue=272878&clientType=001_bdqn&examType=001&ifEquipment=web&questionId=32415&skillIdCount=0&userSID=EDEC6A9CF8220BE663A22BDD13E428E7.exam-tomcat-node3.exam-tomcat-node3&userUID=272878&userUIP=117.152.82.106  GET    200    192.168.168.63 -  -  Apache-HttpClient/4.1.2 (java 1.5)

 

package **.**.***.etl

import org.apache.commons.lang.StringUtils
import org.apache.spark.rdd.RDD
//import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataframe, Dataset, Row, SparkSession}


object EtlDemo {
  def main(args: Array[String]): Unit = {
    //获取SparkSession对象
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("etldemo1").getOrCreate()
    //获取SparkContext对象
    val sc: SparkContext = spark.sparkContext
    //导入spark隐式类
    import spark.implicits._
    //读取本地文件获取rdd
    val rdd: RDD[String] = sc.textFile("in/test.log")
    //按照Tab切割数据
    val rdd1: RDD[Array[String]] = rdd.map(x=>x.split("t"))
    //过滤掉字段数量少于8个的
    val rdd2: RDD[Array[String]] = rdd1.filter(x=>x.length==8)
    //将切割后的字段放入Row中,获得一个rddRow
    val rddRow: RDD[Row] = rdd2.map(x=>Row(x(0),x(1),x(2),x(3),x(4),x(5),x(6),x(7)))
    //编写Schema
    val schema: StructType = StructType(Array(
      StructField("event_time", StringType),
      StructField("url", StringType),
      StructField("method", StringType),
      StructField("status", StringType),
      StructField("sip", StringType),
      StructField("user_uip", StringType),
      StructField("action_prepend", StringType),
      StructField("action_client", StringType)
    ))
    //获取dataframe
    val logsDF: Dataframe = spark.createDataframe(rddRow,schema)
    //按照第一列和第二列对数据进行去重
    val filter_logs_row: Dataset[Row] = logsDF.dropDuplicates("event_time", "url")
      //过滤掉状态码非200
      .filter($"status" === "200")
      //过滤掉event_time为空的数据
      .filter(StringUtils.isNotEmpty("event_time").toString)
    //获取到一个datasetRow :filter_logs_row
    将url按照"&"以及"="切割
    val full_logs_row2: RDD[Row] = filter_logs_row.map(row => {
      //获得url字段的数据
      val str: String = row.getAs[String]("url")
      //将数据用[?]分割,获得一个String类型的数组
      val paramasArray: Array[String] = str.split("\?")
      //paramasArray(1)是问号以后的数据,是我们需要拆分的
      //定义一个Map类型的paramasMap变量,用来接收[=]拆分后的数据
      var paramasMap: Map[String, String] = null
      //逻辑判断
      if (paramasArray.length == 2) {
        //获得元组
        val tuples: Array[(String, String)] = paramasArray(1).split('&') //用&分割
          .map(x => x.split('=')) //用=分割
          .filter(x => x.length == 2) //逻辑判断
          .map(x => (x(0), x(1))) //放入元组
        //将元组放入到Map中
        paramasMap = tuples.toMap
      }
      //返回一个元组
      (
        row.getAs[String]("event_time"),
        row.getAs[String]("method"),
        row.getAs[String]("status"),
        row.getAs[String]("sip"),
        row.getAs[String]("user_uip"),
        row.getAs[String]("action_prepend"),
        row.getAs[String]("action_client"),
        paramasMap.getOrElse[String]("userSID", ""),
        paramasMap.getOrElse[String]("userUID", ""),
        paramasMap.getOrElse[String]("actionBegin", ""),
        paramasMap.getOrElse[String]("actionEnd", ""),
        paramasMap.getOrElse[String]("actionType", ""),
        paramasMap.getOrElse[String]("actionName", ""),
        paramasMap.getOrElse[String]("ifEquipment", ""),
        paramasMap.getOrElse[String]("actionValue", ""),
        paramasMap.getOrElse[String]("actionTest", "")
      )
    }).toDF().rdd //转df再转rdd,获得一个rdd[Row]类型
    //创建schema
    val full_log_schema: StructType = StructType(Array(
      StructField("event_time", StringType),
      StructField("userSID", StringType),
      StructField("userUID", StringType),
      StructField("actionBegin", StringType),
      StructField("actionEnd", StringType),
      StructField("actionType", StringType),
      StructField("actionName", StringType),
      StructField("ifEquipment", StringType),
      StructField("actionValue", StringType),
      StructField("actionTest", StringType),
      StructField("method", StringType),
      StructField("status", StringType),
      StructField("sip", StringType),
      StructField("user_uip", StringType),
      StructField("action_prepend", StringType),
      StructField("action_client", StringType)
    ))
    //创建一个dataframe
    val full_logs_DF: Dataframe = spark.createDataframe(full_logs_row2,full_log_schema)
    //完成,打印结构和数据
    full_logs_DF.printSchema()
    full_logs_DF.show()
  }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/670939.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号