1.client通过 Distributed FileSystem 模块向NameNode请求上传文件,NameNode会检查目标文件是否存在,路径是否正确,用户是否有权限。 2.NameNode向client返回是否可以上传,同时返回三个离client近的DataNode节点,记为DN1/DN2/DN3。 3.client通过DFSOutPutStream进行数据切割。 4.使用chunk校验信息(512bytes校验信息+4bytes校验头)加Data数据信息组成一个64KB的packet数据块(二进制)。 5.以packet为单位传输文件,加载到data queue中。 6.DN1接受到packet后,会有两个数据流向,一是:DN1-ack queue,二是DN1-DN2-DN3。 7.DN1和DN2/DN3之间建立pipeline进行文件传输。 8.文件传输完后,DN3,DN2会向DN1回传成功标识。 9.DN1根据DN2/DN3回传标识决定从 ack queue 中删除此块还是将此块追加到 data queue 末尾。2.HDFS读流程
1.client通过 Distributed FileSystem 模块向NameNode请求下载文件,NameNode会检查目标文件是否存在,路径是否正确,用户是否有权限。 2.NameNode向client返回这个文件有几个块,分别存放在哪些DataNode上。 3.client通过FSDataInputStream实例化DFSInputStream,并遵从就近原则按顺序从DataNode节点上读取数据块的packet。 4.client以packet为单位接收,先在本地缓存,然后写入目标文件,最终整合成block。3.MapReduce工作流程
1. 获取待处理文本路径,作业路径,探查文本详情,生成计划配置文件。 2. Client向RM提交作业,RM将任务分配给NM1,NM1生成MrAppMaster。 3. MrAppMaster从路径下载资源到本地,并申请运行资源NM2/NM3,将文件切分连同相关资源传输给NM2/NM3。 4. NM2/NM3生成mapTask任务,使用RecordReader读数据(默认使用TextInputFormat)。 5. RecordReader将数据切分成K-V形式的数据给mapper。 6. mapper计算完将数据输出给OutputCollectre。 7. OutputCollectre将数据写入环形缓冲区,环形缓冲区默认100M,从赤道开始双向写数据和索引,写到80%后反向写 8. 环形缓冲区内对数据按key值分区排序(快排) 9. 当达到环形缓冲区达到阈值后进行溢写,merge 归并同key分区并溢写到磁盘上 10. reduceTask拉取分区数据到本地磁盘 11. reduceTask合并文件,归并排序 12. 通过OutPutformat输出到指定位置结果。默认TextOutputFormat4.Yarn的job提交流程
1. client向RM申请一个application 2. RM返回application的资源提交路径和ID 3. client提交作业运行所需资源到application路径 4. 资源提交完毕后client向RM申请运行APPmaster 5. 将用户的请求初始化为一个Task,并放入调度器中 6. RM通知一个空闲的NM来领取Task任务 7. 该NM创建容器container,并产生一个MrAppMaster 8. 该NM将job资源下载到本地 9. MrAppMaster向RM申请运行多个MapTask的任务资源 10. RM将运行MapTask的任务分配给其他NM 11. MrAppMaster向负责MapTask运行的NM发送程序启动脚本 12. MrAppMaster等待MapTask运行完毕后向RM申请运行ReduceTask的资源 13. ReduceTask拉去MapTask相应分区的数据,进行reduce操作 14. 程序运行完毕后,MrAppMaster会向RM申请注销自己5.Yarn调度器
yarn有三种调度器:
FIFO调度器:先进先出,单队列,同一时间队列中只有一个任务在执行。容量调度器:默认调度器,多队列,同一时间队列中只有一个任务在执行。公平调度器:多队列,每个队列按照内部缺额大小分配资源启动任务,同一时间队列有多个任务执行,队列的并行度大于队列的个数。
缺额:每个job理想情况下获取的计算资源与实际获得的计算资源存在的差距。缺额越大,优先级越高。 调优 1. HDFS 小文件归档
hdfs会将每个小文件的元数据信息保存在内存中,一个小文件占用内存150KB空间。一般NameNode节点使用128GB内存节点,一个这样的内存节点只能存储9.1亿个小文件,所以需要对小文件进行归档,降低内存元数据大小。
HAR归档
- 归档 hadoop archive -archiveName dataName.har -p /tmp/data/input /tmp/data/output - 解压 hadoop fs -cp har:///tmp/data/output/dataName.har/* /tmp/data/input - 注意事项 归档后,源文件需要手动删除
采用CombineTextInputFormat
job.setInputFormatClass(CombineTextInputFormat.class) CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
JVM重用
2.NameNode心跳并发配置mapreduce.job.jvm.numtasks 10 (增大这里,一般10-20之间)How many tasks to run per jvm,if set to -1 ,there is no limit
公式: 20*ln集群规模3.纠删码原理
纠删码是为了改变HDFS存副本造成的存储成本指数级上升的问题。
有一个文件300m,如果使用HDFS2进行存储,假设存为3副本,需要耗费900m空间。
使用HDFS3进行存储,假设存为3副本,只需要耗费500m空间(将300m文件分三份,一份100m,再增加两个校验单元GT,一个100m),而且就算数据丢失两份,也可以通过剩余的反向计算出来。
4.冷热数据分离HDFS存储类型:
| 存储类型 | 存储方式 |
|---|---|
| RAM_DISK | 内存镜像文件系统,存放在内存中 |
| SSD | 固态硬盘 |
| DISK | 普通磁盘,默认存储类型 |
| ARCHIVE | 没有特指哪种存储介质,主要用于归档 |
HDFS存储策略:
| 策略ID | 策略名称 | 副本分布 | 解释 |
|---|---|---|---|
| 15 | Lazy_Persist | RAM_DISK:1,DISK:n-1 | 一个副本保存在内存中,其他保存在磁盘中 |
| 12 | All_SSD | SSD:n | 所有副本保存在SSD中 |
| 10 | One_SSD | SSD:n,DISK:n-1 | 一个副本保存在SSD中,其他保存在磁盘中 |
| 7 | Hot(default) | DISK:n | 所有副本保存在磁盘中 |
| 5 | Warm | DISK:1,ARCHIVE:n-1 | 一个副本保存在磁盘中,其他进行归档 |
| 2 | Cold | ARCHIVE:n | 所有副本都归档 |
常用命令:
# 查看当前有哪些存储策略可以用 hdfs storagepolicies -listPolicies # 为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePolicy -path xxx -policy xxx # 获取指定路径(数据存储目录或文件)的存储策略 hdfs storagepolicies -getStoragePolicy -path xxx # 取消存储策略;执行改命令之后该目录或者文件,以其上级的目录为准,如果是根目录,那么就是HOT hdfs storagepolicies -unsetStoragePolicy -path xxx # 查看文件块的分布 bin/hdfs fsck xxx -files -blocks -locations # 查看集群节点 hadoop dfsadmin -report # 常用配置 hdfs-site.xml 配置目录的存储类型5.HDFS负载均衡 节点间负载均衡dfs.datanode.data.dir [SSD]file:///opt/module/hadoop-3.1.4/hdfsdata/ssd,[DISK]file:///opt/module/hadoop-3.1.4/hdfsdata/disk
开启数据均衡命令:
start-balancer.sh -threshold 10
对于参数 10,代表的是集群中各个节点的磁盘空间利用率相差不超过 10%,可根据实际情况进行调整。
停止数据均衡命令:
stop-balancer.sh磁盘间负载均衡
(1)生成均衡计划(我们只有一块磁盘,不会生成计划)
hdfs diskbalancer -plan 主机名
(2)执行均衡计划
hdfs diskbalancer -execute 主机名.plan.json
(3)查看当前均衡任务的执行情况
hdfs diskbalancer -query 主机名
(4)取消均衡任务
hdfs diskbalancer -cancel 主机名.plan.json6.HDFS基准测试
(1)测试HDFS写性能
// 测试内容:向HDFS集群写10个128M的文件 hadoop jar /opt/module/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3- tests.jar TestDFSIO -write -nrFiles 10 -fileSize 128MB
(2)测试HDFS读性能
// 测试内容:读取HDFS集群10个128M的文件 hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3- tests.jar TestDFSIO -read -nrFiles 10 -fileSize 128MB
(3)删除测试生成数据
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -clean
(4)使用Sort程序评测MR
// 使用RandomWriter来产生随机数,每个节点产生10个Map任务,每个Map产生大约1G大小的二进制随机数 hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar randomwriter random-dat // 执行Sort程序排序 hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar sort random-data sorted-data // 验证数据是否真的排序好了 hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar testmapredsort -sortInput random-data -sortOutput sorted-data7.Hadoop-lzo压缩
编译jar包
1.安装maven 2.编译环境准备 yum -y install gcc-c++ lzo-devel zlib-devel autoconf automake libtool 3.编译 # 下载lzo wget http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz tar -zxvf lzo-2.10.tar.gz cd lzo-2.10 ./configure -prefix=/usr/local/hadoop/lzo/ make make install 4.编译Hadoop-Lzo git clone https://github.com/twitter/hadoop-lzo/archive/master.zip unzip master.zip cd hadoop-lzo-master # 修改hadoop版本号 vi pom.xml3.1.3 # 声明两个临时环境变量 export C_INCLUDE_PATH=/usr/local/hadoop/lzo/include export LIBRARY_PATH=/usr/local/hadoop/lzo/lib # 编译 mvn package -Dmaven.test.skip=true cd target # 这里是编译成功的hadoop-lzo.jar包
配置core-site.xml
cp hadoop-lzo-0.4.20.jar hadoop3/share/hadoop/common/ vi /hadoop3/etc/hadoop/core-site.xml同步jar包和配置到其他节点 io.compression.codecs org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.BZip2Codec, org.apache.hadoop.io.compress.SnappyCodec, com.hadoop.compression.lzo.LzoCodec, com.hadoop.compression.lzo.LzopCodec io.compression.codec.lzo.class com.hadoop.compression.lzo.LzoCodec
配置Lzo索引
Lzo压缩文件的切片依赖于索引,我们需要为Lzo压缩文件创建索引,如果没有索引,Lzo文件的切片只有一个
hadoop jar /opt/module/hadoop3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /input/bigtable.lzo
# 减少溢写的次数 mapreduce.task.io.sort.mb shuffle的环形缓冲区大小,可以适当提高 mapreduce.task.io.sort.spill.percent shuffle的环形缓冲区溢写阈值,可适当提高 # 增加每次merge合并数量 mapreduce.task.io.sort.factor 降低merge合并次数 # 调整MapTask内存、堆栈和CPU核数 mapreduce.map.memory.mb 可以按照128M数据增加1G内存进行调控 mapreduce.map.java.opts 增加MapTask堆内存大小 mapreduce.map.cpu.vcores 增加MapTask的CPU核数 # 增加ReduceTask拉取数据的并行数 mapreduce.reduce.shuffle.parallelcopiees 可以适当提高 # 增加Buffer占用ReduceTask的内存比例 mapreduce.reduce.shuffle.input.buffer.percent 默认值0.7,可以适当提高 # 调整Buffer中数据达到多大比例写入磁盘 mapreduce.reduce.shuffle.merge.percent 默认值0.66,可以适当提高 # 调整ReduceTask内存、堆栈和CPU核数 mapreduce.reduce.memory.mb 可以按照128M数据增加1G内存进行调控 mapreduce.reduce.java.opts 增加ReduceTask堆内存大小 mapreduce.reduce.cpu.vcores 增加ReduceTask的CPU核数 # 调整ReduceTask启动时刻 mapreduce.job.reduce.slowstart.completedmaps 默认当MapTask完成50%时才会启动ReduceTask # 设置作业超时时间 mapreduce.task.timeout 默认10分钟,Task无响应则杀死此任务9.Yarn优化
# 配置调度器,默认容量调度器 yarn.resourcemanager.scheduler.class # ResourceManager处理调度器请求的线程数量,默认50 yarn.resourcemanager.scheduler.client.thread-count # 是否让yarn自己检测硬件进行配置,默认false yarn.nodemanager.resource.detect-hardware-capabilities # 是否将虚拟核数当作CPU核数,默认false yarn.nodemanager.resource.count-logical-processors-as-cores # 虚拟核数和物理核数乘数,例如:4核8线程,该参数就应设为2,默认1.0 yarn.nodemanager.resource.pcores-vcores-multiplier # NodeManager使用内存,默认8G yarn.nodemanager.resource.memory-mb # NodeManager为系统保留多少内存 yarn.nodemanager.resource.system-reserved-memory-mb # NodeManager使用CPU核数,默认8个 yarn.nodemanager.resource.cpu-vcores # 是否开启物理内存检查限制container,默认打开 yarn.nodemanager.pmem-check-enabled # 是否开启虚拟内存检查限制container,默认打开 yarn.nodemanager.vmem-check-enabled # 虚拟内存物理内存比例,默认2.1 yarn.nodemanager.vmem-pmem-ratio # 容器最最小内存,默认1G yarn.scheduler.minimum-allocation-mb # 容器最最大内存,默认8G yarn.scheduler.maximum-allocation-mb # 容器最小CPU核数,默认1个 yarn.scheduler.minimum-allocation-vcores # 容器最大CPU核数,默认4个 yarn.scheduler.maximum-allocation-vcores



