栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

数据框pyspark到dict

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

数据框pyspark到dict

您可以使用数据框转换和udfs完成所有这些操作。唯一有点烦人的事情是,因为从技术上讲,您有两种不同类型的字典(一种是key = integer和value
= dictionary,另一种是key = integer value =
float),因此您将必须定义两个具有不同数据类型的udf。这是执行此操作的一种可能方法:

from pyspark.sql.functions import udf,collect_list,create_mapfrom pyspark.sql.types import MapType,IntegerType,FloatTypedata = [[160,163,27.0],[160,183,27.0],[161,162,22.0],      [161,170,31.0],[162,161,22.0],[162,167,24.0],      [163,160,27.0],[163,164,27.0],[164,163,27.0],      [164,165,35.0],[165,164,35.0],[165,166,33.0],      [166,165,33.0],[166,167,31.0],[167,162,24.0],      [167,166,31.0],[167,168,27.0],[168,167,27.0],      [168,169,23.0],[169,168,23.0]]cols = ['FromComponentID','ToComponentID','Cost']df = spark.createDataframe(data,cols)combineMap = udf(lambda maps: {key:f[key] for f in maps for key in f},  MapType(IntegerType(),FloatType()))combineDeepMap = udf(lambda maps: {key:f[key] for f in maps for key in f},  MapType(IntegerType(),MapType(IntegerType(),FloatType())))mapdf = df.groupBy('FromComponentID').agg(collect_list(create_map('ToComponentID','Cost')).alias('maps')).agg(combineDeepMap(collect_list(create_map('FromComponentID',combineMap('maps')))))result_dict = mapdf.collect()[0][0]

对于大型数据集,这应比要求将数据收集到单个节点上的解决方案提供一些性能提升。但是由于spark仍然必须序列化udf,因此基于rdd的解决方案不会有太大的收获。


更新:

rdd解决方案要紧凑得多,但在我看来,它并不是那么干净。这是因为pyspark不会很容易将大型词典存储为rdds。解决方案是将其存储为元组的分布式列表,然后在将其收集到单个节点时将其转换为字典。这是一种可能的解决方案:

maprdd = df.rdd.groupBy(lambda x:x[0]).map(lambda x:(x[0],{y[1]:y[2] for y in x[1]}))result_dict = dict(maprdd.collect())

再次,这应该比单个节点上的纯python实现提供性能提升,并且可能与数据框实现没有什么不同,但是我期望数据框版本将具有更高的性能。



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/669441.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号