这是一个例子,将说明正在发生的事情。
让我们考虑一下
reduce使用某些函数调用列表时会发生什么
f:
reduce(f, [a,b,c]) = f(f(a,b),c)
如果以您的示例为例,
f = lambda u, v: u[1] + v[1]则以上表达式可分解为:
reduce(f, [a,b,c]) = f(f(a,b),c) = f(a[1]+b[1],c)
但是
a[1] + b[1]是整数
__getitem__,因此没有方法,因此会出错。
通常,更好的方法(如下所示)是
map()先提取所需格式的数据,然后应用
reduceByKey()。
MCVE与您的数据
element = sc.parallelize( [ ('A', ('toto' , 10)), ('A', ('titi' , 30)), ('5', ('tata', 10)), ('A', ('toto', 10)) ])您可以使用更复杂的reduce函数 几乎 获得所需的输出:
def add_tuple_values(a, b): try: u = a[1] except: u = a try: v = b[1] except: v = b return u + vprint(element.reduceByKey(add_tuple_values).collect())
除了会导致:
[('A', 50), ('5', ('tata', 10))]为什么? 因为密钥只有一个值
'5',所以没有什么可以减少的。
由于这些原因,最好先致电
map。要获得所需的输出,可以执行以下操作:
>>> print(element.map(lambda x: (x[0], x[1][1])).reduceByKey(lambda u, v: u+v).collect())[('A', 50), ('5', 10)]更新1
这是另一种方法:
您可以
tuple在
reduce函数中创建,然后调用
map以提取所需的值。(基本上颠倒了
map和的顺序
reduce。)
print( element.reduceByKey(lambda u, v: (0,u[1]+v[1])) .map(lambda x: (x[0], x[1][1])) .collect())[('A', 50), ('5', 10)]笔记
- 如果每个键至少有2条记录,使用
add_tuple_values()
将为您提供正确的输出。



