可以,但是您必须在组合键中包含所有必需的信息:
from pyspark.rdd import portable_hashn = 2def partitioner(n): """Partition by the first item in the key tuple""" def partitioner_(x): return portable_hash(x[0]) % n return partitioner_(rdd .keyBy(lambda kv: (kv[0], kv[1][0])) # Create temporary composite key .repartitionAndSortWithinPartitions( numPartitions=n, partitionFunc=partitioner(n), ascending=False) .map(lambda x: x[1])) # Drop key (note: there is no partitioner set anymore)
分步说明:
keyBy(lambda kv: (kv[0], kv[1][0]))
创建一个替换键,该键由原始键和值的第一个元素组成。换句话说,它可以转换为:(0, (5,1))
进入
((0, 5), (0, (5, 1)))
实际上,简单地将数据重塑为
((0, 5), 1)
partitioner
基于键的第一个元素的哈希定义分区函数,因此:partitioner(7)((0, 5))
0
partitioner(7)((0, 6))
0
partitioner(7)((0, 99))
0
partitioner(7)((3, 99))
3
如您所见,它是一致的,并且忽略了第二位。
我们使用默认
keyfunc
函数identity(lambda x: x
)并依赖于Python上定义的字典顺序tuple
:(0, 5) < (1, 5)
True
(0, 5) < (0, 4)
False
如前所述,您可以改为重塑数据:
rdd.map(lambda kv: ((kv[0], kv[1][0]), kv[1][1]))
然后删除final
map以提高性能。



