- 1、准备数据
- 2、创建 map.py函数
- 3、创建 red.py函数
- 4、脚本实现需求
与这篇博文有点关系的可以参考下:
① MapReduce 计算框架 —— 执行流程详解
② 在Linux环境实现wordcount:mapper,reducer的代码创建,脚本实现map,reduce
③ Linux实现 map 返回列表形式操作
④ Linux hadoop 脚本实现 reduce合并数据
1、准备数据
有 a_join.txt 数据
user_id order_id
aaa1 123 aaa2 123 aaa3 123 aaa4 123 aaa5 123 aaa6 123 aaa7 123 aaa8 123 aaa9 123 aaa10 123 aaa11 123
有 b_join.txt 数据
user_id amount
aaa1 hadoop aaa2 hadoop aaa3 hadoop aaa4 hadoop aaa5 hadoop aaa6 hadoop aaa7 hadoop aaa8 hadoop aaa9 hadoop aaa10 hadoop aaa11 hadoop
使得两个数据,以key合成新的数据。
user_id order_id amount
aaa1 123 hadoop aaa2 123 hadoop
怎么办?
可以先把 a_join 转为 map_a;
aaa1 1 123 aaa2 1 123
同理,b_join 转为 map_b;
aaa1 2 hadoop aaa2 2 hadoop
2、创建 map.py函数
vi map_a.py
#!/usr/local/bin/python
import sys
for line in sys.stdin:
ss = line.strip().split(' ')
key = ss[0]
val = ss[1]
print ("%st1t%s" % (key, val))
把输出数据复制到 a1 中
vi map_b.py
#!/usr/local/bin/python
import sys
for line in sys.stdin:
ss = line.strip().split(' ')
key = ss[0]
val = ss[1]
print ("%st2t%s" % (key, val))
把输出数据复制到 b2 中
要对两个map函数的返回结果文件a1,b2 进行reduce操作。
3、创建 red.py函数
import sys
val_1 = ""
for line in sys.stdin:
key, flag, val = line.strip().split('t')
if flag == '1':
val_1 = val
elif flag == '2' and val_1 != "":
val_2 = val
print ("%st%st%s" % (key, val_1, val_2))
val_1 = ""
如上图,已经把两个文件reduce成,我们想要的结果。
4、脚本实现需求
创建 run.sh
set -e -x
HADOOP_CMD="/usr/hadoop/hadoop-2.7.3/bin/hadoop"
# hdfs输入路径
INPUT_FILE_PATH_A="/test/a_join.txt"
INPUT_FILE_PATH_B="/test/b_join.txt"
# hdfs输出路径
OUTPUT_A_PATH="/output/a"
OUTPUT_B_PATH="/output/b"
OUTPUT_JOIN_PATH="/output/join"
# 已经有输出路径就进行删除
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_A_PATH $OUTPUT_B_PATH $OUTPUT_JOIN_PATH
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_JOIN_PATH
# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH
-input $INPUT_FILE_PATH_A
-output $OUTPUT_A_PATH
-mapper "python map_a.py"
-file ./map_a.py
# Step 2.
$HADOOP_CMD jar $STREAM_JAR_PATH
-input $INPUT_FILE_PATH_B
-output $OUTPUT_B_PATH
-mapper "python map_b.py"
-file ./map_b.py
# Step 3.
$HADOOP_CMD jar $STREAM_JAR_PATH
-input $OUTPUT_A_PATH,$OUTPUT_B_PATH
-output $OUTPUT_JOIN_PATH
-mapper "cat"
-reducer "python red_join.py"
-file ./red_join.py
-jobconf stream.num.map.output.key.fields=2
-jobconf num.key.fields.for.partition=1
-mapper "cat" 意思是把前面两个map输出结果作为red的输入; -jobconf stream.num.map.output.key.fields=2 意思是: 组合key=(aaa1 1), value=123; -jobconf num.key.fields.for.partition=1 意思是:使用数据的第一列作为partition。
查看是否已经上次输入文件hadoop fs -ls /test ,没有就上传文件 hadoop fs -put ./a_join.txt b_join.txt /test/ 。
查看输出文件是否存在,hadoop fs -ls /output
执行脚本命令:sh -x run.sh
查看输出目录的 /output/join
hadoop fs -ls /output/join hadoop fs -cat /output/join | head
脚本实现完成!!!
【注意细节】:
(1)修改别人代码前,必须备份!!!
(2)查看别人代码时,习惯使用 :q! 强制退出不保存



