# coding:utf-8
from pyspark import SparkContext
def _contributions(record):
    """Split one page's rank evenly among the pages it links to.

    ``record`` is one element of ``links.join(ranks)``, i.e.
    ``(page, (out_links, rank))``. Returns a list of
    ``(destination_page, share)`` pairs, one per out-link.
    """
    out_links, rank = record[1]
    share = rank / len(out_links)
    return [(dest, share) for dest in out_links]


sc = SparkContext(appName='pr')
# Sample graph: each entry is [page, [pages it links to]].
links = [['A', ['B', 'C']], ['B', ['A', 'C']], ['C', ['A', 'B', 'D']], ['D', ['C']]]
# The link table is joined again on every iteration, so split it into
# 4 partitions once and keep it cached.
links = sc.parallelize(links).partitionBy(4).persist()
# Every page starts at rank 1.0. It must be the float 1.0 rather than the
# int 1: the rank is later divided by the number of out-links, and an
# int / int division would truncate under Python 2.
ranks = links.mapValues(lambda x: 1.0)
for i in range(10):
    # NOTE: a page that no other page links to never appears as a key in
    # `contri`, so it would drop out of `ranks` (and out of the join) after
    # the first iteration. Every page in the sample graph above has at least
    # one inbound link, so that does not happen here.
    contri = links.join(ranks).flatMap(_contributions)
    # Sum the shares received by each page, then apply the 0.15 / 0.85
    # damping used by this example.
    ranks = contri.reduceByKey(lambda x, y: x + y).mapValues(lambda x: 0.15+0.85*x)
    print('%s==:' % i, ranks.collect())
print('result==', ranks.collect())
先看一下整体的执行过程及每一步的中间结果:
为了方便,这里通过 pyspark 进入交互式命令行来操作;实际使用中,应通过 spark-submit 来提交任务。
>>> links = [['A', ['B', 'C']], ['B', ['A', 'C']], ['C', ['A', 'B', 'D']], ['D', ['C']]]
>>> links = sc.parallelize(links).partitionBy(4).persist()
>>> links.collect()
[('A', ['B', 'C']), ('D', ['C']), ('C', ['A', 'B', 'D']), ('B', ['A', 'C'])]
>>> ranks = links.mapValues(lambda x: 1.0)
>>> ranks.collect()
[('A', 1.0), ('D', 1.0), ('C', 1.0), ('B', 1.0)]
>>> for i in range(10):
...     contri = links.join(ranks).flatMap(lambda x: map(lambda dest: (dest, x[1][1]/len(x[1][0])), x[1][0]))
...     ranks = contri.reduceByKey(lambda x, y: x + y).mapValues(lambda x: 0.15+0.85*x)
...     print('%s==:' % i, ranks.collect())
...
('0==:', [('A', 0.8583333333333333), ('D', 0.43333333333333335), ('C', 1.8499999999999999), ('B', 0.8583333333333333)])
('1==:', [('A', 1.0389583333333332), ('D', 0.6741666666666666), ('C', 1.2479166666666666), ('B', 1.0389583333333332)])
('2==:', [('A', 0.9451336805555555), ('D', 0.5035763888888889), ('C', 1.6061562499999997), ('B', 0.9451336805555555)])
('3==:', [('A', 1.0067594184027777), ('D', 0.6050776041666666), ('C', 1.3814035590277776), ('B', 1.0067594184027777)])
('4==:', [('A', 0.9692704278790508), ('D', 0.5413976750578703), ('C', 1.5200614691840277), ('B', 0.9692704278790508)])
('5==:', [('A', 0.9926240147840711), ('D', 0.5806840829354745), ('C', 1.4340678874963828), ('B', 0.9926240147840711)])
('6==:', [('A', 0.978184441073872), ('D', 0.5563192347906418), ('C', 1.4873118830616134), ('B', 0.978184441073872)])
('7==:', [('A', 0.9871334209905194), ('D', 0.5714050335341238), ('C', 1.454328124484837), ('B', 0.9871334209905194)])
('8==:', [('A', 0.9815913391916745), ('D', 0.5620596352707038), ('C', 1.4747576863459464), ('B', 0.9815913391916745)])
('9==:', [('A', 0.9850243302878131), ('D', 0.5678480111313514), ('C', 1.4621033282930214), ('B', 0.9850243302878131)])
>>> print('result==', ranks.collect())
('result==', [('A', 0.9850243302878131), ('D', 0.5678480111313514), ('C', 1.4621033282930214), ('B', 0.9850243302878131)])



