// Log data — below is one sample log line:
// 2018-09-04T20:27:31+08:00 http://datacenter.bdqn.cn/logs/user?actionBegin=1536150451617&actionClient=Mozilla%2F5.0+%28Windows+NT+6.1%3B+WOW64%29+AppleWebKit%2F537.36+%28KHTML%2C+like+Gecko%29+Chrome%2F63.0.3239.132+Safari%2F537.36&actionEnd=1536150451705&actionName=viewQuestionAnalysis&actionTest=0&actionType=3&actionValue=272878&clientType=001_bdqn&examType=001&ifEquipment=web&questionId=32415&skillIdCount=0&userSID=EDEC6A9CF8220BE663A22BDD13E428E7.exam-tomcat-node3.exam-tomcat-node3&userUID=272878&userUIP=117.152.82.106 GET 200 192.168.168.63 - - Apache-HttpClient/4.1.2 (java 1.5)
package **.**.***.etl
import org.apache.commons.lang.StringUtils
import org.apache.spark.rdd.RDD
//import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * ETL job: reads tab-separated access-log lines, filters invalid records,
 * explodes the URL query string into named columns, and prints the result.
 */
object EtlDemo {
  def main(args: Array[String]): Unit = {
    // Build the SparkSession (local mode for development).
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("etldemo1").getOrCreate()
    // Underlying SparkContext for raw-text reading.
    val sc: SparkContext = spark.sparkContext
    // Spark implicits: enables $"col" syntax and .toDF() below.
    import spark.implicits._

    // Read the local log file; each line is tab-separated.
    val rdd: RDD[String] = sc.textFile("in/test.log")
    // BUG FIX: split on the tab character "\t" — the original split("t")
    // split on the literal letter 't'.
    val rdd1: RDD[Array[String]] = rdd.map(_.split("\t"))
    // Drop malformed records that do not have exactly 8 fields.
    val rdd2: RDD[Array[String]] = rdd1.filter(_.length == 8)
    // Wrap the 8 fields in a Row so a schema can be applied.
    val rddRow: RDD[Row] = rdd2.map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7)))

    // Schema for the raw 8-column log record (all strings at this stage).
    val schema: StructType = StructType(Array(
      StructField("event_time", StringType),
      StructField("url", StringType),
      StructField("method", StringType),
      StructField("status", StringType),
      StructField("sip", StringType),
      StructField("user_uip", StringType),
      StructField("action_prepend", StringType),
      StructField("action_client", StringType)
    ))
    // BUG FIX: correct API casing — DataFrame / createDataFrame
    // (Dataframe / createDataframe do not exist and would not compile).
    val logsDF: DataFrame = spark.createDataFrame(rddRow, schema)

    // Deduplicate on (event_time, url), keep only HTTP 200 responses,
    // and drop rows with an empty event_time.
    val filter_logs_row: Dataset[Row] = logsDF
      .dropDuplicates("event_time", "url")
      .filter($"status" === "200")
      // BUG FIX: the original called StringUtils.isNotEmpty on the literal
      // string "event_time" (always true); test the column VALUE instead.
      .filter(row => StringUtils.isNotEmpty(row.getAs[String]("event_time")))

    // Split the url on "?" and parse the query string into key/value pairs.
    val full_logs_row2: RDD[Row] = filter_logs_row.map(row => {
      val str: String = row.getAs[String]("url")
      // BUG FIX: '?' is a regex metacharacter — escape it as "\\?"
      // (the original "\?" is an invalid string escape).
      val paramasArray: Array[String] = str.split("\\?")
      // BUG FIX: start from an empty map instead of null, so the
      // getOrElse calls below never throw a NullPointerException when
      // the url carries no query string.
      var paramasMap: Map[String, String] = Map.empty
      if (paramasArray.length == 2) {
        // "k1=v1&k2=v2..." -> Map(k1 -> v1, k2 -> v2, ...);
        // pairs that do not split into exactly 2 parts are discarded.
        paramasMap = paramasArray(1).split('&')
          .map(_.split('='))
          .filter(_.length == 2)
          .map(x => (x(0), x(1)))
          .toMap
      }
      // BUG FIX: emit the fields in the SAME order as full_log_schema below.
      // A schema is applied positionally, so the original ordering would have
      // mislabelled every column after event_time.
      (
        row.getAs[String]("event_time"),
        paramasMap.getOrElse("userSID", ""),
        paramasMap.getOrElse("userUID", ""),
        paramasMap.getOrElse("actionBegin", ""),
        paramasMap.getOrElse("actionEnd", ""),
        paramasMap.getOrElse("actionType", ""),
        paramasMap.getOrElse("actionName", ""),
        paramasMap.getOrElse("ifEquipment", ""),
        paramasMap.getOrElse("actionValue", ""),
        paramasMap.getOrElse("actionTest", ""),
        row.getAs[String]("method"),
        row.getAs[String]("status"),
        row.getAs[String]("sip"),
        row.getAs[String]("user_uip"),
        row.getAs[String]("action_prepend"),
        row.getAs[String]("action_client")
      )
    }).toDF().rdd // tuple Dataset -> DataFrame -> RDD[Row]

    // Schema for the flattened 16-column record; order matches the tuple above.
    val full_log_schema: StructType = StructType(Array(
      StructField("event_time", StringType),
      StructField("userSID", StringType),
      StructField("userUID", StringType),
      StructField("actionBegin", StringType),
      StructField("actionEnd", StringType),
      StructField("actionType", StringType),
      StructField("actionName", StringType),
      StructField("ifEquipment", StringType),
      StructField("actionValue", StringType),
      StructField("actionTest", StringType),
      StructField("method", StringType),
      StructField("status", StringType),
      StructField("sip", StringType),
      StructField("user_uip", StringType),
      StructField("action_prepend", StringType),
      StructField("action_client", StringType)
    ))
    // BUG FIX: DataFrame / createDataFrame casing, as above.
    val full_logs_DF: DataFrame = spark.createDataFrame(full_logs_row2, full_log_schema)

    // Done — print the structure and a sample of the data.
    full_logs_DF.printSchema()
    full_logs_DF.show()

    // Release cluster resources.
    spark.stop()
  }
}