from pyspark import SparkConf, SparkContext

# /opt/module/spark/bin/spark-submit /opt/Code/broadcast.py
if __name__ == '__main__':
    conf = SparkConf().setAppName("WordCount").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # The student list lives on the Driver; broadcasting it sends one
    # read-only copy to each Executor instead of one copy per task.
    stu_info_list = [(1, '张大仙', 11),
                     (2, '王晓晓', 13),
                     (3, '张甜甜', 11),
                     (4, '王大力', 11),
                     ]
    broadcast = sc.broadcast(stu_info_list)

    score_info_rdd = sc.parallelize([(1, '语文', 99),
                                     (2, '语文', 89),
                                     (3, '语文', 79),
                                     (4, '语文', 69)
                                     ])

    def map_func(data):
        id = data[0]
        name = ''
        # Read the broadcast value inside the task and look up the name by id.
        value = broadcast.value
        for i in value:
            if id == i[0]:
                name = i[1]
        return (name, data[1], data[2])

    print(score_info_rdd.map(map_func).collect())
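Running this script should print each score keyed by student name rather than id; a sketch of the expected output, given the sample data above:

# [('张大仙', '语文', 99), ('王晓晓', '语文', 89), ('张甜甜', '语文', 79), ('王大力', '语文', 69)]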
from pyspark import SparkConf, SparkContext

# /opt/module/spark/bin/spark-submit /opt/Code/broadcast.py
if __name__ == '__main__':
    conf = SparkConf().setAppName("WordCount").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)

    # A plain Driver-side variable: each Executor works on its own
    # serialized copy, so increments never flow back to the Driver.
    count = 0

    def map_func(data):
        global count
        count += 1
        print(count)

    rdd.map(map_func).collect()
    print(count)  # still 0 on the Driver
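Because count is captured by value when the tasks are serialized, each side sees different numbers; a sketch of the expected behavior with 2 partitions:

# Executor side: each partition prints 1, 2, 3, 4, 5 against its own local copy
# Driver side:   print(count) still shows 0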
from pyspark import SparkConf, SparkContext

# /opt/module/spark/bin/spark-submit /opt/Code/broadcast.py
if __name__ == '__main__':
    conf = SparkConf().setAppName("WordCount").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)

    # An accumulator merges updates from all Executors back to the Driver.
    acmlt = sc.accumulator(0)

    def map_func(data):
        global acmlt
        acmlt += 1
        print(acmlt)

    rdd.map(map_func).collect()
    print(acmlt)  # 10
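The accumulator object prints as its value, but the underlying int can also be pulled out explicitly on the Driver via .value; a one-line sketch:

print(acmlt.value)  # 10 -- the merged count from both partitions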
from pyspark import SparkConf, SparkContext

# /opt/module/spark/bin/spark-submit /opt/Code/broadcast.py
if __name__ == '__main__':
    conf = SparkConf().setAppName("WordCount").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)
    acmlt = sc.accumulator(0)

    def map_func(data):
        global acmlt
        acmlt += 1
        print(acmlt)

    rdd2 = rdd.map(map_func)
    # Cache rdd2 so that building rdd3 reuses the cached data instead of
    # re-running map_func, which would double-count the accumulator.
    rdd2.cache()
    rdd2.collect()

    rdd3 = rdd2.map(lambda x: x)
    rdd3.collect()
    print(acmlt)  # 10 with the cache; 20 without it
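If the rdd2.cache() line is removed, the second action re-triggers map_func for every element, because rdd2 is recomputed from scratch; a sketch of that uncached run, assuming the same data:

# rdd2.collect()  -> acmlt == 10
# rdd3.collect()  -> map_func runs again, acmlt == 20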
import re
from operator import add

from pyspark import SparkConf, SparkContext

# /opt/module/spark/bin/spark-submit /opt/Code/broadcast.py
if __name__ == '__main__':
    conf = SparkConf().setAppName("WordCount").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    file_rdd = sc.textFile('file:///opt/Data/accumulator_broadcast_data.txt')
    line_rdd = file_rdd.filter(lambda line: line.strip())  # drop empty lines
    data_rdd = line_rdd.map(lambda x: x.strip())  # strip leading/trailing spaces
    words_rdd = data_rdd.flatMap(lambda x: re.split(r'\s+', x))  # split on whitespace

    # Broadcast the special-character list; count hits with an accumulator.
    abnormal_char = [',', '.', '!', '#', '$', '%']
    broadcast = sc.broadcast(abnormal_char)
    acmlt = sc.accumulator(0)

    def filter_func(data):
        global acmlt
        abnormal_char = broadcast.value
        if data in abnormal_char:
            acmlt += 1
            return False
        else:
            return True

    normal_words_rdd = words_rdd.filter(filter_func)
    result_rdd = normal_words_rdd.map(lambda x: (x, 1)).reduceByKey(add)

    print('Normal word counts: ', result_rdd.collect())
    print('Special character count: ', acmlt)
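As a sanity check, take a hypothetical input line such as 'hadoop spark # hive !' (not the actual contents of accumulator_broadcast_data.txt): the filter would route '#' and '!' into the accumulator and count the remaining words:

# Normal word counts:  [('hadoop', 1), ('spark', 1), ('hive', 1)]
# Special character count:  2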
Related materials:
Heima (黑马) PySpark video tutorial, first release across the web
Link: https://pan.baidu.com/s/1AY7FmO6w-jCAj7t_tD9oGw
Extraction code: 1234



