栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

[Hadoop]Python MRJob实现将两个文件合并

[Hadoop]Python MRJob实现将两个文件合并

[Hadoop]Python MRJob实现将两个文件合并
  • 问题一:通过一张用户表和一张订单表生成每个用户的订单
    • 1. 存储数据
    • 2. Mapper
    • 2. Reducer
    • 3. 完整代码
    • 4. 运行
    • 5. 结果
  • 问题二:还是合并两个表,显示效果为每个用户所下订单的订单总量和累计消费金额
    • 1. 数据存储,Mapper部分同上
    • 2. Reducer
    • 3. 完整代码
    • 4. 运行
    • 5. 结果
      • 以上

问题一:通过一张用户表和一张订单表生成每个用户的订单

输入:

uid uname
01 user1 
02 user2
03 user3
uid orderid order_price
01   01     80
01   02     90
02   03    82
02   04    95

输出:

"01:user1"	"01:80,02:90"
"02:user2"	"03:82,04:95"
1. 存储数据

用两个csv文件两张表

# user_table.csv
uid uname
01 user1 
02 user2
03 user3
# order_table.csv
uid orderid order_price
01   01     80
01   02     90
02   03    82
02   04    95
2. Mapper

	SORT_VALUES = True
	def mapper(self, _, line):
		fields = line.strip().split('t')
		if len(fields) == 2:
			source = 'A'
			user_id = fields[0]
			user_name = fields[1]
			if not user_id.isdigit():
				return
			yield user_id, [source, user_name]
		elif len(fields) == 3:
			source = 'B'
			user_id = fields[0]
			order_id = fields[1]
			price = fields[2]
			if not user_id.isdigit():
				return
			yield user_id, [source, order_id, price]
		else:
			pass

结合图片和代码段:
Mapper中,为了能够在下一步Sort排序相同user_id的行时,用户表的行在前,订单表的行在后(下一张图中有详细说明),这里用source = A和source = B来区分用户表和订单表。
并且加了 SORT_VALUES = True

Secondary sort 1
MRJob.SORT_VALUES = None
Set this to True if you would like reducers to receive the values associated with any key in sorted order (sorted by their encoded value). Also known as secondary sort.

如果希望reducers按排序顺序(按编码值排序)接收与任何键相关联的值,请将其设置为True。也称为辅助排序。

根据文档的解释,把SORT_VALUES设置为True后,在排序时,会按照与键相关联的值编码值(本例中的source=A)的顺序作为辅助排序。

图为Sort的结果

2. Reducer

	def reducer(self, user_id, values):
		values = [v for v in values]
		if len(values) > 1:
			user_name = values[0][1]
			order_info = [':'.join([v[1],v[2]]) for v in values[1:]]
			yield ':'.join([user_id, user_name]), ','.join(order_info)

Reducer部分,结合图和代码段,目的是输出如下格式:

"01:user1"	"01:80,02:90"
"02:user2"	"03:82,04:95"

比较复杂的是order_info = [’:’.join([v[1],v[2]]) for v in values[1:]]
图中粉色笔迹部分解释了这段代码。

3. 完整代码
# MergeUserOrderTable.py
#!/miniconda2/bin/python
# encoding=utf-8

from mrjob.job import MRJob

class MergeUserOrderTable(MRJob):
	SORT_VALUES = True
	def mapper(self, _, line):
		fields = line.strip().split('t')
		if len(fields) == 2:
			source = 'A'
			user_id = fields[0]
			user_name = fields[1]
			if not user_id.isdigit():
				return
			yield user_id, [source, user_name]
		elif len(fields) == 3:
			source = 'B'
			user_id = fields[0]
			order_id = fields[1]
			price = fields[2]
			if not user_id.isdigit():
				return
			yield user_id, [source, order_id, price]
		else:
			pass
	
	def reducer(self, user_id, values):
		values = [v for v in values]
		if len(values) > 1:
			user_name = values[0][1]
			order_info = [':'.join([v[1],v[2]]) for v in values[1:]]
			yield ':'.join([user_id, user_name]), ','.join(order_info)

def main():
	MergeUserOrderTable.run()

if __name__ == '__main__':
	main()
4. 运行
python MergeUserOrderTable.py ./user_table.csv ./order_table.csv -o output_order

output_order是接受运行结果的文件夹

5. 结果
# part-00000
"01:user1"	"01:80,02:90"
# part-00001
"02:user2"	"03:82,04:95"
# part-00002

问题二:还是合并两个表,显示效果为每个用户所下订单的订单总量和累计消费金额

输入:

uid uname
01 user1 
02 user2
03 user3
uid orderid order_price
01   01     80
01   02     90
02   03    82
02   04    95

输出:

"01:user1"	[2, 170]
"02:user2"	[2, 177]
1. 数据存储,Mapper部分同上 2. Reducer
	def reducer(self, user_id, values):
		values = [v for v in values]
		if len(values) > 1:
			user_name = values[0][1]
			order_count = len(values) - 1
			order_info = [v[2] for v in values[1:]]
			# price_count = sum(order_info)
			price_count = 0
			for price in order_info:
				price_count = price_count + int(price)
			yield ':'.join([user_id, user_name]), [order_count, price_count]

Reducer部分,目的是输出如下格式:

"01:user1"	[2, 170]
"02:user2"	[2, 177]
3. 完整代码
# UserOrderCount.py
#!/miniconda2/bin/python
# encoding=utf-8

from mrjob.job import MRJob

class UserOrderCount(MRJob):
	SORT_VALUES = True
	def mapper(self, _, line):
		fields = line.strip().split('t')
		if len(fields) == 2:
			source = 'A'
			user_id = fields[0]
			user_name = fields[1]
			if not user_id.isdigit():
				return
			yield user_id, [source, user_name]
		elif len(fields) == 3:
			source = 'B'
			user_id = fields[0]
			order_id = fields[1]
			price = fields[2]
			if not user_id.isdigit():
				return
			yield user_id, [source, order_id, price]
		else:
			pass
	
	def reducer(self, user_id, values):
		values = [v for v in values]
		if len(values) > 1:
			user_name = values[0][1]
			order_count = len(values) - 1
			order_info = [v[2] for v in values[1:]]
			# price_count = sum(order_info)
			price_count = 0
			for price in order_info:
				price_count = price_count + int(price)
			yield ':'.join([user_id, user_name]), [order_count, price_count]

def main():
	UserOrderCount.run()

if __name__ == '__main__':
	main()
4. 运行
python UserOrderCount.py ./user_table.csv ./order_table.csv -o ./output_price

output_price是接受运行结果的文件夹

5. 结果
# part-00000
"01:user1"	[2, 170]
# part-00001
"02:user2"	[2, 177]
# part-00002

以上
  1. https://mrjob.readthedocs.io/en/latest/job.html?highlight=SORT_VALUES#mrjob.job.MRJob.SORT_VALUES ↩︎

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/327383.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号