Hadoop是一个由Apache基金会所开发的分布式系统基础架构,一个能够对大量数据进行分布式处理的软件框架; Hadoop以一种可靠、高效、可伸缩的方式进行数据处理;用户可以在不了解分布式底层细节的情况下,开发分布式程序。
关键词解析:
Apache基金会:Apache软件基金会(也就是Apache Software Foundation,简称为ASF),是专门为支持开源软件项目而办的一个非盈利性组织。在它所支持的Apache项目与子项目中,所发行的软件产品都遵循Apache许可证(Apache
License)。
1.2 hadoop的发展历程分布式系统:分布式系统是由一组通过网络进行通信、为了完成共同的任务而协调工作的计算机节点组成的系统。分布式系统的出现是为了用廉价的、普通的机器完成单个计算机无法完成的计算、存储任务。其目的是利用更多的机器,处理更多的数据。
Hadoop最早起源于Nutch。Nutch的设计目标是构建一个大型的全网搜索引擎,包括网页抓取、索引、查询等功能,但随着抓取网页数量的增加,遇到了严重的可扩展性问题——如何解决数十亿网页的存储和索引问题。
2003年、2004年谷歌发表的两篇论文为该问题提供了可行的解决方案。
——分布式文件系统(GFS),可用于处理海量网页的存储
——分布式计算框架MAPREDUCE,可用于处理海量网页的索引计算问题。
Nutch的开发人员完成了相应的开源实现HDFS和MAPREDUCE,并从Nutch中剥离成为独立项目HADOOP,到2008年1月,HADOOP成为Apache顶级项目(同年,cloudera公司成立),迎来了它的快速发展期。
狭义上来说,hadoop就是单独指代hadoop这个软件,
广义上来说,hadoop指代大数据的一个生态圈,包括很多其他的软件
Hadoop生态的图例:
对于Hadoop的发展起到至关重要的一个人我们不得不提,这个人就是Hadoop之父:
Hadoop三大发行版本:Apache、Cloudera、Hortonworks。
Apache版本最原始(最基础)的版本,对于入门学习最好。
Cloudera在大型互联网企业中用的较多。
Hortonworks文档较好。
Apache Hadoop
官网地址:http://hadoop.apache.org/releases.html
下载地址:https://archive.apache.org/dist/hadoop/common/
Cloudera Hadoop
官网地址:https://www.cloudera.com/downloads/cdh/5-10-0.html
下载地址:http://archive-primary.cloudera.com/cdh5/cdh/5/(1)2008年成立的Cloudera是最早将Hadoop商用的公司,为合作伙伴提供Hadoop的商用解决方案,主要是包括支持、咨询服务、培训。
(2)2009年Hadoop的创始人Doug Cutting也加盟Cloudera公司。Cloudera产品主要为CDH,ClouderaManager,Cloudera Support
(3)CDH是Cloudera的Hadoop发行版,完全开源,比Apache Hadoop在兼容性,安全性,稳定性上有所增强。
(4)Cloudera Manager是集群的软件分发及管理监控平台,可以在几个小时内部署好一个Hadoop集群,并对集群的节点及服务进行实时监控。Cloudera Support即是对Hadoop的技术支持。
(5)Cloudera的标价为每年每个节点4000美元。Cloudera开发并贡献了可实时处理大数据的Impala项目。
2.为什么要使用hadoop 2.1 hadoop的设计目的Hortonworks Hadoop
官网地址:https://hortonworks.com/products/data-center/hdp/
下载地址:https://hortonworks.com/downloads/#data-platform
(1)2011年成立的Hortonworks是雅虎与硅谷风投公司Benchmark Capital合资组建。(2)公司成立之初就吸纳了大约25名至30名专门研究Hadoop的雅虎工程师,上述工程师均在2005年开始协助雅虎开发Hadoop,贡献了Hadoop80%的代码。
(3)雅虎工程副总裁、雅虎Hadoop开发团队负责人Eric Baldeschwieler出任Hortonworks的首席执行官。
(4)Hortonworks的主打产品是Hortonworks Data Platform(HDP),也同样是100%开源的产品,HDP除常见的项目外还包括了Ambari,一款开源的安装和管理系统。
(5)HCatalog,一个元数据管理系统,HCatalog现已集成到Facebook开源的Hive中。Hortonworks的Stinger开创性的极大的优化了Hive项目。Hortonworks为入门提供了一个非常好的,易于使用的沙盒。
(6)Hortonworks开发了很多增强特性并提交至核心主干,这使得Apache Hadoop能够在包括Window Server和Windows Azure在内的Microsoft Windows平台上本地运行。定价以集群为基础,每10个节点每年为12500美元。
用户通过开发分布式程序,充分利用低廉价的硬件资源完成海量数据的存储和运算,不仅仅能够完成数据的存储和运算,还要能保证数据的安全性和可靠性
2.2 hadoop的优势1.高可靠性。因为它假设计算元素和存储会失败,因此它维护多个工作数据副本,确保能够针对失败的节点重新分布处理。
2.高扩展性。当存储hdp集群的存储能力和运算资源不足时,可以横向的扩展机器节点来达到扩容和增强运算能力
3.高效性。因为它以并行的方式工作,通过并行处理加快处理速度
4.高容错性。Hadoop能够自动保存数据的多个副本,当有存储数据的节点宕机以后, 会自动的复制副本维持集群中副本的个数 ,并且能够自动将失败的任务重新分配。
5.低成本。hadoop可以运行在廉价的机器上并行工作,达到高效,安全,效率于一身目的。
3.hadoop的核心知识点1.HDFS (Hadoop Distributed File System) 分布式文件系统:负责海量数据的存储和管理
2.MapReduce分布式运算系统:负责海量数据的运算
3.YARN分布式资源调度和任务监控平台
3.x没有太大的改变
4.什么是hdfs 4.1 hdfs的概念:HDFS分布式文件系统,全称为:Hadoop Distributed File System,首先这是一个文件系统,主要用于对文件的存储,通过和linux相似的目录树系统定位文件和目录的位置,其次,他是分布式的,解决海量数据的存储问题,HDFS系统统一管理,提供统一的操作目录,操作命令和API
4.2 为什么要用hdfs:因为随着数据量越来越大,一台机器已经不能满足当前数据的存储,如果使用多台计算机进行存储,虽然解决了数据的存储问题,但是后期的管理和维护成本比较高,因为我们不能精准的知道哪台机器上存储了什么样的数据,所以我们迫切的需要一个能够帮助我们管理多台机器上的文件的一套管理系统,这就是分布式文件系统的作用,而hdfs就是这样的一套管理系统,而且他也只是其中的一种.
5.hdfs的优缺点 5.1 优点:高容错性:HDFS将文件进行切块,然后存储在集群的不同机器中,默认每个物理切块存储3个副本,如果有一台机器出现宕机或者磁盘损坏导致存储的物理块丢失时,HDFS可以对其进行自动修复
高扩展性:当HDFS系统的存储空间不够时,我们只需要添加一台新的机器到当前集群中即可完成扩容,这就是我们所说的横向扩容,而集群的存储能力,是按照整个集群中的所有的机器的存储能力来计算的,这也就是我们所说的高扩容性
海量数据的存储能力:可以存储单个大文件,也可以存储海量的普通文件
5.2 缺点: 5.2.1 不适合低延时数据访问;比如毫秒级的来存储数据,这是不行的,它做不到。
它适合高吞吐率的场景,就是在某一时间内写入大量的数据。但是它在低延时的情况 下是不行的,比如毫秒级以内读取数据,这样它是很难做到的。
存储大量小文件的话,它会占用NameNode大量的内存来存储文件、目录和块信息。这样是不可取的,因为NameNode的内存总是有限的。
小文件存储的寻址时间会超过读取时间,它违反了HDFS的设计目标。
一个文件只能有一个写,不允许多个线程同时写。
仅支持数据 append(追加),不支持文件的随机修改。
HDFS默认采用的是主从架构,架构中有三个角色:一个叫NameNode,一个叫DataNode,还有一个叫secondaryNameNode
主从架构示例图:
所以我们在搭建hdfs架构时,需要一台NameNode,三台DataNode,一台SecondaryNameNode.
NameNode:主要负责存储文件的元数据,比如集群id,文件存储所在的目录名称,文件的副本数,以及每个文件被切割成块以后的块列表和块列表所在的DataNode
DataNode:主要负责存储数据,包含一整个文件或者某个文件切成的块数据,以及块数据的一些校验信息等
SecondaryNameNode:主要负责和NameNode的checkpoint机制(类似于备份机制)等
注意事项:
- 1. 大文件进行切片存储时,hadoop1.x版本默认切片大小是64MB,Hadoop2.x以后默认大小是128MB,也就是说当我们的一个文件进行存储时,如果小于128MB,会整个文件进行存储,如果大于128M,会将该文件切成块进行存储,如果想要修改切块大小,可以在安装hdfs时进行配置
- 2.每个文件默认副本是3个,切成的块文件也是如此
问题:为什么块大小为128M???
HDFS中的文件在物理上是分块存储(Block),块的大小可以通过配置参数(dfs.blocksize)来规定,默认为128M的原因,基于最佳传输损耗理论!不论对磁盘的文件进行读还是写,都需要先进行寻址!最佳传输损耗理论:在一次传输中,寻址时间占用总传输时间的1%时,本次传输的损耗最小,为最佳性价比传输!目前硬件的发展条件,普通磁盘写的速率大概为100M/S, 寻址时间一般为10ms!
10ms / 1% = 1s
1s * 100M/S=100M
因为我们的计算机的底层进制是2进制,所以设置100M对于计算机来说并不是最友好的,应该将其设置为2的次方值,对于100M来说,最近的就是128M,所以我们将其设置为128M
如果公司中磁盘传输效率显示为300M/s,那么我们在设置时,最好设置为256M,目前SATA硬盘一般传输效率为500M/S,所以具体设置多少,还需要根据公司中使用的磁盘来设定
问题:能不能将块设置的小一些?
理论上是可以的,但是如果设置的块大小过小,会占用大量的namenode的元数据空间,而且在读写操作时,加大了寻址时间,所以不建议设置的过小
问题:不能过小,那能不能过大?
不建议,因为设置的过大,传输时间会远远大于寻址时间,增加了网络资源的消耗,而且如果在读写的过程中出现故障,恢复起来也很麻烦,所以不建议
1.找出之前搭建好的集群机器中的某一台,比如doit01,2.上传文件到/opt/apps 目录下,如果没有该目录,可以先执行
cd /opt #切换到对应目录 mkdir apps #在opt目录下创建一个apps目录
然后再apps目录内部执行rz命令开始上传
如果输入rz命令不好用,证明没有下载过rz命令,可以输入
yum -y install lrzsz #rz命令是主机上传到虚拟机文件的命令 #sz是主机从虚拟机下载文件到本地的命令
3.对hadoop-3.1.1.tar.gz进行解压操作
tar -zxvf hadoop-3.1.1.tar.gz
4.查看解压后的目录
cd hadoop-3.1.1 ll
5.开始对hdfs进行配置,进入到etc目录下
cd etc #这里要注意 不要写成 cd /etc 这样是进入到根目录下的etc # 接着往里面进 cd hadoop
6.先配置hadoop的java依赖
vi hadoop-env.sh #在hadoop-env.sh最后一行添加以下环境信息 export JAVA_HOME=/opt/apps/jdk1.8.0_191
如果没有安装jdk的话,需要先去安装jdk
7.除了配置hadoop的默认java依赖以外,我们还需要一些扩展配置
hadoop是有一些默认的配置文件的,这些默认的配置文件在jar包中,如果如果相对hadoop的默认配置信息进行修改或者增加的话,可以在xx-site.xml文件中进行配置,这算是我们的一些扩展配置.
在当前目录中
vi hdfs-site.xml
将下面配置信息copy到文件中的标签中 dfs.namenode.rpc-address doit01:8020 dfs.namenode.name.dir /opt/hdpdata/name dfs.datanode.data.dir /opt/hdpdata/data dfs.namenode.secondary.http-address doit02:50090
这个时候hdfs的默认配置就基本配置完毕,但是我们只配置了一台机器,还有两台机器没有配置,但是重复上述过程太麻烦,所以我们可以使用一个远程复制的命令将hadoop的解压包复制给另外两台机器
#先切换到apps目录下 cd /opt/apps #然后执行 scp -r hadoop-3.1.1 $doit02:$PWD scp -r hadoop-3.1.1 $doit03:$PWD
以后我们还可能会经常给另外的机器复制文件,所以我们可以写一个脚本对该命令进行封装,然后需要分发什么直接运行该脚本即可
#在apps目录下创建一个专门存放各种脚本的目录 mkdir scriptDir #然后创建一个脚本文件 touch cluster_script.sh #然后编辑该脚本 vi cluster_script.sh #脚本内容: #!/bin/bash for hostname in doit02 doit03 do scp -r $1 $hostname:$PWD done
7.配置完毕以后,初始化NameNode
#进入到hadoop的bin目录里面 cd /opt/apps/hadoop-3.1.1/bin #然后执行命令 ./hadoop namenode -format
8.查看是否初始化成功,直接到/opt目录下查看是否有hdpdata目录被创建出来
9.接下来我们可以依次启动hdfs的角色,先启动namenode
进入到sbin目录下
cd /opt/apps/hadoop-3.1.1/sbin #启动namenode ./hadoop-daemon.sh start namenode #然后再启动datanode ./hadoop-daemon.sh start datanode #在进入不同的机器里面执行启动datanode的命令
10.查看每台机器上是否已经有了对应的服务进程,可以通过一个命令jps,专门查看java服务进程,查看显示有namenode或者datanode证明已经启动成功,这个时候,集群就已经搭建完毕了
11.hadoop还给我们提供了一个web端的监控页面,用于专门查看hdfs文件系统,路径为:
http://ipaddress:9870/ 比如:http://doit01:9870/
ipaddress 是你的namenode的主机名,需要配置主机名的映射
我们还能监控到datanode的启动情况,也就是查看有多少个datanode节点在运行
如果进不去这个页面,有两种原因导致:
1.没有配置主机名映射,需要在windows主机内部配置域名映射
配置主机名映射 要找到windows主机里面的hosts文件
有些电脑默认是不能修改host文件的,所以我们可以将host文件复制到桌面修改完毕之后再粘贴回来,添加内容如下,直接添加到文件末尾即可:
192.168.79.101 doit01 192.168.79.102 doit02 192.168.79.103 doit03
2.没有关闭虚拟机的防火墙,或者namenode没有开启,没有开启namenode的情况下,将namenode进行开启即可,没有关闭防火墙的情况下,我们需要手动关闭防火墙,并且禁用掉防火墙的自动开启功能
systemctl status firewalld #先查看防火墙状态 systemctl stop firewalld #关闭防火墙 systemctl start firewalld #开启防火墙 systemctl restart firewalld #重启防火墙 systemctl disable firewalld #关闭防火墙自启动 systemctl enable firewalld #开启防火墙自启动
想要彻底禁用防火墙,需要先执行关闭,再执行禁用
systemctl stop firewalld systemctl disable firewalld8.hdfs的一键启动和停止
通过上面我们的启动发现,我们需要更换不同的机器来启动对应的节点,但是,机器越多的情况下,操作起来难度越大,所以,我们接下来配置在一台机器里面,将整个hdfs集群启动起来的命令
进入到sbin目录里面
cd /opt/apps/hadoop-3.1.1/sbin/
有两个文件:
start-dfs.sh
stop-dfs.sh
这两个就是一键启停hdfs系统的命令,执行后发现只启动了namenode或者启动了同一台机器上的namenode和datanode,那是因为我们在执行start-dfs.sh这个脚本时,脚本内部需要加载所有的datanode节点信息,加载的这个文件叫做***workers***
***workers***默认在etc/hadoop/目录内,所以我们应该先去把所有的datanode节点的ip配置到workers文件里面,那就进入目录内
cd /opt/apps/hadoop-3.1.1/etc/hadoop
然后编辑此文件:
vi workers
将里面内容删除,然后换成如下代码:
doit01 doit02 doit03 #这里配置的还是主机名映射,让start-dfs.sh文件启动时自动加载workers文件中的内容, #获取到所有的datanode节点所在的机器,然后挨个启动
配置完毕以后,再次执行start-dfs.sh命令,查看是否能够启动成功,启动后,发现报错
ERROR: Attempting to operate on hdfs namenode as root ERROR: but there is no HDFS_NAMENODE_USER defined. Aborting operation. Starting datanodes ERROR: Attempting to operate on hdfs datanode as root ERROR: but there is no HDFS_DATANODE_USER defined. Aborting operation. Starting secondary namenodes [doit02] ERROR: Attempting to operate on hdfs secondarynamenode as root ERROR: but there is no HDFS_SECONDARYNAMENODE_USER defined. Aborting operation
这个错是因为我们启动hdfs时候需要一些root用户的权限,而我们的start-dfs.sh里面没有对应的权限,所以我们应该给改文件添加上对应的权限信息
vi start-dfs.sh vi stop-dfs.sh
然后在第二行插入
HDFS_DATANODE_USER=root HADOOP_SECURE_DN_USER=hdfs HDFS_NAMENODE_USER=root HDFS_SECONDARYNAMENODE_USER=root
保存退出
重新执行start-dfs.sh命令,这样就能够启动成功了
其他配置:
配置在任意位置启动集群:因为我们现在只能在sbin目录下启动集群,类似于我们安装完jdk以后只能在bin目录下使用javac命令,所以我们应该配置以下环境变量,保证我们不管在任何位置都能够启动集群
cd /etc vi profile #添加以下代码 export HADOOP_HOME=/opt/apps/hadoop-3.1.1/ export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin #保存退出 source profile
配置查看所有机器的hadoop进程的脚本文件
cd /opt/apps/scriptDir
#创建脚本文件
touch jps-all.sh
#编辑脚本文件
vi jps-all.sh
#脚本中的内容
#!/bin/bash
for host in doit01 doit02 doit03
do
echo "################$host进程状态###############"
ssh $host "source /etc/profile;jps;exit"
done
配置脚本文件任意位置运行
cd /etc/ #进入/etc目录 vi profile #编辑文件 添加以下代码 export SH_HOME=/opt/apps/scriptDir export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$SH_HOME #保存退出,然后执行 source profile
到此为止,所有的hdfs文件系统的安装和配置就搞定了,可以先去查看所有的机器的java进程情况,然后关闭对应的节点以后,再去使用start-dfs.sh命令一键启动集群
9.hdfs的shell客户端操作启动集群以后,我们不仅可以通过监控页面直接查看hdfs文件系统,而且还能够通过命令操作hdfs端,默认命令是:
hdfs dfs 参数选项
例子:
hdfs dfs -ls / #查看当前hdfs的根目录下的所有成员
但是通过观察结果我们发现,结果显示的是我们linux下的根目录,也就是说,该命令默认操作的是linux的根目录,但是我们需要操作的是hdfs,所以修改命令为:
hdfs dfs -ls hdfs://doit01:8020/
但是如果我们一直这么写,比较麻烦,所以我们可以到配置文件中对其进行修改,将默认地址改为hdfs的根目录即可,先去查看默认配置文件中的默认操作目录地址:
该路径在core-default.xml里面
这里是默认配置文件中的配置,我们可以在core-sit.xml中重新配置即可:
cd /opt/apps/hadoop-3.1.1/etc/hadoop vi core-site.xml #添加下面代码到中 fs.defaultFS hdfs://doit01:8020/
然后再次执行 hdfs dfs -ls /
发现操作的就已经是我们hdfs系统的根目录了
配置完毕之后,接下来我们来查看hdfs中的其他操作命令
标注以下常用命令:
Usage: hadoop fs [generic options] [-appendToFile... ] [-cat [-ignoreCrc] ...] #查看hdfs系统中的文件内容 [-checksum ...] [-chgrp [-R] GROUP PATH...] #更改用户组 [-chmod [-R] PATH...] # 更改权限 [-chown [-R] [OWNER][:[GROUP]] PATH...] # 更改所属用户 [-copyFromLocal [-f] [-p] [-l] [-d] [-t ] ... ] #上传 [-copyToLocal [-f] [-p] [-ignoreCrc] [-crc] ... ] #下载 [-count [-q] [-h] [-v] [-t [ ]] [-u] [-x] [-e] ...] #统计文件夹个数 文件个数 文件所在空间大小 [-cp [-f] [-p | -p[topax]] [-d] ... ] #复制 [-createSnapshot [ ]] [-deleteSnapshot ] [-df [-h] [ ...]] #查看集群的存储能力 [-du [-s] [-h] [-v] [-x] ...] [-expunge] [-find ... ...] #查找文件 [-get [-f] [-p] [-ignoreCrc] [-crc] ... ] #下载 [-getfacl [-R] ] [-getfattr [-R] {-n name | -d} [-e en] ] [-getmerge [-nl] [-skip-empty-file] ] [-head ] #查看文件前面几行内容 [-help [cmd ...]] [-ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [-e] [ ...]] #查看当前目录中的所有信息 [-mkdir [-p] ...] # 创建文件夹 [-moveFromLocal ... ] [-moveToLocal ] [-mv ... ] #移动 [-put [-f] [-p] [-l] [-d] ... ] #上传 [-renameSnapshot ] [-rm [-f] [-r|-R] [-skipTrash] [-safely] ...] #删除文件 [-rmdir [--ignore-fail-on-non-empty] ...] #删除目录 [-setfacl [-R] [{-b|-k} {-m|-x } ]|[--set ]] [-setfattr {-n name [-v value] | -x name} ] [-setrep [-R] [-w] ...] [-stat [format] ...] [-tail [-f] ] #查看文件后几行内容 [-test -[defsz] ] [-text [-ignoreCrc] ...] [-touchz ...] #创建文件 [-truncate [-w] ...] [-usage [cmd ...]]
以上命令,加注释的都尽量记住!!!
注意事项:
1.修改hdfs中的某个目录权限时,如果想一并修改其内容文件权限,可以使用 hdfs dfs -chmod -R 777 文件夹这个命令
2.删除hdfs系统中的根目录下的所有文件时,需要使用绝对路径 hdfs dfs -rm -r hdfs://doit01:8020
//加载配置信息
Configuration conf = new Configuration();
//更改配置信息
// conf.set("dfs.blocksize","256M");
// conf.set("dfs.replication","2");
//确定要操作的系统的地址
URI uri = new URI("hdfs://doit01:8020");
//创建文件系统对象 根据配置信息和uri
FileSystem fs = FileSystem.newInstance(uri, conf);
//创建上传文件的路径
Path src = new Path("D:\word.txt");
//创建接受文件的路径信息,这个路径是hdfs的根目录
Path dest = new Path("/");
//开始上传
fs.copyFromLocalFile(src, dest);
//关闭资源
fs.close();
}
}
注意:当我们同时在默认配置文件里面,site配置文件里面和代码里面配置了同一项内容时,优先级以代码中的配置最高,其次是site文件中,再其次是默认配置文件
我们可以从源码中获取一些配置文件的加载信息,来查看我们的配置文件加载的顺序:
1.在pom.xml文件中添加下面依赖:
org.apache.hadoop hadoop-hdfs 3.1.1 org.apache.hadoop hadoop-hdfs-client 3.1.1 provided
2.ctrl + n搜索Namenode类,找到hadoop相关jar包下的Namenode类,点击进去
3.ctrl + f 搜索main方法:
4.创建配置文件对象,接着点击该方法进下一层源码
5.按着ctrl键然后用鼠标左键点击该方法:
6.点击该类进入查看
7.进入该类,然后搜索core
观察发现,core也是在一个静态代码块中,所以,我们在加载hdfs配置文件之前,其实是先加载了core的配置文件的,由此推断出,我们的配置文件加载顺序是先default,然后site,这样的话,如果我们在site里面对default中的配置信息进行修改,那么就会覆盖掉default中的默认配置信息.而这些配置文件的加载只是在Namenode启动时加载的,那么在我们上传文件时,我们在代码里面再次设置一些配置,这个时候,就会再次覆盖掉site文件中的配置信息,以我们的配置信息为主,这就是为什么我们的配置信息有优先级顺序的原因
也可以通过debug调试查看相关的配置文件加载顺序:
package com.doit.hdp_demo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class DownloadFile {
public static void main(String[] args) throws URISyntaxException, IOException {
//加载配置信息
Configuration conf = new Configuration();
//确定要操作的系统的地址
URI uri = new URI("hdfs://doit01:8020");
//创建文件系统对象 根据配置信息和uri
FileSystem fs = FileSystem.newInstance(uri, conf);
//创建接受文件的路径信息,这个路径是hdfs的根目录
Path src = new Path("/word.txt");
//创建上传文件的路径
Path dest = new Path("D:\");
//开始上传
fs.copyToLocalFile(src, dest);
//关闭资源
fs.close();
}
}
注意事项:想要将hdfs上面的文件下载到本地目录中,需要在主机上配置hadoop环境
1.解压压缩包到本地目录,不要出现中文路径
2.配置环境变量
右键我的电脑选中属性选项:
3.进入新的界面后选择高级系统设置
4.再次进入页面后选择环境变量
5.然后选择新建
6.在新建框里面输入:
7.上述点击确定退出后,再次找到path变量,选中,然后点击编辑
8.然后按照序号步骤进行操作即可:
最后一路点击确定即可退出,配置完毕之后不要忘记重启idea,这样才能重新加载本地配置,如果想要查看是否配置成功,可以win+r快捷键输入cmd,然后输入hdfs命令查看效果,如果出现的是以下内容,那么就证明配置成功.
这样就能保证我们将hdfs系统里面的文件下载到本地磁盘上
package com.doit.hdp_demo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class RenameAndMoveDemo {
public static void main(String[] args) throws URISyntaxException, IOException {
//加载配置信息
Configuration conf = new Configuration();
//确定要操作的系统的地址
URI uri = new URI("hdfs://doit01:8020");
//创建文件系统对象 根据配置信息和uri
FileSystem fs = FileSystem.newInstance(uri, conf);
Path a = new Path("/a") ;
//目标的文件夹一定要存在才能移动
if(fs.exists(a)) { // 存在
if(fs.isDirectory(a)) { // 并且是一个文件夹
//移动
//fs.rename(new Path("/b.txt"), new Path("/a/b.txt")) ;
//重命名
fs.rename(new Path("/a/b.txt"), new Path("/a/c.txt")) ;
}
}
fs.close();
}
}
10.5.删除案例
package com.doit.hdp_demo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class DeleteFileDemo {
public static void main(String[] args) throws URISyntaxException, IOException {
//加载配置信息
Configuration conf = new Configuration();
//确定要操作的系统的地址
URI uri = new URI("hdfs://doit01:8020");
//创建文件系统对象 根据配置信息和uri
FileSystem fs = FileSystem.newInstance(uri, conf);
Path f = new Path("/a");
// 路径是否存在
if(fs.exists(f)) {
// 递归删除文件夹
fs.delete(f, true) ;
}else {
System.out.println("路径不存在");
}
fs.close();
}
}
10.6.创建文件夹
package com.doit.hdp_demo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class MkDirDemo {
public static void main(String[] args) throws URISyntaxException, IOException {
//加载配置信息
Configuration conf = new Configuration();
//确定要操作的系统的地址
URI uri = new URI("hdfs://doit01:8020");
//创建文件系统对象 根据配置信息和uri
FileSystem fs = FileSystem.newInstance(uri, conf);
fs.mkdirs(new Path("/a/b/c")) ;
fs.close();
}
}
10.7 判断是否是文件夹:
package com.doit.hdp_demo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class IsDirDemo {
public static void main(String[] args) throws URISyntaxException, IOException {
//加载配置信息
Configuration conf = new Configuration();
//确定要操作的系统的地址
URI uri = new URI("hdfs://doit01:8020");
//创建文件系统对象 根据配置信息和uri
FileSystem fs = FileSystem.newInstance(uri, conf);
// Path path = new Path("/a");
Path path = new Path("/b.txt");
// fs.isFile(arg0) // 是否是文件
boolean b = fs.isDirectory(path); // 是否是文件夹
if (b) {
System.out.println("文件夹");
} else {
System.out.println("文件");
}
fs.close();
}
}
10.8 遍历文件夹
package com.doit.hdp02;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.net.URI;
public class HdfsDemo1 {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//创建文件对象
FileSystem fs = FileSystem.newInstance(new URI("hdfs://doit01:8020"), conf, "root");
//查看HDFS的根目录中的内容
FileStatus[] fst = fs.listStatus(new Path("/"));
for (FileStatus fileStatus : fst) {
// fileStatus 路径下的文件或者文件夹
if (fileStatus.isDirectory()) {
//文件夹
Path path = fileStatus.getPath();
System.out.println(path);
// 遍历文件夹下的文件 --> 文件性质的操作 获取元信息
//listFiles() ;
} else {
// 文件 获取元信息 路径 名字 大小 .....
fileStatus.getPath();
// 获取文件名
fileStatus.getPath().getName();
// 父级路径
fileStatus.getPath().getParent();
// 最后访问时间
fileStatus.getAccessTime();
// 最后修改时间
fileStatus.getModificationTime();
// 副本的葛素
fileStatus.getReplication();
// 块大小
fileStatus.getBlockSize();
//大小
fileStatus.getLen();
}
}
fs.close();
}
}
10.9 遍历文件,获取元数据信息
package com.doit.hdp02;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.net.URI;
import java.util.Arrays;
public class HdfsDemo2 {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//创建文件对象
FileSystem fs = FileSystem.newInstance(new URI("hdfs://doit01:8020"), conf, "root");
//查看HDFS的根目录中的内容
//第一个参数是我们的HDFS 需要操作的目录
//第二个参数是是否递归获取该目录中的文件 如果为true 就递归获取 如果为false 就不递归
RemoteIterator fr = fs.listFiles(new Path("/"), false);
while(fr.hasNext()){
LocatedFileStatus ls = fr.next();
String name = ls.getPath().getName();
if (name.equals("hadoop")) {
BlockLocation[] bt = ls.getBlockLocations();
System.out.println(Arrays.toString(bt));
long length = bt[2].getLength(); //获取每一块数据块的大小
String[] names = bt[0].getNames(); // 获取ip地址加端口号
String[] hosts = bt[0].getHosts(); // 存储该物理块的所有的节点的主机名
String[] ids = bt[0].getStorageIds();
StorageType[] st = bt[0].getStorageTypes();
System.out.println(Arrays.toString(st));
//第一个是偏移量 第二个是块的大小 后面的三个参数是分别存储于哪个节点
//[0,134217728,doit02,doit03,doit01, 134217728,134217728,doit02,doit03,doit01, 268435456,66123926,doit02,doit03,doit01]
}
}
}
}
10.10 读数据
随机读取数据
seek()方法 字节流
skip() 方法 字符流
public class ReadDataFromHdfsFile {
public static void main(String[] args) throws Exception {
FileSystem fs = HDFSUtils.getFs();
Path path = new Path("/a.txt") ;
if(fs.exists(path)) {
// 获取一个文件的输入流
FSDataInputStream fis = fs.open(path);
BufferedReader br = new BufferedReader(new InputStreamReader(fis));
// 跳过指定的位置读取
//注意 windows中的文本换行是rn linux中是n 实际每行的长度是 length+2 / +1
br.skip(4) ; //okn // okrn
String line = br.readLine();
System.out.println(line);
}
fs.close();
}
}
10.11 写数据
对随机的写是不支持的 , 不建议随机写 文件内容不允许修改 不建议修改文件内容和直接写数据到hdfs中
public class WriteDataToHdfsFile {
public static void main(String[] args) throws Exception {
FileSystem fs = HDFSUtils.getFs();
//fs.append(new Path("追加内容")) ;
FSDataOutputStream fos = fs.create(new Path("/xx.oo"));
// 默认是对个 fos的覆盖
fos.writeUTF("hello tom cat ");
fos.write("hello".getBytes());
fos.flush();
fos.close();
fs.close();
}
}
11.hdfs的内部原理机制
11.1.hdfs的写数据的流程
HDFS的写数据流程是hadoop中最复杂的流程之一
简图如下:
详图如下:
1.客户端生成一个DistributedFileSystem对象,然后请求namenode上传文件word.txt
2.namenode根据文件名从目录树中进行查询操作,查看改文件是否存在,已经客户端用户是否有权限在分布式系统中创建文件,然后返回响应状态,ok,可以上传
3.DistributedFileSystem请求上传第一部分文件,并且申请block块信息
4.namenode创建block块信息,包括blockID,BGS(block Generation Stamp),offset,len,replication,存储的datanode地址,并将信息返回给client
5.client接受到namenode的返回信息以后,创建一个FSOutputStream对象,先去架设连接管道,也就是我们说的Pipeline
6.管道架设好以后,会创建出来FSDataInputStream流将blk01中的内容读到一个buf中,buf大小为4096,也就是4KB,然后每次读满一个buf,就会flush一次,flush出去的数据,会写到chunk中,一个chunk大小为512b的数据内容加上4b的校验(checksum)内容,也就是516b,chunk是数据传输过程中的最小校验单位,然后再将chunk写入到packet中,packet是client和datanode进行数据传输的最小单位,一个packet大概是64KB,相当于126个chunk,也就是说一个packet里面保存了126个chunk,然后,但是可能装不满packet,64kb相当于是65536b,516*126大小等于65016,其实packet里面还包含了一些packet的头信息,也就是packet的一些默认值,所以最多包含了126个chunk
7.一个packet写好以后.将packet保存在dataqueue中,然后通过FSOutputStream将队列中的packet写出去,一旦队列中的数据被写出去,就会将队列中的数据移除到ackqueue中,这是一个应答队列,负责接受Pipleline返回来的应答机制
8.datanode接受到packet以后,会将数据序列到本地磁盘,然后继续将packet发送到下一台机器上,直到最后一台datanode保存完毕之后,然后返回应答信息(ack),packet的发送顺叙是上游到下游,ack的返回机制时下游到上游,然后由最上游做统计,返回给最终的客户端
9,客户端校验应答ack,发现没有问题,先将ackqueue中的packet移除,移除完毕之后,就会给namenode发送保存成功,然后接着发送下一块儿数据,直到全部发送完成,
10,如果在发送过程中,其中某一台datanode没有保存成功,那么这台datanode会自动退出Pipleline,然后client再检测ack时,发现应答数不对,会先停止发送packet,然后将出错的packet从ackqueue中移动到dataqueue,再次发送
11.再次发送之前,客户端将重新架设Pipleline,并且根据 fs.client.block.write.replace-datanode-on-failure.policy/enable 的设置决定是否寻找新的节点代替BadNode,客户端向NameNode申请新的BGS,这个BGS将在重新架设流水线成功后,成为副本和block的BGS。这样BadNode的副本的BGS就和还健在的DataNode,以及NameNode那边block的BGS相差,如果以后BadNode重启,重新向namenode进行副本汇报时,那么因为副本的版本(BGS是Replica的版本标识)过老,而被要求删除,就保证了badnode不会存储一个不完整的副本
数据处理的一些源代码:
DataXceiverServer.java里面的代码都是datanode的接受数据的代码,这里不再展示
应答异常的一些处理:
这里显示根据BadNode出现的时间段,选择不同的处理方式,1.创建阶段,2.传输阶段,3.关闭阶段,添加完新节点以后,会重新生成新的Pipleline,具体要不要添加新的节点
具体是否要添加节点,可以根据配置文件中的这个参数进行参考
如果不需要添加新的节点,那么就继续使用剩下的节点建立新的Pipleline进行数据传输
上面源码,不建议掌握,了解即可!!
从 pipeline setup 错误中恢复
1.在 pipeline 准备阶段发生错误,分两种情况:
新写文件:Client 重新请求 NameNode 分配 block 和 DataNodes,重新设置 pipeline。
追加文件:Client 从 pipeline 中移除出错的 DataNode,然后继续。
2.从 data streaming 错误中恢复
当 pipeline 中的某个 DataNode 检测到写入磁盘出错(可能是磁盘故障),它自动退出 pipeline,关闭相关的 TCP 连接。
当 Client 检测到 pipeline 有 DataNode 出错,先停止发送数据,并基于剩下正常的 DataNode 重新构建 pipeline 再继续发送数据。
Client 恢复发送数据后,从没有收到确认的 packet 开始重发,其中有些 packet 前面的 DataNode 可能已经收过了,则忽略存储过程直接传递到下游节点。
3.从 close 错误中恢复
到了 close 阶段才出错,实际数据已经全部写入了 DataNodes 中,所以影响很小了。Client 依然根据剩下正常的 DataNode 重建 pipeline,让剩下的 DataNode 继续完成 close 阶段需要做的工作。
以上就是 pipeline recovery 三个阶段的处理过程,这里还有点小小的细节可说。 当 pipeline 中一个 DataNode 挂了,Client 重建 pipeline 时是可以移除挂了的 DataNode,也可以使用新的 DataNode 来替换。这里有策略是可配置的,称为 DataNode Replacement Policy upon Failure,包括下面几种情况:
NEVER:从不替换,针对 Client 的行为
DISABLE:禁止替换,DataNode 服务端抛出异常,表现行为类似 Client 的 NEVER 策略
DEFAULT:默认根据副本数要求来决定,简单来说若配置的副本数为 3,如果坏了 2 个 DataNode,则会替换,否则不替换
ALWAYS:总是替换
DN向NN汇报当前解读信息的时间间隔,默认6小时;
DN扫描自己节点块信息列表的时间,默认6小时
首先我们要考虑一个问题:NameNode的元数据到底存储在什么位置?
假设:
1.存储在磁盘上,由于用户需要经常访问我们的hdfs做一些增删改查操作,所以,对于我们来说,存储在磁盘上并不是特别好的办法,因为操作越多,磁盘的IO越多,就会造成响应速度慢,效率低下,这明显不是我们需要的
2.存储在内存中,交互速度快,外界的增删改查我们都能即使更新自己的元数据,但是一旦系统断电,那么我们的元数据就会从内存中消失,导致我们的集群没法工作,所以元数据不能单纯的存放在内存中.
所以我们可以使用第三种方式,就是元数据一部分在内存中和客户端做交互,然后为了元数据的安全性,我们还会每隔一段时间对将元数据序列化到磁盘上,那么新的问题产生了,多久序列化一次比较好,时间过长,容易造成序列化文件和内存中的元数据不一致,时间过短,又会造成磁盘和内存的大量IO,还是会影响我们的效率和性能,所以,在这个过程中,我们又加入一个新的日志文件,这个日志文件专门用于记录两次序列化之间的操作信息,这样即使两次序列化之间出现了断电,造成内存中的元数据丢失,但是我们可以通过日志文件和磁盘的元数据文件反序列化到内存中,依然能恢复到断电前的状态,日志文件时不断被追加写入的,所以如果一直追加写入,内容只会越来越多,所以,我们实际的日志文件是有非常多的,没写够一定的数量,就会被替换成一个新的日志文件
NameNode启动
(1)第一次启动NameNode格式化后,创建Fsimage(元数据的镜像文件)和Edits(日志)文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
(2)客户端对元数据进行增删改的请求。
(3)NameNode记录操作日志,更新滚动日志。
(4)NameNode在内存中对元数据进行增删改。
Secondary NameNode工作
(1)Secondary NameNode询问NameNode是否需要CheckPoint。直接带回NameNode是否检查结果。
(2)Secondary NameNode请求执行CheckPoint。
(3)NameNode滚动正在写的Edits日志。
(4)将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode。
(5)Secondary NameNode加载编辑日志和镜像文件到内存,并合并。
(6)生成新的镜像文件fsimage.chkpoint。
(7)拷贝fsimage.chkpoint到NameNode。
(8)NameNode将fsimage.chkpoint重新命名成fsimage。
每次NameNode启动,都会先加载fsimage进入内存,然后再将日志中的操作加载过来,保证元数据是最新的
checkpoint的时间设置:
要么一小时更新一次,要么当edits日志操作记录达到100W执行一次
secondaryNameNode多久询问一次是否执行checkpoint?? 一分钟询问一次
如果你想查看镜像文件和日志文件中的内容,直接查看是乱码,因为里面都是一些序列化的内容,可以通过两个命令进行查看:
oiv:查看镜像文件
oev:查看日志文件
命令格式:
# 以XML的形式查看镜像文件和日志文件 hdfs oiv -p XML -i fsimage_0000000000000000000 -o ./fsimage.xml hdfs oev -p XML -i edits_0000000000000006828-0000000000000006829 -o ./fsimage.xml
上图就是变成xml文件后的日志文件



