对hadoop的新认识:
pybistreaming架构下的编程方式跟原生的hadoop有一些不同:
为了方便使用,这里不再需要用户自己判断是不是来了一个新的key,
对于同一个key下的所有value,reduce() 函数只会被调用一次
itervalues是一个generator, 可以直接用 for value in itervalues 迭代它 (不要用下标[i]索引,不支持)
对于原生hadoop,以词频统计为例,每执行一次reduce就需要判断当前stdin里的key,也就是word,跟上一个词是不是一样,因为虽然它把key相同的键值对集合在了前后相邻的位置,但还是一条一条分开的,并没有把对应的vals整合到同一个数据结构里:
而对于pybistreaming,:
import pybistreaming
class MyMapper(pybistreaming.BistreamingMapper):
def setup(self)(self):
#在程序启动后,所有的数据被map处理前被调用
#如果不需要,这个函数可以不用重写
return 0
def cleanup(self):
#在所有的数据被map处理后,程序退出关被调用
#如果不需要,这个函数可以不用重写
return 0
def map(self, input_record):
#每条record都会被这个函数处理一次
key = input_record.key()
value = input_record.value()
# 这里写自己的逻辑
out_key = "xxx"
out_value = "xxxx"
# 输出一条K-V,
self.emit(out_key, out_value)
return 0
class MyReducer(pybistreaming.BistreamingReducer):
def setup(self):
#在程序启动后,所有的数据被reduce处理前被调用
#如果不需要,这个函数可以不用重写
return 0
def cleanup(self):
#在所有的数据被map处理后,程序退出关被调用
#如果不需要,这个函数可以不用重写
return 0
def reduce(self, key, itervalues):
"""为了方便使用,这里不再需要用户自己判断是不是来了一个新的key,
对于同一个key下的所有value,reduce() 函数只会被调用一次
itervalues是一个generator, 可以直接用 for value in itervalues 迭代它 (不要用下标[i]索引,不支持)
是不是很简单 :-)
"""
for value in itervalues:
# do something
out_key="xxx"
out_value="xxx"
self.emit(out_key, out_value)
return 0
所以,同一个key下的所有value会被集合到一个迭代器里。
同时,可以这么写,把迭代器转换成list数据结构:
def reduce(self, key, itervalues):
values = list(itervalues)
if len(values) == 1:
for val in eval(values[0]):
val_json = json.loads(val)



