栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SparkSession: rdd算子 和SQL练习

SparkSession: rdd算子 和SQL练习

下面应用到的三个表数据部分如下,仅供参考

 customers.csv:

"1","Richard","Hernandez","XXXXXXXXX","XXXXXXXXX","6303 Heather Plaza","Brownsville","TX","78521"
"2","Mary","Barrett","XXXXXXXXX","XXXXXXXXX","9526 Noble Embers Ridge","Littleton","CO","80126"
"3","Ann","Smith","XXXXXXXXX","XXXXXXXXX","3422 Blue Pioneer Bend","Caguas","PR","00725"
"4","Mary","Jones","XXXXXXXXX","XXXXXXXXX","8324 Little Common","San Marcos","CA","92069"
"5","Robert","Hudson","XXXXXXXXX","XXXXXXXXX","10 Crystal River Mall ","Caguas","PR","00725"
"6","Mary","Smith","XXXXXXXXX","XXXXXXXXX","3151 Sleepy Quail Promenade","Passaic","NJ","07055"
"7","Melissa","Wilcox","XXXXXXXXX","XXXXXXXXX","9453 High Concession","Caguas","PR","00725"
"8","Megan","Smith","XXXXXXXXX","XXXXXXXXX","3047 Foggy Forest Plaza","Lawrence","MA","01841"
"9","Mary","Perez","XXXXXXXXX","XXXXXXXXX","3616 Quaking Street","Caguas","PR","00725"
"10","Melissa","Smith","XXXXXXXXX","XXXXXXXXX","8598 Harvest Beacon Plaza","Stafford","VA","22554"
"11","Mary","Huffman","XXXXXXXXX","XXXXXXXXX","3169 Stony Woods","Caguas","PR","00725"
"12","Christopher","Smith","XXXXXXXXX","XXXXXXXXX","5594 Jagged Embers By-pass","San Antonio","TX","78227"
"13","Mary","Baldwin","XXXXXXXXX","XXXXXXXXX","7922 Iron Oak Gardens","Caguas","PR","00725"
"14","Katherine","Smith","XXXXXXXXX","XXXXXXXXX","5666 Hazy Pony Square","Pico Rivera","CA","90660"
"15","Jane","Luna","XXXXXXXXX","XXXXXXXXX","673 Burning Glen","Fontana","CA","92336"
"16","Tiffany","Smith","XXXXXXXXX","XXXXXXXXX","6651 Iron Port","Caguas","PR","00725"
"17","Mary","Robinson","XXXXXXXXX","XXXXXXXXX","1325 Noble Pike","Taylor","MI","48180"
"18","Robert","Smith","XXXXXXXXX","XXXXXXXXX","2734 Hazy Butterfly Circle","Martinez","CA","94553"
"19","Stephanie","Mitchell","XXXXXXXXX","XXXXXXXXX","3543 Red Treasure Bay","Caguas","PR","00725"
"20","Mary","Ellis","XXXXXXXXX","XXXXXXXXX","4703 Old Route","West New York","NJ","07093"

orders.csv:

"1","2013-07-25 00:00:00","11599","CLOSED"
"2","2013-07-25 00:00:00","256","PENDING_PAYMENT"
"3","2013-07-25 00:00:00","12111","COMPLETE"
"4","2013-07-25 00:00:00","8827","CLOSED"
"5","2013-07-25 00:00:00","11318","COMPLETE"
"6","2013-07-25 00:00:00","7130","COMPLETE"
"7","2013-07-25 00:00:00","4530","COMPLETE"
"8","2013-07-25 00:00:00","2911","PROCESSING"
"9","2013-07-25 00:00:00","5657","PENDING_PAYMENT"
"10","2013-07-25 00:00:00","5648","PENDING_PAYMENT"
"11","2013-07-25 00:00:00","918","PAYMENT_REVIEW"
"12","2013-07-25 00:00:00","1837","CLOSED"
"13","2013-07-25 00:00:00","9149","PENDING_PAYMENT"
"14","2013-07-25 00:00:00","9842","PROCESSING"
"15","2013-07-25 00:00:00","2568","COMPLETE"
"16","2013-07-25 00:00:00","7276","PENDING_PAYMENT"
"17","2013-07-25 00:00:00","2667","COMPLETE"
"18","2013-07-25 00:00:00","1205","CLOSED"
"19","2013-07-25 00:00:00","9488","PENDING_PAYMENT"
"20","2013-07-25 00:00:00","9198","PROCESSING"
"21","2013-07-25 00:00:00","2711","PENDING"
"22","2013-07-25 00:00:00","333","COMPLETE"

order_items.csv:

1,1,971,1,26.87,26.87
2,1,217,3,23.61,7.87
3,1,98,2,33.1,16.55
4,2,998,1,86.39,86.39
5,2,813,1,100.93,100.93
6,2,171,2,191.08,95.54
7,2,639,1,23.8,23.8
8,3,273,3,21.36,7.12
9,4,285,1,48.57,48.57
10,4,628,3,40.53,13.51
11,4,1,3,109.62,36.54
12,4,354,3,95.43,31.81
13,5,426,3,203.07,67.69
14,5,534,3,251.04001,83.68
15,5,688,1,102.22,102.22
16,5,539,2,16.04,8.02
17,6,608,2,26.68,13.34
18,6,535,1,91.71,91.71
19,6,276,2,131.6,65.8
20,7,36,2,52.34,26.17
21,7,40,2,97.28,48.64
22,7,451,1,63.39,63.39
23,8,156,1,104.62,104.62
24,8,105,1,9.88,9.88
25,8,165,3,114.72,38.24
26,8,428,2,182.72,91.36
27,8,734,3,283.44,94.48
28,9,917,3,73.8,24.6
29,10,613,1,25.84,25.84
30,11,949,2,166.74,83.37
31,12,694,2,149.6,74.8
32,12,914,1,72.57,72.57
33,12,999,3,306.66,102.22
34,13,746,1,79.34,79.34
35,13,483,3,144.24,48.08
36,14,254,3,247.98001,82.66
37,14,263,3,66.12,22.04

import java.math.MathContext

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

import scala.math.BigDecimal.RoundingMode

object App2 {

  val HDFS_ROOT="hdfs://single01:9000"

  //SparkSession既可以做面向rdd的计算或也可以做面向sql的运算
  def sparkSubmit(fs:SparkSession=>Unit*)={
    val spark:SparkSession=SparkSession
      .builder()
      .appName("spark_sql_01")
      .master("local[*]")
      //开启动态分区,默认是false
//      .config("hive.exec.dynamic.partition","true")
      //开启允许所有分区都是动态的,否则必须要有静态分区才能使用
//      .config("hive.exec.dynamic.partition.mode","nonstrict")
//      .enableHiveSupport()
      .getOrCreate()

    fs.foreach(f=>f(spark))

    spark.close()

  }
  //write.config("spark.sql.parquet.writeLegacyFormat",true).mode(SaveMode.Append).saveAsTable(table)
  //write.mode(SaveMode.Append).insertInto(table)
  //交集、并集、差集
  def unionintersectionsub (spark:SparkSession)={
    val sc:SparkContext=spark.sparkContext
  //makeRDD造数据,Seq类型,里面可以放样例类或元组
    val rdd1: RDD[(Int, String)] = sc.makeRDD(Seq((1, "henry"), (2, "pola"), (4, "ariel"), (7, "jack"), (10, "rose")))
    val rdd2: RDD[(Int, String)] = sc.makeRDD(Seq((3, "mary"), (2, "pola"), (5, "sherry"), (7, "jack"), (10, "rose")))
    //并集
    rdd1.union(rdd2).foreach(println)
    //交集
    rdd1.intersection(rdd2).foreach(println)
    //差集
    rdd1.subtract(rdd2).foreach(println)
    rdd2.subtract(rdd1).foreach(println)
  }


//rdd存储
  def persistCheckpoint(sparkSession: SparkSession):Unit={
    //血统(血缘关系)
    //计算过程复杂的rdd1
    sparkSession.sparkContext.setCheckpointDir("hdfs://single01:9000/spark/checkpoint/app_name")
    val rdd1:RDD[String]=null

    rdd1.cache()//等同于=>  persist(StorageLevel.MEMORY_ONLY)
    rdd1.persist(StorageLevel.MEMORY_AND_DISK_SER_2)//内存放不下,可以放磁盘,序列化,多备份一份
    rdd1.checkpoint()
    val checkpointed: Boolean = rdd1.isCheckpointed
    val file: Option[String] = rdd1.getCheckpointFile
    //rdd2.union(rdd1)
    //rdd3.join(rdd1)
    //rdd1.map
  }
  
  

//  1.根据hadoop上的表信息,创建样例类************************************************************************************
  
  //    #  顾客信息  id,姓,名,邮箱,密码,详细地址,城市,国家,邮政编码
  case class Customer(id :Int,fname:String
                      ,lname:String,email:String,
                      password:String,street:String,
                      city:String,state:String,zipcode:String)

  //    # 订单目录   id,订单日期,订单顾客编号,订单状态
  case class Order(id:Int,date:String,customer_id:Int,status:String)

  //    #订单项目信息   id,订单id,产品id,数量,总价,单价
  case class OrderItem(id:Int,order_id:Int,product_id:Int,
                               quantity:Short,subtotal:Float,product_price:Float)


  
//  2.根据表数据和样例类,创建隐式类,做类型扩展(便于表数据转化为样例类实例)***********************************************
  implicit class StringTo(line:String){
    private val cut=(line:String)=>{
      line.split( ",").map(_.replaceAll(""",""))
    }

    def toCustomer:Customer={
      val arr: Array[String] = cut(line)
      Customer(arr(0).toInt,arr(1),arr(2),arr(3),arr(4),arr(5),arr(6),arr(7),arr(8))
    }
    def toOrder:Order={
      val arr: Array[String] = cut(line)
      Order(arr(0).toInt,arr(1),arr(2).toInt,arr(3))
    }
    def toOrderItem:OrderItem={
      val arr: Array[String] = cut(line)
      OrderItem(arr(0).toInt,arr(1).toInt,arr(2).toInt,arr(3).toShort,arr(4).toFloat,arr(5).toFloat)
    }
  }


  //3.根据后面SQL需求,对rdd算子预处理************************************************************************************
  def join(sparkSession: SparkSession)={
    val sc=sparkSession.sparkContext
    val rddCus: RDD[(Int,String)] = sc.textFile(s"${HDFS_ROOT}/spark/resource/electronic_business/customers.csv", 3)
      .map(_.toCustomer)//转换为样例类Customer
      .map(cus=>(cus.id,s"${cus.lname} ${cus.fname}"))
    val rddOrd: RDD[Order] = sc.textFile(s"${HDFS_ROOT}/spark/resource/electronic_business/orders.csv", 3)
      .map(_.toOrder)//转换为样例类Order
    val rddOI: RDD[(Int,Float)] = sc.textFile(s"${HDFS_ROOT}/spark/resource/electronic_business/order_items.csv", 3)
      .map(_.toOrderItem)//转换为样例类OrderItem
      .map(oi=>(oi.order_id,oi.subtotal))
    val rddOrd1: RDD[(Int, Int)] = rddOrd
      .filter(_.status.equalsIgnoreCase("complete"))//过滤行(缩减数据规模)
      .map(o => (o.id, o.customer_id))//列裁剪(缩减数据规模)

//    4.对rdd算子做SQL练习***********************************************************************************************
//    查找谁是最大买“货”?(谁购买的最多,以¥¥¥算)
    println(rddOrd1
      //相同的只保留一份
      .join(rddOI)
      // (o.customer_id,oi.subtotal)
      .map(_._2)
      //按键分组,同一个键相邻的值一次聚合
      .reduceByKey(_ + _)
      //(customer_id,(sum(subtotal),customer_name))
      .join(rddCus)
      //金额升序前三名
//        .sortBy(_._2._1)
      //金额降序前三名
      .sortBy(_._2._1,false)
        .take(3).mkString("n"))

    //右外连接,左边Option;左外右边Option


    //右外连接,左边Option;左外右边Option

  }

  
  def testCount(sparkSession: SparkSession)={
    val value: RDD[(Int, Int)] = sparkSession
      .sparkContext
      .makeRDD(Seq((1, 2), (1, 3), (1, 2), (2, 3), (3, 4)), 1)
    println(value.count())
    println(value.countByKey())
    println(value.countByValue())
  }



  def groupTest(spark: SparkSession)={
    val sc:SparkContext=spark.sparkContext
    val rdd1: RDD[(Int, String)] = sc.makeRDD(Seq((1, "henry"), (2, "pola"), (4, "ariel"), (7, "jack"), (10, "rose")))
    val rdd2: RDD[(Int, String)] = sc.makeRDD(Seq((3, "mary"), (2, "pola"), (5, "sherry"), (7, "jack"), (10, "rose")))

    //同键合并,键在前,值变成二元组里的迭代器
//    cogroup可以分区,groupWith不行,其他一样
   
    rdd1.cogroup(rdd2)
      .mapValues(tp2=>s"${tp2._1.mkString(",")}_${tp2._2.mkString(",")}")
      .foreach(println)
  }

  def main(args: Array[String]): Unit = {

//    sparkSubmit(unionintersectionsub)
//    sparkSubmit(join)
//      sparkSubmit(testCount)
    sparkSubmit(groupTest)
  }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758524.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号