栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark 获取RDD案例详解

Spark 获取RDD案例详解

1)获取内存中的RDD

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//从内存中获取RDD
object _01FromMemory {
    def main(args: Array[String]): Unit = {
        //配置文件
        val conf: SparkConf = new SparkConf().setMaster("local").setAppName("createRDD")
        //获取上下文对象
        val sc = new SparkContext(conf)
​
        //从内存中创建RDD
        val list = List(3, 6, 78, 9)
​
        
        val rdd: RDD[Int] = sc.parallelize(list)
        rdd.foreach(println)
​
        println("--------------------------------")
        
        val rdd2 = sc.makeRDD(list)
        rdd2.foreach(println)
​
        //3
        //6
        //78
        //9
        //--------------------------------
        //3
        //6
        //78
        //9
    }
}
​

2)获取外部数据的RDD

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
​
//从外部数据获取RDD
object _02FromOthers {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("getRDD")
        val sc = new SparkContext(conf)
​
        
        val rdd = sc.textFile("D://input//words.txt")
​
        
        val rdd2: RDD[String] = sc.textFile("data/1.txt")
        //打印结果
        rdd.collect().foreach(println)
        println("-------------------------")
        rdd2.collect().foreach(println)
        
        //hello world
        //hello spark
        //hello scala
        //hello java
        //hello spark
        //-------------------------
        //12
        //12345
        //3
    }
}

3)读取内存数据的并行度和分区

import org.apache.spark.{SparkConf, SparkContext}
​
//读取内存中的分区和并行度
//并行度指的就是分区的数量,也同时指的就是Task的数量
object _03PartitionFromMemory {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("readRDD")
        
        conf.set("spark.default.parallelism","4")  //如果取消掉就是走local[*]
        val sc = new SparkContext(conf)
​
        val rdd = sc.makeRDD(List(3, 5, 6, 6, 7))
​
        
        rdd.saveAsTextFile("out")
        sc.stop()
        //我使用的是4core 所以会有四个分区 每个分区平均分配到一个数据
    }
}
​

4)读取外部数据的并行度和分区

package com.qf.spark.wc.day03.getrdd
​
import org.apache.spark.{SparkConf, SparkContext}
​
object _04PartitionFromOuterFile {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("readRDD")
        val sc = new SparkContext(conf)
​
        //textFile的第二个参数解析:表示设置最小分区数,因此实际分区数可能大于该值
        val rdd = sc.textFile("data/1.txt", 2)
        rdd.saveAsTextFile("out")
        sc.stop()
    }
    //这里会生成了两个分区,因为第二个参数进行了设置
}
//生成分区1的数据
//12
//12345
​
//分区2
//3

5)内存分区的数据

import org.apache.spark.{SparkConf, SparkContext}
​
object _05PartitionDataFromMemory {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MemoryData")
        val sc = new SparkContext(conf)
​
        val rdd = sc.makeRDD(List(1, 3, 8, 9),2)
        //获取分区数的方法
        rdd.saveAsTextFile("out")//生成分区
        sc.stop()
​
        //result
        //1 3 一个分分区
        //8 9 一个分区
        //总结:均分,如不能均分就从后面的分区开始加入数据
    }
}
​

6)外部文件的分区的数据

import org.apache.spark.{SparkConf, SparkContext}
//获取外部文件分区的数据
//spark的分区数量就是和MR的分片数量有关,而MR的分片计算有一个阈值1.1
object _06PartitionDataFromFile {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MemoryData")
        val sc = new SparkContext(conf)
​
        val rdd = sc.textFile("data/1.txt", 2)
        rdd.saveAsTextFile("out")
        sc.stop()
​
        //解释
        //       比如数据如下:
        //              12@@
        //              12345@@
        //              1
        //
        //              总共12个字节,期望每个分区6个字节,但是通常来说,上一个分区和下一个分区的数据的临界行,会跨这两个分区
        //              也就是上一个分区的最后一行的数据是溢出的,因此要读取到换行符号才行
        //              第一个分区的数据: 12@@
        //                                12345@@  在同一个分区
        //              第二个分区的数据,不会重读读过的数据,因此只有1
        
        //result 生成如下两个分区
        //12
        //12345
        //-----------------
        //1
    }
}
​

6)RDD的读取和保存

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
​
object _07RDDIO {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MemoryData")
        val sc = new SparkContext(conf)
​
        
        //        val value = sc.textFile("data/1.txt")
        //        val value: RDD[(String, String)] = sc.wholeTextFiles("data")
        //        val value: RDD[Nothing] = sc.objectFile("data/part-00001")
        val value: RDD[(String, Int)] = sc.sequenceFile("data/part-00001")
​
        
        value.saveAsTextFile("out")
        //        value.saveAsObjectFile("outObject")
        //        value.map((_,1)).saveAsSequenceFile("outSequence")
        value.foreach(println)
    }
}
​
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/335869.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号