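- Problem 1: Generate each user's orders from a user table and an order table
- 1. Storing the data
- 2. Mapper
- 2. Reducer
- 3. Full code
- 4. Run
- 5. Results
- Problem 2: Merge the same two tables, but report each user's order count and total spending
- 1. Data storage and Mapper: same as above
- 2. Reducer
- 3. Full code
- 4. Run
- 5. Results
- That's all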
Input:

    uid  uname
    01   user1
    02   user2
    03   user3

    uid  orderid  order_price
    01   01       80
    01   02       90
    02   03       82
    02   04       95

Output:

    "01:user1"    "01:80,02:90"
    "02:user2"    "03:82,04:95"

1. Storing the data
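The two tables are stored in two CSV files. The mapper splits each line on a tab, so the fields are assumed to be tab-separated.

    # user_table.csv
    uid  uname
    01   user1
    02   user2
    03   user3

    # order_table.csv
    uid  orderid  order_price
    01   01       80
    01   02       90
    02   03       82
    02   04       95

2. Mapper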
    SORT_VALUES = True

    def mapper(self, _, line):
        # Fields are tab-separated
        fields = line.strip().split('\t')
        if len(fields) == 2:
            # User table row: uid, uname
            source = 'A'
            user_id = fields[0]
            user_name = fields[1]
            if not user_id.isdigit():
                # Skip the header row
                return
            yield user_id, [source, user_name]
        elif len(fields) == 3:
            # Order table row: uid, orderid, order_price
            source = 'B'
            user_id = fields[0]
            order_id = fields[1]
            price = fields[2]
            if not user_id.isdigit():
                # Skip the header row
                return
            yield user_id, [source, order_id, price]
        else:
            pass
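Looking at the figure together with the code:
In the mapper, rows are tagged with source = 'A' for the user table and source = 'B' for the order table. When the next step sorts the rows that share a user_id, this puts the user-table row first and the order-table rows after it (the next figure explains this in detail).
SORT_VALUES = True is also set.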
Secondary sort [1]
MRJob.SORT_VALUES = None
Set this to True if you would like reducers to receive the values associated with any key in sorted order (sorted by their encoded value). Also known as secondary sort.
According to the documentation, once SORT_VALUES is set to True, the values associated with each key are sorted by their encoded form, which acts as a secondary sort. In this example every value starts with its source tag, so the 'A' row sorts ahead of the 'B' rows.
The figure shows the result of the sort.
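As a rough illustration of that ordering, the sketch below sorts a few hypothetical mapper values for one user_id by their JSON-encoded form. It is only an approximation: in a real job the sort is carried out by the runner on the encoded lines, not by a Python `sorted` call, and the sample values are made up from the tables above.

```python
import json

# Hypothetical mapper output for user_id "01", in arbitrary arrival order.
values = [
    ["B", "02", "90"],
    ["A", "user1"],
    ["B", "01", "80"],
]

# Sort by the JSON text of each value, imitating "sorted by their encoded value".
sorted_values = sorted(values, key=json.dumps)

for v in sorted_values:
    print(v)
```

Because the encoded text of the user row begins with `["A"`, it compares lower than any row beginning with `["B"`, so the reducer can rely on finding the user name at index 0.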
2. Reducer

    def reducer(self, user_id, values):
        values = [v for v in values]
        if len(values) > 1:
            user_name = values[0][1]
            order_info = [':'.join([v[1], v[2]]) for v in values[1:]]
            yield ':'.join([user_id, user_name]), ','.join(order_info)
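For the reducer, looking at the figure together with the code, the goal is to produce output in the following format:

    "01:user1"    "01:80,02:90"
    "02:user2"    "03:82,04:95"

The trickiest line is order_info = [':'.join([v[1], v[2]]) for v in values[1:]]. After the sort, values[0] is the user row, so values[1:] holds the order rows, and each one is rendered as order_id:price.
The pink handwriting in the figure explains this line.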
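To make that line concrete, here is a small worked example that runs the reducer's expressions outside MRJob. The `values` list is a hypothetical, already-sorted input for user_id "01", built from the sample tables.

```python
# Hypothetical sorted values for user_id "01": user row first, then order rows.
user_id = "01"
values = [["A", "user1"], ["B", "01", "80"], ["B", "02", "90"]]

user_name = values[0][1]  # "user1", taken from the 'A' row

# Each order row [source, order_id, price] becomes "order_id:price".
order_info = [":".join([v[1], v[2]]) for v in values[1:]]

key = ":".join([user_id, user_name])
value = ",".join(order_info)
print(key, value)
```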
3. Full code

    #!/miniconda2/bin/python
    # encoding=utf-8
    # MergeUserOrderTable.py
    from mrjob.job import MRJob


    class MergeUserOrderTable(MRJob):
        # Deliver each key's values to the reducer in sorted order
        SORT_VALUES = True

        def mapper(self, _, line):
            # Fields are tab-separated
            fields = line.strip().split('\t')
            if len(fields) == 2:
                # User table row: uid, uname
                source = 'A'
                user_id = fields[0]
                user_name = fields[1]
                if not user_id.isdigit():
                    # Skip the header row
                    return
                yield user_id, [source, user_name]
            elif len(fields) == 3:
                # Order table row: uid, orderid, order_price
                source = 'B'
                user_id = fields[0]
                order_id = fields[1]
                price = fields[2]
                if not user_id.isdigit():
                    # Skip the header row
                    return
                yield user_id, [source, order_id, price]
            else:
                pass

        def reducer(self, user_id, values):
            values = [v for v in values]
            # Users with no orders have a single value and are skipped
            if len(values) > 1:
                user_name = values[0][1]
                order_info = [':'.join([v[1], v[2]]) for v in values[1:]]
                yield ':'.join([user_id, user_name]), ','.join(order_info)


    def main():
        MergeUserOrderTable.run()


    if __name__ == '__main__':
        main()
4. Run

    python MergeUserOrderTable.py ./user_table.csv ./order_table.csv -o output_order

output_order is the directory that receives the job's output.
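The part files in that directory can be read back with a few lines of Python. The sketch below assumes the job uses mrjob's default output format, where each line holds a JSON-encoded key and a JSON-encoded value separated by a tab; `read_output` is a hypothetical helper name, not part of mrjob.

```python
import glob
import json
import os


def read_output(output_dir):
    """Return (key, value) pairs parsed from every part-* file in output_dir."""
    pairs = []
    for path in sorted(glob.glob(os.path.join(output_dir, "part-*"))):
        with open(path, encoding="utf-8") as f:
            for line in f:
                line = line.rstrip("\n")
                if not line:
                    continue  # ignore blank lines
                # Assumed format: JSON key, a tab, JSON value.
                key, value = line.split("\t", 1)
                pairs.append((json.loads(key), json.loads(value)))
    return pairs


if __name__ == "__main__":
    for key, value in read_output("output_order"):
        print(key, value)
```

Empty part files simply contribute no pairs.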
5. Results

    # part-00000
    "01:user1"    "01:80,02:90"
    # part-00001
    "02:user2"    "03:82,04:95"
    # part-00002

Problem 2: Merge the same two tables, but report each user's order count and total spending
Input:

    uid  uname
    01   user1
    02   user2
    03   user3

    uid  orderid  order_price
    01   01       80
    01   02       90
    02   03       82
    02   04       95

Output:

    "01:user1"    [2, 170]
    "02:user2"    [2, 177]

1. Data storage and Mapper: same as above

2. Reducer
    def reducer(self, user_id, values):
        values = [v for v in values]
        if len(values) > 1:
            user_name = values[0][1]
            # Every value after the user row is one order
            order_count = len(values) - 1
            order_info = [v[2] for v in values[1:]]
            # price_count = sum(order_info)  # not usable: prices are strings
            price_count = 0
            for price in order_info:
                price_count = price_count + int(price)
            yield ':'.join([user_id, user_name]), [order_count, price_count]
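The commented-out `sum(order_info)` line hints at why the loop is there: the prices come out of the mapper as strings, so they have to be converted before they can be added. A minimal worked example, using hypothetical sorted values for user_id "01", shows the same arithmetic with the conversion folded into `sum`:

```python
# Hypothetical sorted values for user_id "01": user row first, then order rows.
values = [["A", "user1"], ["B", "01", "80"], ["B", "02", "90"]]

order_count = len(values) - 1            # every row after the user row is an order
prices = [v[2] for v in values[1:]]      # ["80", "90"], still strings

# sum(prices) would raise TypeError, so convert each price to int first.
price_count = sum(int(p) for p in prices)

print([order_count, price_count])
```

This is equivalent to the explicit loop in the reducer, just written more compactly.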
The reducer aims to produce output in the following format:

    "01:user1"    [2, 170]
    "02:user2"    [2, 177]

3. Full code
    #!/miniconda2/bin/python
    # encoding=utf-8
    # UserOrderCount.py
    from mrjob.job import MRJob


    class UserOrderCount(MRJob):
        # Deliver each key's values to the reducer in sorted order
        SORT_VALUES = True

        def mapper(self, _, line):
            # Fields are tab-separated
            fields = line.strip().split('\t')
            if len(fields) == 2:
                # User table row: uid, uname
                source = 'A'
                user_id = fields[0]
                user_name = fields[1]
                if not user_id.isdigit():
                    # Skip the header row
                    return
                yield user_id, [source, user_name]
            elif len(fields) == 3:
                # Order table row: uid, orderid, order_price
                source = 'B'
                user_id = fields[0]
                order_id = fields[1]
                price = fields[2]
                if not user_id.isdigit():
                    # Skip the header row
                    return
                yield user_id, [source, order_id, price]
            else:
                pass

        def reducer(self, user_id, values):
            values = [v for v in values]
            # Users with no orders have a single value and are skipped
            if len(values) > 1:
                user_name = values[0][1]
                order_count = len(values) - 1
                order_info = [v[2] for v in values[1:]]
                # price_count = sum(order_info)  # not usable: prices are strings
                price_count = 0
                for price in order_info:
                    price_count = price_count + int(price)
                yield ':'.join([user_id, user_name]), [order_count, price_count]


    def main():
        UserOrderCount.run()


    if __name__ == '__main__':
        main()
4. Run

    python UserOrderCount.py ./user_table.csv ./order_table.csv -o ./output_price

output_price is the directory that receives the job's output.
5. Results

    # part-00000
    "01:user1"    [2, 170]
    # part-00001
    "02:user2"    [2, 177]
    # part-00002

That's all.
[1] https://mrjob.readthedocs.io/en/latest/job.html?highlight=SORT_VALUES#mrjob.job.MRJob.SORT_VALUES


