栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Python:Linux hadoop 脚本实现 reduce合并数据

Python:Linux hadoop 脚本实现 reduce合并数据

文章目录
    • 1、准备数据
    • 2、创建 map.py函数
    • 3、创建 red.py函数
    • 4、脚本实现需求


与这篇博文有点关系的可以参考下:

① MapReduce 计算框架 —— 执行流程详解

② 在Linux环境实现wordcount:mapper,reducer的代码创建,脚本实现map,reduce

③ Linux实现 map 返回列表形式操作

④ Linux hadoop 脚本实现 reduce合并数据


1、准备数据

有 a_join.txt 数据
user_id order_id

aaa1    123
aaa2    123
aaa3    123
aaa4    123
aaa5    123
aaa6    123
aaa7    123
aaa8    123
aaa9    123
aaa10   123
aaa11   123

有 b_join.txt 数据
user_id amount

aaa1    hadoop
aaa2    hadoop
aaa3    hadoop
aaa4    hadoop
aaa5    hadoop
aaa6    hadoop
aaa7    hadoop
aaa8    hadoop
aaa9    hadoop
aaa10   hadoop
aaa11   hadoop

使得两个数据,以key合成新的数据。
user_id order_id amount

aaa1 123 hadoop
aaa2 123 hadoop

怎么办?

可以先把 a_join 转为 map_a;

aaa1	1	123
aaa2	1	123

同理,b_join 转为 map_b;

aaa1	2	hadoop
aaa2	2	hadoop

2、创建 map.py函数

vi map_a.py

#!/usr/local/bin/python
import sys

for line in sys.stdin:
    ss = line.strip().split('   ')

    key = ss[0]
    val = ss[1]

    print ("%st1t%s" % (key, val))


把输出数据复制到 a1 中


vi map_b.py

#!/usr/local/bin/python
import sys

for line in sys.stdin:
    ss = line.strip().split('   ')

    key = ss[0]
    val = ss[1]

    print ("%st2t%s" % (key, val))


把输出数据复制到 b2 中

要对两个map函数的返回结果文件a1,b2 进行reduce操作。


3、创建 red.py函数
import sys

val_1 = ""

for line in sys.stdin:
    key, flag, val = line.strip().split('t')

    if flag == '1':
        val_1 = val
    elif flag == '2' and val_1 != "":
        val_2 = val
        print ("%st%st%s" % (key, val_1, val_2))
        val_1 = ""


如上图,已经把两个文件reduce成,我们想要的结果。


4、脚本实现需求

创建 run.sh

set -e -x

HADOOP_CMD="/usr/hadoop/hadoop-2.7.3/bin/hadoop"
# hdfs输入路径
INPUT_FILE_PATH_A="/test/a_join.txt"
INPUT_FILE_PATH_B="/test/b_join.txt"

# hdfs输出路径
OUTPUT_A_PATH="/output/a"
OUTPUT_B_PATH="/output/b"

OUTPUT_JOIN_PATH="/output/join"

# 已经有输出路径就进行删除
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_A_PATH $OUTPUT_B_PATH $OUTPUT_JOIN_PATH
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_JOIN_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH 
    -input $INPUT_FILE_PATH_A 
    -output $OUTPUT_A_PATH 
    -mapper "python map_a.py" 
    -file ./map_a.py 

# Step 2.
$HADOOP_CMD jar $STREAM_JAR_PATH 
    -input $INPUT_FILE_PATH_B 
    -output $OUTPUT_B_PATH 
    -mapper "python map_b.py" 
    -file ./map_b.py 

# Step 3.
$HADOOP_CMD jar $STREAM_JAR_PATH 
    -input $OUTPUT_A_PATH,$OUTPUT_B_PATH 
    -output $OUTPUT_JOIN_PATH 
    -mapper "cat" 
    -reducer "python red_join.py" 
    -file ./red_join.py 
    -jobconf stream.num.map.output.key.fields=2 
    -jobconf num.key.fields.for.partition=1

-mapper "cat" 
意思是把前面两个map输出结果作为red的输入;

-jobconf stream.num.map.output.key.fields=2 
意思是: 组合key=(aaa1	1), value=123;

-jobconf num.key.fields.for.partition=1
意思是:使用数据的第一列作为partition。

查看是否已经上次输入文件hadoop fs -ls /test ,没有就上传文件 hadoop fs -put ./a_join.txt b_join.txt /test/ 。
查看输出文件是否存在,hadoop fs -ls /output
执行脚本命令:sh -x run.sh

查看输出目录的 /output/join

hadoop fs -ls /output/join
hadoop fs -cat /output/join | head


脚本实现完成!!!

【注意细节】:
(1)修改别人代码前,必须备份!!!
(2)查看别人代码时,习惯使用 :q! 强制退出不保存

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/434092.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号