栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Spark 单Value类型算子案例详解

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Spark 单Value类型算子案例详解

1.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
//测试Distinct函数
object _01DistinctDemo {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("getRDD")
        val sc = new SparkContext(conf)

        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 2, 3, 4, 4, 3, 0),2)
        val value: RDD[Int] = rdd.distinct(2)
        value.collect().foreach(println)
        //4
        //0
        //2
        //1
        //3
       
        val value1: RDD[(Int, Int)] = value.mapPartitionsWithIndex((index, iter) => {
            iter.map((index, _))
        }).distinct(2)
        value1.collect().foreach(println)
        //(0,4)
        //(0,0)
        //(1,1)
        //(1,3)
        //(0,2)
    }

}

2.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

//Coalesce用于缩减分区数
object _02CoalesceDemo {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("getRDD")
        val sc = new SparkContext(conf)

        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 2, 3, 4, 4, 3, 0),2)
        val value: RDD[Int] = rdd.coalesce(2)
        val value1: RDD[(Int, Int)] = value.mapPartitionsWithIndex((index, iter) => {
            iter.map((index, _))
        }).distinct()  //可以增加.distinct()对集合内的数据去重再进行分区
        println(value1.getNumPartitions)
        value1.collect().foreach(println)
    }
}
//2
//(1,4)
//(1,0)
//(0,1)
//(0,3)
//(1,3)
//(0,2)

3.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//Repatition函数
//用于重新分区
object _03RepatitionDemo {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test")
        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[Int] = sc.makeRDD(List(2, 3, 5, 6,4,3,4,5,2),2)
        //参数就是输入期望的分区数
        val value: RDD[Int] = rdd.repartition(4)
        val result: RDD[(Int, Int)] = value.mapPartitionsWithIndex((index, iter) => {
            iter.map((index, _))
        })
        result.collect().foreach(println)
    }
}
//(0,3)
//(0,3)
//(1,5)
//(1,4)
//(2,6)
//(2,5)
//(3,2)
//(3,4)
//(3,2)

4.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

//SortBy函数的应用
//就是进行排序
object _04SortByDemo {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test")
        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[Int] = sc.makeRDD(List(2, 3, 5, 6,4,3,4,5,2),3)

       
        //val result: RDD[Int] = rdd.sortBy(num => num)  //就是升序排序的意思

        val value: RDD[Int] = rdd.sortBy(num => num, false)  //使用降序

        //输出分区号
        val result: RDD[(Int, Int)] = value.mapPartitionsWithIndex((index, iter) => {
            iter.map((index, _)) //可以打印出分区号
        })
        result.collect().foreach(println)
        //(0,6)
        //(0,5)
        //(0,5)
        //(1,4)
        //(1,4)
        //(2,3)
        //(2,3)
        //(2,2)
        //(2,2)
    }
}

5.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//需求分析
//按key输出,进行排序 key是string类型
object _05SortBy_exercise {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test")
        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("11", 1), ("1", 1), ("2", 2)))
        //按照第一个元素做排序 第一个是字典排序 第二个是按Int排序
        //val result: RDD[(String, Int)] = rdd.sortBy(_._1.toInt)  //(1,1) (2,2) (11,1)
        val result: RDD[(String, Int)] = rdd.sortBy(_._1)//(1,1)
                                                            //(11,1)
                                                                //(2,2)
        result.collect().foreach(println)
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/336679.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号