栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

大数据spark部分方法

大数据spark部分方法

引入依赖:


    
        org.apache.spark
        spark-core_2.12
        3.0.0
    


    
        
        
            net.alchim31.maven
            scala-maven-plugin
            3.2.2
            
                
                    
                    
                        testCompile
                    
                
            
        
        
            org.apache.maven.plugins
            maven-assembly-plugin
            3.1.0
            
                
                    jar-with-dependencies
                
            
            
                
                    make-assembly
                    package
                    
                        single
                    
                
            
        
    

zip:

  • 函数说明

将两个RDD中的元素,以键值对的形式进行合并。其中,键值对中的Key为第1个RDD中的元素,Value为第2个RDD中的相同位置的元素。

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val dataRDDzip1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDDzip2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDDZip = dataRDDzip1.zip(dataRDDzip2)

输出:

(3,5)
(4,6)
(2,4)
(1,3)

partitionBy:

  • 将数据按照指定Partitioner重新进行分区。Spark默认的分区器是HashPartitioner
import org.apache.spark.HashPartitioner
val rdd: RDD[(Int, String)] =
  sparkContext.makeRDD(Array((1, "aaa"), (2, "bbb"), (3, "ccc")), 3)
val rdd2: RDD[(Int, String)] = {
  rdd.partitionBy(new HashPartitioner(2))
}
rdd2.foreach(println)

reduceByKey:

  • 可以将数据按照相同的Key对Value进行聚合
val dataRDD1 = sparkContext.makeRDD(List(("a", 1), ("b", 2), ("c", 3),("a",3)))
val dataRDD2 = dataRDD1.reduceByKey(_ + _)
val dataRDD3 = dataRDD1.reduceByKey(_ + _, 2)
dataRDD2.foreach(println)
dataRDD3.foreach(println)

输出:

(b,2)
(a,4)
(c,3)

groupByKey:

  • 将数据源的数据根据key对value进行分组
  • val dataRDD1 =
      sparkContext.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("a", 4)))
    val dataRDD2 = dataRDD1.groupByKey()
    val dataRDD3 = dataRDD1.groupByKey(2)
    val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))

输出:(b,CompactBuffer(2))
(a,CompactBuffer(1, 4))
(c,CompactBuffer(3))

aggregateByKey:

将数据根据不同的规则进行分区内计算和分区间计算

val rdd =
  sparkContext.makeRDD(List(
    ("a", 1), ("c", 3),
    ("b", 4), ("c", 5), ("c", 6), ("a", 2)
  ), 3)

val resultRDD =
  rdd.aggregateByKey(0)(
    (x, y) => math.max(x, y),
    (x, y) => x + y
  )

resultRDD.collect().foreach(println)

输出:

(c,14)
(a,3)
(b,4)

foldByKey():

当分区内计算规则和分区间计算规则相同时,aggregateByKey就可以简化为foldByKey

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3),("a",5),("b",19),("c",5)))
val dataRDD2 = dataRDD1.foldByKey(0)(_+_)
dataRDD2.foreach(println)

输出:

(b,21)
(a,6)
(c,8)

combineByKey

最通用的对key-value型rdd进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))
val input: RDD[(String, Int)] = sparkContext.makeRDD(list, 2)

val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey(
  (_, 1),
  (acc: (Int, Int), v) => (
    acc._1 + v, acc._2 + 1),

  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

combineRdd.foreach(println)

(b,(286,3))
(a,(274,3))

sortByKey

在一个(K,V)的RDD上调用,K必须实现Ordered接口(特质),返回一个按照key进行排序的

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3),("a",5),("b",5),("c",5)))
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
val sortRDD2: RDD[(String, Int)] = dataRDD1.sortByKey(false)
sortRDD1.foreach(println)
sortRDD2.foreach(println)

输出:

(c,3)
(a,1)
(b,2)
(a,5)
(c,5)
(b,5)

(b,2)
(b,5)
(a,1)
(c,3)
(a,5)
(c,5)

join

在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素连接在一起的(K,(V,W))的RDD

val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))

val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))

rdd.join(rdd1).collect().foreach(println)

输出:

(1,(a,4))
(2,(b,5))
(3,(c,6))

leftOuterJoin

类似于SQL语句的左外连接

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))

val dataRDD2 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))

val rdd: RDD[(String, (Int, Option[Int]))] = dataRDD1.leftOuterJoin(dataRDD2)

输出:

(b,(2,Some(2)))
(a,(1,Some(1)))
(c,(3,Some(3)))

cogroup

在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("a",2),("c",3)))

val dataRDD2 = sparkContext.makeRDD(List(("a",1),("c",2),("c",3)))

val value: RDD[(String, (Iterable[Int], Iterable[Int]))] =

dataRDD1.cogroup(dataRDD2)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/336112.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号