栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 其他

Hadoop学习总结-上篇(HDFS、MapReduce)

其他 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Hadoop学习总结-上篇(HDFS、MapReduce)

跟学尚硅谷Hadoop3.x,个人总结

1 入门 1.1 优势与组成(面试重点)

 

HDFS-Hadoop Distributed File System,一个分布式文件系统

NameNode(nn);DataNode(dn);Secondary NameNode(2nn)

YARN-Yet Another Resource Negotiator ,另一种资源协调者,是 Hadoop 的资源管理器

MapReduce -Map 阶段并行处理输入数据; Reduce 阶段对 Map 结果进行汇总

 

1)Sqoop:一款开源的工具,主要用于在Hadoop、Hive 与传统的数据库(MySQL) 间进行数据的传递,可以将一个关系型数据库(例如 :MySQL,Oracle 等)中的数据导进 到Hadoop 的HDFS 中,也可以将HDFS 的数据导进到关系型数据库中。

2)Flume:一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统, Flume 支持在日志系统中定制各类数据发送方,用于收集数据;

3)Kafka:一种高吞吐量的分布式发布订阅消息系统;

4)Spark:当前最流行的开源大数据内存计算框架。可以基于Hadoop 上存储的大数 据进行计算。

5)Flink:当前最流行的开源大数据内存计算框架。用于实时计算的场景较多。

6)Oozie:一个管理Hadoop 作业(job)的工作流程调度管理系统。

7)Hbase:一个分布式的、面向列的开源数据库。Hbase 不同于一般的关系数据库, 它是一个适合于非结构化数据存储的数据库。

8)Hive:基于 Hadoop 的一个数据仓库工具,可以将结构化的数据文件映射为一张 数据库表,并提供简单的 SQL 查询功能,可以将 SQL 语句转换为 MapReduce 任务进行运 行。其优点是学习成本低,可以通过类 SQL 语句快速实现简单的 MapReduce 统计,不必开 发专门的MapReduce 应用,十分适合数据仓库的统计分析。

9)ZooKeeper:它是一个针对大型分布式系统的可靠协调系统,提供的功能包括:配置维护、 名字服务、分布式同步、组服务等。

1.2 Hadoop运行环境搭建

见尚硅谷大数据技术hadoop 3.x

1.3 Hadoop运行模式

Hadoop 运行模式包括:本地模式、伪分布式模式以及完全分布式模式(生产环境使用)

#集群分发脚本xsync 
[henry@hadoop102 ~]$ xsync /home/hengy/bin
#SSH无密登录配置

1.3.1 集群配置(面试重点)

配置workers

配置历史服务器

配置日志聚集

1.4 集群常用操作方法
# 各个模块分开启动/停止(配置 ssh 是前提)
#1 
# 整体启动/停止HDFS
[henry@hadoop102 hadoop-3.1.3]$ start-dfs.sh/ stop-dfs.sh
# 整体启动/停止YARN
[henry@hadoop102 hadoop-3.1.3]$ start-yarn.sh/ stop-yarn.sh
# 各个服务组件逐一启动/停止
hdfs --daemon start/stop namenode/datanode/secondarynamenode
yarn --daemon start/stop resourcemanager/nodemanager
​
#2 删除集群中所有的data/ 和 Logs/
[henry@hadoop102 hadoop-3.1.3]$ rm -rf data/ logs/
[henry@hadoop103 hadoop-3.1.3]$ rm -rf data/ logs/
[henry@hadoop104 hadoop-3.1.3]$ rm -rf data/ logs/
#3 格式化
[henry@hadoop102 hadoop-3.1.3]$ hdfs namenode -format
#4 启动集群
[henry@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh

两个常用Hadoop集群脚本

# Hadoop 集群启停脚本(包含HDFS,Yarn,Historyserver):myhadoop.sh
myhadoop.sh start/stop
# 查看三台服务器 Java 进程脚本:jpsall
1.5 常见问题及解决方案

见尚硅谷大数据技术止hadoop 3.x


2 HDFS 2.1 概述

 

 

 

2.2 Shell操作(开发重点)
#基本语法
hadoop fs 具体命令 OR hdfs dfs 具体命令,两个是完全相同的。
#常用命令实操
#----------------------------------------------上传--------------------------------------------------
#-moveFromLocal:从本地剪切粘贴到HDFS
[henry@hadoop102 hadoop-3.1.3]$ hadoop fs -moveFromLocal ./shuguo.txt /sanguo
#-put:等同于 copyFromLocal,从本地文件系统中拷贝文件到HDFS
[henry@hadoop102 hadoop-3.1.3]$ adoop fs -put weiguo.txt /sanguo
#-appendToFile:追加一个文件到已经存在的文件末尾
[henry@hadoop102 hadoop-3.1.3]$ hadoop fs -appendToFile liubei.txt /sanguo/shuguo.txt
#-----------------------------------------------下载-------------------------------------------------
#-get:等同于 copyToLocal,从HDFS 拷贝到本地
[henry@hadoop102 hadoop-3.1.3]$ hadoop fs -get /sanguo/shuguo.txt ./shuguo2.txt
#-------------------------------------------HDFS 直接操作---------------------------------------------
#-ls: 显示目录信息
[henry@hadoop102 hadoop-3.1.3]$ hadoop fs -ls /sanguo
#-cat:显示文件内容 
[henry@hadoop102 hadoop-3.1.3]$ hadoop fs -cat /sanguo/shuguo.txt
#-chgrp、-chmod、-chown:Linux 文件系统中的用法一样,修改文件所属权限 
[henry@hadoop102 hadoop-3.1.3]$ hadoop fs -chmod 777 /sanguo/shuguo.txt 
[henry@hadoop102 hadoop-3.1.3]$ hadoop fs -chown henry:henry /sanguo/shuguo.txt 
#-mkdir:创建路径 
[henry@hadoop102 hadoop-3.1.3]$ hadoop fs -mkdir /jinguo
#-cp:从HDFS 的一个路径拷贝到HDFS 的另一个路径 
[henry@hadoop102 hadoop-3.1.3]$ hadoop fs -cp /sanguo/shuguo.txt /jinguo
#-mv:在HDFS 目录中移动文件 
[henry@hadoop102 hadoop-3.1.3]$ hadoop fs -mv /sanguo/wuguo.txt /jinguo 
[henry@hadoop102 hadoop-3.1.3]$ hadoop fs -mv /sanguo/weiguo.txt /jinguo
#-tail:显示一个文件的末尾 1kb 的数据 
[henry@hadoop102 hadoop-3.1.3]$ hadoop fs -tail /jinguo/shuguo.txt
#-rm:删除文件或文件夹 
[henry@hadoop102 hadoop-3.1.3]$ hadoop fs -rm /sanguo/shuguo.txt
#-rm -r:递归删除目录及目录里面内容 
[henry@hadoop102 hadoop-3.1.3]$ hadoop fs -rm -r /sanguo
#-du 统计文件夹的大小信息
[henry@hadoop102 hadoop-3.1.3]$ hadoop fs -du -s -h /jinguo 
[henry@hadoop102 hadoop-3.1.3]$ hadoop fs -du -h /jinguo 
#-setrep:设置HDFS 中文件的副本数量
2.3 HDFS 的 API 操作

见尚硅谷大数据技术止hadoop 3.x

2.4 HDFS的读写流程(面试重点)

2.4.1写数据流程

(1)客户端通过Distributed FileSystem模块向NameNode 请求上传文件,NameNode 检查目标文件是否已存在,父目录是否存在。

(2)NameNode 返回是否可以上传。

(3)客户端请求第一个 Block 上传到哪几个DataNode 服务器上。

(4)NameNode 返回 3 个DataNode 节点,分别为 dn1、dn2、dn3。

(5)客户端通过 FSDataOutputStream 模块请求 dn1 上传数据,dn1 收到请求会继续调用dn2,然后 dn2 调用 dn3,将这个通信管道建立完成。

(6)dn1、dn2、dn3 逐级应答客户端。

(7)客户端开始往 dn1 上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以 Packet 为单位,dn1 收到一个 Packet 就会传给 dn2,dn2 传给 dn3;dn1 每传一个 packet 会放入一个应答队列等待应答。

(8)当一个Block 传输完成之后,客户端再次请求NameNode 上传第二个Block 的服务 器。(重复执行 3-7 步)。

2.4.2读数据流程

(1)客户端通过 DistributedFileSystem 向 NameNode 请求下载文件,NameNode 通过查 询元数据,找到文件块所在的DataNode 地址。

(2)挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据。

(3)DataNode 开始传输数据给客户端(从磁盘里面读取数据输入流,以 Packet 为单位 来做校验)。

(4)客户端以 Packet 为单位接收,先在本地缓存,然后写入目标文件。

2.5 NameNode 和 SecondaryNameNode(了解)

1)NameNode 元数据需要存放在内存中——保证访问效率

2)磁盘中备份元数据的 FsImage——防止断电内存元数据丢失

3)引入 Edits 文件——只进行追加操作,效率很高

问题:元数据更新时,更新 FsImage效率过低;不更新FsImage则有一致性问题

解决:每当元数据有更新或者添加元数据时,修改内存中的元数据并追加到 Edits 中。这样,一旦NameNode 节点断电,可以通过 FsImage 和Edits 的合并,合成元数据。

4)引入一个新的节点SecondaryNamenode——专门用于 FsImage 和 Edits 的合并(定期执行,避免文件数据过大,效率降低)

2.6 DataNode(了解)

一个数据块在DataNode 上以文件形式存储在磁盘上

3 MapReduce 3.1 MapReduce概述

优点

1)易于编程——简单实现一些接口,就可以完成一个分布式程序。

2)良好的扩展性——当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。 3)高容错性——其中一台机器挂了,Hadoop内部自动把上面的计算任务转移到另外一个节点上运行, 不至于这个任务运行失败

4)适合 PB级以上海量数据的离线处理——可以实现上千台服务器集群并发工作,提供数据处理能力。

缺点

1)不擅长实时计算——无法像MySQL 一样,在毫秒或者秒级内返回结果。 2)不擅长流式计算——MapReduce 的输入数据集是静态的,不能动态变化。 3)不擅长DAG(有向无环图)计算——多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce 并不是不能做,而是使用后,每个 MapReduce 作业的输出结果都会写入到磁盘, 会造成大量的磁盘 IO,导致性能非常的低下。

核心思想

  

一个完整的MapReduce 程序在分布式运行时有三类实例进程:

(1)MrAppMaster:负责整个程序的过程调度及状态协调。 (2)MapTask:负责Map 阶段的整个数据处理流程。

(3)ReduceTask:负责Reduce 阶段的整个数据处理流程。

3.2 Hadoop 序列化

见尚硅谷大数据技术止hadoop 3.x

3.3 MapReduce 框架原理(面试重点)

3.3.1 InputFormat 数据输入

Job 提交流程源码和切片源码详解

1)Job 提交流程源码详解

waitForCompletion()
submit();
// 1 建立连接
   connect();
      // 1)创建提交 Job 的代理
      new Cluster(getConfiguration());
      // (1)判断是本地运行环境还是 yarn 集群运行环境
      initialize(jobTrackAddr, conf); 
// 2 提交 job
   submitter.submitJobInternal(Job.this, cluster)
// 1)创建给集群提交数据的 Stag 路径
   Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 2)获取 jobid ,并创建 Job 路径
   JobID jobId = submitClient.getNewJobID();
// 3)拷贝 jar 包到集群
   copyAndConfigureFiles(job, submitJobDir);
   rUploader.uploadFiles(job, jobSubmitDir);
// 4)计算切片,生成切片规划文件
   writeSplits(job, submitJobDir);
      maps = writeNewSplits(job, jobSubmitDir);
      input.getSplits(job);
// 5)向 Stag 路径写 XML 配置文件
   writeConf(conf, submitJobFile);
   conf.writeXml(out);
// 6)提交 Job,返回提交状态
   status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials(),job.getCredentials());

 2)FileInputFormat 切片源码解析(input.getSplits(job))

CombineTextInputFormat 切片机制

框架默认的 TextInputFormat 切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个 MapTask,这样如果有大量小文件,就会产生大量的 MapTask,处理效率极其低下。

(1)虚拟存储过程:

将输入目录下所有文件大小,依次和设置的 setMaxInputSplitSize 值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍, 那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值 2 倍,此时 将文件均分成 2 个虚拟存储块(防止出现太小切片)。

例如 setMaxInputSplitSize 值为 4M,输入文件大小为 8.02M,则先逻辑上分成一个 4M。剩余的大小为 4.02M,如果按照 4M 逻辑划分,就会出现 0.02M 的小的虚拟存储 文件,所以将剩余的 4.02M 文件切分成(2.01M 和 2.01M)两个文件。

(2)切片过程:

(a)判断虚拟存储的文件大小是否大于 setMaxInputSplitSize 值,大于等于则单独 形成一个切片。

(b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。

(c)测试举例:有 4 个小文件大小分别为 1.7M、5.1M、3.4M 以及 6.8M 这四个小 文件,则虚拟存储之后形成 6 个文件块,大小分别为: 1.7M,(2.55M、2.55M),3.4M 以及(3.4M、3.4M)

最终会形成 3 个切片,大小分别为: (1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M


3.3.2 MapReduce 工作流程

map阶段是对每一个单个map中的多个溢写文件中的相同分区进行归并

reduce阶段是将所有单个map归并完的数据按照分区归并

3.3.3 Shuffle 机制

Map 方法之后,Reduce 方法之前的数据处理过程称之为 Shuffle

3.3.4 OutputFormat 数据输出


3.4 MapReduce 内核源码解析(提升重点)

3.4.1 MapTask工作机制

(1)Read 阶段:MapTask 通过 InputFormat 获得的 RecordReader,从输入 InputSplit 中 解析出一个个 key/value。

(2)Map 阶段:该节点主要是将解析出的 key/value 交给用户编写 map()函数处理,并 产生一系列新的 key/value。

(3)Collect 收集阶段:在用户编写 map()函数中,当数据处理完成后,一般会调用 OutputCollector.collect()输出结果。在该函数内部,它会将生成的 key/value 分区(调用 Partitioner),并写入一个环形内存缓冲区中。

(4)Spill 阶段:即“溢写”,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上, 生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

(5)Merge 阶段:当所有数据处理完后,MapTask 会将所有临时文件合并成一个大文件,并保存到文件 output/file.out 中,同时生成相应的索引文件 output/file.out.index。 在进行文件合并过程中,MapTask 以分区为单位进行合并。

对于某个分区,它将采用多轮递归合并的方式。每轮合并 mapreduce.task.io.sort.factor(默认 10)个文件,并将产生的文 件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

让每个 MapTask 最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量 小文件产生的随机读取带来的开销。

3.4.2 ReduceTask工作机制

(1)Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

(2)Sort 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照 MapReduce 语义,用户编写 reduce()函数输入数据是按 key 进行聚集的一组数据。为了将 key 相同的数据聚在一 起,Hadoop 采用了基于排序的策略。由于各个MapTask 已经实现对自己的处理结果进行了 局部排序,因此,ReduceTask 只需对所有数据进行一次归并排序即可。

(3)Reduce 阶段:reduce()函数将计算结果写到 HDFS 上。

ReduceTask 并行度决定机制

回顾:MapTask 并行度由切片个数决定,切片个数由输入文件和切片规则决定。

思考:ReduceTask 并行度由谁决定?

3.4.3 MapTask & ReduceTask 源码解析

 

3.5 Join应用

详见尚硅谷大数据技术止hadoop 3.x

3.6 数据清洗(ETL)

详见尚硅谷大数据技术止hadoop 3.x

3.8 MapReduce 开发总结

1)输入数据接口:InputFormat

(1)默认使用的实现类是:TextInputFormat

(2)TextInputFormat 的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为 key,行内容作为 value 返回。

(3)CombineTextInputFormat 可以把多个小文件合并成一个切片处理,提高处理效率。

2)逻辑处理接口:Mapper

用户根据业务需求实现其中三个方法:map() setup() cleanup ()

3)Partitioner 分区

(1)有默认实现 HashPartitioner,逻辑是根据 key 的哈希值和 numReduces 来返回一个 分区号;key.hashCode()&Integer.MAXVALUE % numReduces

(2)如果业务上有特别的需求,可以自定义分区。

4)Comparable 排序

(1)当我们用自定义的对象作为 key 来输出时,就必须要实现 WritableComparable 接 口,重写其中的 compareTo()方法。

(2)部分排序:对最终输出的每一个文件进行内部排序。

(3)全排序:对所有数据进行排序,通常只有一个 Reduce。

(4)二次排序:排序的条件有两个。

5)Combiner 合并

Combiner 合并可以提高程序执行效率,减少 IO 传输。但是使用时必须不能影响原有的 业务处理结果。

6)逻辑处理接口:Reducer

用户根据业务需求实现其中三个方法:reduce() setup() cleanup ()

7)输出数据接口:OutputFormat

(1)默认实现类是 TextOutputFormat,功能逻辑是:将每一个 KV 对,向目标文本文件 输出一行。

(2)用户还可以自定义 OutputFormat。

3.9 压缩

压缩的优点:以减少磁盘 IO、减少磁盘存储空间。

压缩的缺点:增加 CPU 开销。

压缩原则 (1)运算密集型的 Job,少用压缩 (2)IO 密集型的 Job,多用压缩

 

3.10 常见错误及解决方案

详见尚硅谷大数据技术止hadoop 3.x

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/279720.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号