Common Spark-Shell Notes

Startup

/home/work/<path>/bin/spark-shell --master yarn --num-executors 40 --conf spark.speculation=true --queue <queue-name>

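1 Sum the values for a given domain at a given timestamp in files written to disk

val path = "/xxxx/*"

val data = sc.textFile(path)

val data1 = data.flatMap(_.split("\n"))

val data2 = data1.filter(x => x.contains("<domain>"))

val data3 = data2.filter(x => x.contains("<timestamp>"))

// key = first tab-separated field, value = last field as a Long
val res = data3.map(x => x.split("\t")).map(x => (x(0), x(x.length - 1).toDouble.toLong))

val res2 = res.reduceByKey(_ + _)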

res2.take(10)

Array[(String, Long)] = Array((cdninfo.proc.topic.cnt.node,3499825092))
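
The same filter, split, and sum-by-key steps can be tried on a local collection before pointing them at real files. The following is a minimal sketch in plain Scala, with no Spark required. The sample lines, the domain `example.com`, and the field layout are hypothetical, since the notes do not show the actual file format. `groupBy` plus a per-group sum stands in for `reduceByKey(_ + _)`.

```scala
// Hypothetical sample lines: key, domain, timestamp, value, separated by tabs.
// The real layout is not shown in the notes.
val lines = Seq(
  "metric.a\texample.com\t1632816000\t10.0",
  "metric.a\texample.com\t1632816000\t5.0",
  "metric.b\texample.com\t1632816000\t7.0",
  "metric.a\tother.org\t1632816000\t100.0"
)

// Same steps as the RDD pipeline: filter by domain and timestamp,
// key by the first field, take the last field as the value.
val pairs = lines
  .filter(_.contains("example.com"))
  .filter(_.contains("1632816000"))
  .map(_.split("\t"))
  .map(f => (f(0), f(f.length - 1).toDouble.toLong))

// Local stand-in for reduceByKey(_ + _): group by key, then sum each group.
val totals: Map[String, Long] =
  pairs.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }
```

Once the local result looks right, the same lambdas can be reused unchanged on the RDD.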

2 Analyze an AFS data source

import java.text.SimpleDateFormat

import java.util.{Date, Locale}

val format = new SimpleDateFormat("[dd/MMM/yyyy:HH:mm:ss Z]", Locale.US)

val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

val path = "path1,path2..."

val data = sc.sequenceFile[Long, String](path).map(_._2).map(l => l.substring(l.indexOf("\t") + 1, l.lastIndexOf("\t"))).flatMap(_.split("\n")).filter(x => x != null && x.length > 0)

val c = data.filter(x => x.contains("<xxx domain>"))

val d = c.filter(x => {
  val ts = format.parse(x.split("\t")(2)).getTime / 1000 // perf log
  ts >= 1632816000 && ts <= 1632819600
})



val tmp = d.map(x => x.split("\t")).filter(x => x.length == 41 || x.length == 39).filter(x => x(6).equals("xxx") || x(4).equals("xxx")).map(x => {
  // getTime / 1000 / 60 * 60 truncates the epoch seconds to the start of the minute
  if (x.length == 41) {
    ((format.parse(x(4)).getTime / 1000 / 60 * 60, x(6)), x(9).toDouble.toLong)
  } else {
    ((format.parse(x(2)).getTime / 1000 / 60 * 60, x(4)), x(7).toDouble.toLong)
  }
}).reduceByKey(_ + _)

val result = tmp.map(x => (fm.format(new Date(x._1._1 * 1000)), x._2 * 8 / 60)).map { case (str, long) => s"$str $long" }

result.repartition(1).saveAsTextFile("/<output-path>")
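
The timestamp handling in this pipeline can be checked in isolation. The sketch below parses one bracketed timestamp with the same pattern, truncates it to the minute with integer division, and formats it back to text. The sample timestamp is hypothetical, and the output formatter is pinned to GMT+08:00 so the result does not depend on the machine's default time zone (the `fm` in the notes uses the default zone).

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale, TimeZone}

val format = new SimpleDateFormat("[dd/MMM/yyyy:HH:mm:ss Z]", Locale.US)

// Hypothetical log timestamp in the bracketed style the pattern expects.
val raw = "[28/Sep/2021:16:00:30 +0800]"

// Milliseconds since the epoch, converted to seconds.
val epochSeconds = format.parse(raw).getTime / 1000

// Integer division drops the seconds, so every event in a minute shares one key.
val minuteBucket = epochSeconds / 60 * 60

// Assumption: pin the zone so the label is reproducible on any machine.
val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
fm.setTimeZone(TimeZone.getTimeZone("GMT+08:00"))
val label = fm.format(new Date(minuteBucket * 1000))
```

Here `epochSeconds` is 1632816030, `minuteBucket` is 1632816000 (the lower bound of the time window used in the filter), and `label` is `2021-09-28 16:00:00`.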

val t = d.map(x => x.split("\t")).filter(x => x.length == 41)

// mutableSet is not declared in these notes; a plain mutable Set[String] is assumed.
// Each Spark task works on its own copy of this set, so the dedup step below
// only removes duplicates seen within a single task.
val mutableSet = scala.collection.mutable.Set[String]()

// Assumes the input is t, the 41-field rows defined above.
val tmp3 = t.filter(x => {
  val remoteAddr = x(2).split(":")(0)
  !(remoteAddr.contains("xxx") || remoteAddr.startsWith("xxx"))
}).filter(x => {
  val ua = x(27)
  !((ua.equals("xxx") && x(0).split(":")(0).startsWith("10.")) || ua.contains("xxx"))
}).filter(x => {
  val uri = x(7).split(" ")(2)
  !(uri.startsWith("xxx") || uri.startsWith("xxx"))
}).filter(x => { // dedup: add returns false when the key is already in the set
  mutableSet.add(x(23) + "#" + x(40))
}).filter(x => {
  x(38).indexOf("xxx") == -1 // drop parent-layer records
})
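
One caveat about the dedup step: a mutable set captured by a Spark closure is copied to each task, so it can only catch duplicates that land in the same task. A key-based dedup avoids that dependence. The sketch below shows the idea on a local collection in plain Scala. The rows and the two-field layout are hypothetical stand-ins for `x(23)` and `x(40)`, and the helper name `dedupKey` is invented for illustration. The RDD form in the trailing comment uses `keyBy`, `reduceByKey`, and `values`, and is not run here.

```scala
// Hypothetical rows; the two fields stand in for x(23) and x(40).
val rows = Seq(
  Array("req-1", "node-a"),
  Array("req-1", "node-a"), // duplicate of the first row
  Array("req-2", "node-a"),
  Array("req-1", "node-b")
)

// Build the same composite key shape as the filter: field + "#" + field.
def dedupKey(x: Array[String]): String = x(0) + "#" + x(1)

// Keep the first row seen for each key.
val deduped = rows.groupBy(dedupKey).values.map(_.head).toSeq

// On an RDD the analogous shape would be:
//   rdd.keyBy(dedupKey).reduceByKey((a, _) => a).values
```

With `reduceByKey`, which of two duplicate rows survives is not guaranteed, which is harmless when duplicates are identical for the purposes of the analysis.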

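3 Tracking-point calculation

val domain = "xxx"

val path = s"<xxx path>*"

val s = sc.textFile(path).filter(x => x != null && x.length > 0)

// split(String) takes a regex, where a bare "|" means alternation; split on the Char instead
s.map(x => x.split('|')(14).toLong).sum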
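
A note on splitting by the pipe character: `String.split` with a string argument interprets it as a regular expression, and `|` is the regex alternation operator, so an unescaped `"|"` does not split on literal pipes. The sketch below compares the two safe forms on a hypothetical three-field record (the real records have at least 15 fields).

```scala
// Hypothetical pipe-delimited record.
val line = "a|b|42"

// split(String) takes a regex, so the pipe must be escaped...
val viaRegex = line.split("\\|")

// ...while split(Char) treats the pipe as a literal delimiter.
val viaChar = line.split('|')

// Both forms yield the fields a, b, 42; the last one parses as a number.
val value = viaChar(2).toLong
```

Either form works; the Char overload avoids the escaping altogether.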
