栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

三、RDD编程(1)

三、RDD编程(1)

目录
    • RDD编程基础
      • RDD创建
      • RDD Transformation
      • RDD Action
      • 持久化
      • 分区
      • WordCount实例
    • 键值对RDD
      • 常用的键值对Transformation
      • 综合实例

RDD编程基础 RDD创建
  1. 从文件系统中加载数据创建RDD
>>> lines = sc.textFile("file:///opt/module/spark/mycode/rdd/word.txt")
>>> lines.foreach(print)
Spark is fast                                                       (0 + 2) / 2]
Spark is better
Hello Hadoop
Hello Spark
Hadoop is good

不知道为什么是倒序输出

如果不是在命令行,需要自己创建SparkContext对象

from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My APP")
sc = SparkContext(conf = conf)

从HDFS中加载数据

lines = sc.textFile('hdfs://localhost:9000/user/hadoop/word.txt')
lines = sc.textFile("/user/hadoop/word.txt")
lines = sc.textFile("word.txt")

  1. 通过并行集合(列表)创建RDD

parallelize

>>> array = [1,2,3,4,5]
>>> rdd = sc.parallelize(array)
>>> rdd.foreach(print)
1
2
3
4
5

RDD Transformation
#filter(func) 筛选出满足函数func的元素,并返回一个新的数据集
>>> lines = sc.textFile("file:///opt/module/spark/mycode/rdd/word.txt")
>>> linesWithSpark = lines.filter(lambda line: "Spark" in line)
>>> linesWithSpark.foreach(print)
Spark is fast
Hello Spark
Spark is better

#map(func) 将每个元素传递到函数func中,并将结果返回为一个新的数据集
#例一
>>> data = [1,2,3,4,5]
>>> rdd1 = sc.parallelize(data)
>>> rdd2 = rdd1.map(lambda x : x + 10) #这是是对rdd1做map操作,容易写成sc
>>> rdd2.foreach(print)
13
11
12
14
15
#例二
>>> lines = sc.textFile("file:///opt/module/spark/mycode/rdd/word.txt")
>>> words = lines.map(lambda line : line.split(" ")) #split也要有对象
>>> words.foreach(print)
['Spark', 'is', 'fast']
['Spark', 'is', 'better']
['Hello', 'Hadoop']
['Hello', 'Spark']
['Hadoop', 'is', 'good']

#flatMap(func) 与map()相似,但每个输入元素都可以映射到0或多个输出结果 拍扁
>>> lines = sc.textFile("file:///opt/module/spark/mycode/rdd/word.txt")
>>> words = lines.flatMap(lambda line : line.split(" ")) #先map再flat拍扁
>>> words.foreach(print)
Spark
is
fast
Hello
Hadoop
Spark
is
better
Hello
Spark
Hadoop
is
good

#groupByKey() 应用于(K,V)键值对的数据集时,返回一个新的(K,Iterable)形式的数据集
>>> data = [("Spark", 1), ("is", 1), ("fast", 1), ("Hello", 1),
... ("Hadoop", 1), ("Spark", 1), ("is", 1), ("better", 1), ("Hello", 1),
... ("Spark", 1), ("Hadoop", 1), ("is", 1), ("good", 1)]
>>> words = sc.parallelize(data)
>>> words1 = words.groupByKey()
>>> words1.foreach(print)
('fast', )
('better', )
('good', )
('Spark', )
('is', )
('Hello', )
('Hadoop', )
#("Spark", 1) --> ("Spark", (1, 1, 1)) Iterable(一种可迭代集合)

#reduceByKey(func) 应用于(K,V)键值对的数据集时,返回一个新的(K,V)形式的数据集,其中每个值是将每个key传递到函数func中进行聚合后的结果
>>> words1 = words.reduceByKey(lambda a, b : a + b)
>>> words1.foreach(print)
('good', 1)
('Hadoop', 2)
('Spark', 3)
('is', 3)
('Hello', 2)
('fast', 1)
('better', 1)
RDD Action

行动操作是真正触发计算的地方

#常用的RDD行动操作API
>>> array = [3,4,5,6,7]
>>> rdd = sc.parallelize(array)

#count() 返回数据集中的元素个数
>>> rdd.count()
5

#collect() 以数组的形式返回数据集中的所有元素
>>> rdd.collect()
[3, 4, 5, 6, 7]

#first() 返回数据集中的第一个元素
>>> rdd.first()
3

#take(n) 以数组的形式返回数据集中的前n个元素
>>> rdd.take(3)
[3, 4, 5]

#reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
>>> rdd.reduce(lambda a, b : a + b)
25

#foreach(func) 将数据集中的每个元素传递到函数func中运行
>>> rdd.foreach(lambda elem : print(elem))
6
3
5
7
4
持久化

持久化(缓存)机制可以避免重复计算的开销

使用persist()方法将一个RDD标记为持久化

>>> myList = ["Hadoop", "Spark", "Hive"]
>>> rdd = sc.parallelize(myList)
>>> rdd.cache()
ParallelCollectionRDD[34] at readRDDFromFile at PythonRDD.scala:262
    #默认persist(MEMORY_ONLY)
    #第二种persist(MEMORY_AND_DISK)
>>> print(rdd.count())
3
>>> print(",".join(rdd.collect()))
Hadoop,Spark,Hive
>>> rdd.unpersist()
ParallelCollectionRDD[34] at readRDDFromFile at PythonRDD.scala:262

分区
  1. 增加并行度
  2. 减少通信开销

实例:对两个表进行join操作

表一:UserData(UserID, UserInfo)

表二:Events(UserID, linkInfo)

目标:(UserID, UserInfo, linkInfo)

RDD中的每个元素是(UserID, UserInfo),(UserID, linkInfo)键值对

原则: 分区的个数尽量等于集群中的CPU核心(core)数目

#手动分区
sc.textFile(path, partitionNum)

Array = [1,2,3,4,5]
rdd = sc.parallelize(Array, 2)
#default: spark.default.parallelism

#使用repartition方法重新设置分区个数
>>> len(rdd.glom().collect())
2
>>> rdd1 = rdd.repartition(1)
>>> len(rdd1.glom().collect())
1

#自定义分区
#1.HashPartitioner
#2.RangePartitioner
#3.自定义的分区函数
$ vim /opt/module/sprak/mycode/rdd/TestPartitioner.py
from pyspark import SparkConf, SparkContext
def MyPartitioner(key):
    print("MyPartitioner is running")
    print("The key is %d" % key)
    return key % 10
def main():
    print("The main function is running")
    conf = SparkConf().setMaster("local").setAppName("MyAPP")
    sc = SparkContext(conf = conf)
    data = sc.parallelize(range(10), 5)
    data.map(lambda x : (x, 1)) 
        .partitionBy(10, MyPartitioner) 
        .map(lambda x : x[0]) 
        .saveAsTextFile("file:///opt/module/spark/mycode/rdd/partitioner")
if __name__ == '__main__':
    main()
    
$ python3 TestPartitioner.py 
The main function is running
MyPartitioner is running
The key is 0
MyPartitioner is running
The key is 1
MyPartitioner is running                                            (1 + 1) / 5]
The key is 2
MyPartitioner is running
The key is 3
MyPartitioner is running========>                                   (2 + 1) / 5]
The key is 4
MyPartitioner is running
The key is 5
MyPartitioner is running
The key is 6
MyPartitioner is running
The key is 7
MyPartitioner is running
The key is 8
MyPartitioner is running
The key is 9

$ cd partitioner/
$ ls
part-00000  part-00002  part-00004  part-00006  part-00008  _SUCCESS
part-00001  part-00003  part-00005  part-00007  part-00009
$ cat part-00000
0

WordCount实例
>>> lines = sc.textFile("file:///opt/module/spark/mycode/rdd/word.txt")
>>> wordCount = lines.flatMap(lambda line : line.split(" "))  #映射加拍扁
...                  .map(lambda word : (word, 1))  #产生键值对
...                  .reduceByKey(lambda a, b : a + b) #迭代求和
>>> print(wordCount.collect())
[('Hello', 2), ('Spark', 3), ('is', 3), ('good', 1), ('Hadoop', 2), ('fast', 1), ('better', 1)]

键值对RDD
$ cd /opt/module/spark/mycode/
$ mkdir pairRdd
$ cd pairRdd/
$ vim word.txt
I love Python
I like Java
I download Hadoop
I use Spark
Hadoop is good
Spark is faster than Hadoop
>>> lines = sc.textFile("file:///opt/module/spark/mycode/pairRdd/word.txt")
>>> pairRdd = lines.flatMap(lambda line : line.split(" ")) 
...                .map(lambda word : (word, 1))
>>> print(pairRdd.collect())
[('I', 1), ('love', 1), ('Python', 1), ('I', 1), ('like', 1), ('Java', 1), ('I', 1), ('download', 1), ('Hadoop', 1), ('I', 1), ('use', 1), ('Spark', 1), ('Hadoop', 1), ('is', 1), ('good', 1), ('Spark', 1), ('is', 1), ('faster', 1), ('than', 1), ('Hadoop', 1), ('', 1)]

通过并行集合(列表)创建RDD

>>> myList = ['Hadoop', "Spark", "Hive", "Spark"]
>>> rdd = sc.parallelize(myList)
>>> pairRdd = rdd.map(lambda word : (word, 1))
>>> pairRdd.foreach(print)
('Hive', 1)
('Spark', 1)
('Spark', 1)
('Hadoop', 1)
常用的键值对Transformation
#reduceByKey(func) 使用func函数合并具有相同键的值
>>> pairRdd.reduceByKey(lambda a, b : a + b).foreach(print)
('Spark', 2)
('Hive', 1)
('Hadoop', 1)

#groupBykey() 对具有相同键的值进行分组
>>> myList = [("Spark", 3), ("Spark", 5), ("Hadoop", 2), ("Hadoop", 6)]
>>> rdd = sc.parallelize(myList)
>>> pairRdd = rdd.groupByKey()
>>> pairRdd.foreach(print)
('Spark', )
('Hadoop', )
#实现和reduceByKey(func)一样的功能
>>> pairRdd = rdd.groupByKey().map(lambda t : (t[0], sum(t[1])))
>>> pairRdd.foreach(print)
('Hadoop', 8)
('Spark', 8)

#keys() 只会把键值对RDD中的key返回,形成一个新的RDD
>>> pairRDD = sc.parallelize([("Hadoop", 1), ("Spark", 1), ("Hive", 1), ("Spark", 1)])
>>> pairRDD.keys().foreach(print)
Hive
Hadoop
Spark
Spark

#values() 只会把键值对RDD中的value返回
>>> pairRDD.values().foreach(print)
1
1
1
1

#sortByKey() 返回一个根据key排序的RDD
>>> pairRDD.sortByKey().foreach(print)
('Hadoop', 1)
('Hive', 1)
('Spark', 1)
('Spark', 1)
>>> pairRDD.foreach(print)
('Hive', 1)
('Spark', 1)
('Spark', 1)
('Hadoop', 1)

#sortBy() 根据其他字段进行排序 False表示降序
>>> d1 = sc.parallelize([("c", 8), ("b", 25), ("c", 17), ("a", 42), ("b", 4), 
... ("d", 9), ("e", 17), ("c", 2), ("f", 29), ('g', 21), ("b", 9)])
>>> d2 = d1.reduceByKey(lambda a, b : a + b).sortBy(lambda x : x, False)
>>> print(d2.collect())
[('g', 21), ('f', 29), ('e', 17), ('d', 9), ('c', 27), ('b', 38), ('a', 42)]
>>> d3 = d1.reduceByKey(lambda a, b : a + b).sortBy(lambda x : x[0], False)
>>> print(d3.collect())
[('g', 21), ('f', 29), ('e', 17), ('d', 9), ('c', 27), ('b', 38), ('a', 42)]
>>> d4 = d1.reduceByKey(lambda a, b : a + b).sortBy(lambda x : x[1], False)
>>> print(d4.collect())
[('a', 42), ('b', 38), ('f', 29), ('c', 27), ('g', 21), ('e', 17), ('d', 9)]

#mapValues(func) 对键值对RDD中的每一个value都应用一个函数,但是,key不会发生变化
>>> pairRDD = sc.parallelize([("Hadoop", 1), ("Spark", 1), ("Hive", 1), ("Spark", 1)])
>>> pairRDD1 = pairRDD.mapValues(lambda x : x + 1)
>>> print(pairRDD1.collect())
[('Hadoop', 2), ('Spark', 2), ('Hive', 2), ('Spark', 2)]

#join() 内连接 (K, V1) + (K, V2) = (K, (V1, V2))
>>> pairRDD1 = sc.parallelize([("Hadoop", 1), ("Spark", 1), ("Hive", 1), ("Spark", 1)])
>>> pairRDD2 = sc.parallelize([("Spark", "fast"), ("Hadoop", "good")])
>>> pairRDD3 = pairRDD1.join(pairRDD2)
>>> pairRDD3.foreach(print)
('Spark', (1, 'fast'))
('Spark', (1, 'fast'))
('Hadoop', (1, 'good'))

'''
combineByKey(createCombiner, mergevalue, mergeCombiners, partitioner, mapSideCombine)
1. createCombiner 在第一次遇到key时创建组合器函数,将RDD数据集中的V类型值转换为C类型值
2. mergevalue 合并值函数 再次遇到相同的key时,将createCombiner的C类型值与传入的V类型值合并成一个C类型值
3. mergeCombiners 合并组合器函数,将C类型值两两合并成一个C类型值
4. partitioner 使用已有的或自定义的分区函数,默认是HashPartitioner
5. mapSideCombine 是否在map端进行Combine操作,默认为true
实例 求公司总收入和月平均收入
'''
$ cd /opt/module/spark/mycode/rdd/
$ pwd
/opt/module/spark/mycode/rdd
$ vim Combine.py 
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("Combine ")
sc = SparkContext(conf = conf)
data = sc.parallelize([("company-1", 88), ("company-1", 96), ("company-1", 85), 
                       ("company-2", 94), ("company-2", 86), ("company-2", 74), 
                       ("company-3", 86), ("company-3", 88), ("company-3", 92)], 3)
res = data.combineByKey(lambda income : (income, 1), 
                        lambda acc, income : (acc[0] + income, acc[1] + 1), 
                        lambda acc1, acc2 : (acc1[0] + acc2[0], acc1[1] + acc2[1])). 
                        map(lambda x : (x[0], x[1][0], x[1][0]/float(x[1][1])))
res.repartition(1).saveAsTextFile("file:///opt/module/spark/mycode/rdd/combineresult")
$ python3 Combine.py 
$ cat combineresult/part-00000 
('company-1', 269, 89.66666666666667)
('company-3', 266, 88.66666666666667)
('company-2', 254, 84.66666666666667)

综合实例
'''
key 图书名称
value 图书当天销量
Task 计算每种图书日平均销量
'''
>>> rdd = sc.parallelize([("Spark", 2), ("Hadoop", 3), ("Spark", 8), ("Hadoop", 3)])
>>> rddAve = rdd.mapValues(lambda x : (x, 1)). 
...              reduceByKey(lambda x, y : (x[0] + y[0], x[1] + y[1])). 
...              mapValues(lambda x : x[0]/x[1])
>>> rddAve.foreach(print)
('Hadoop', 3.0)
('Spark', 5.0)
#注意map和mapValues的区别
>>> rdd.map(lambda x: (x, 1)).foreach(print)
(('Spark', 8), 1)
(('Hadoop', 3), 1)
(('Hadoop', 3), 1)
(('Spark', 2), 1)
>>> rdd.mapValues(lambda x: (x, 1)).foreach(print)
('Spark', (8, 1))
('Hadoop', (3, 1))
('Hadoop', (3, 1))
('Spark', (2, 1))
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/583061.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号