架构hdfs分布式文件系统
优势:数据存储量大,可扩展,硬件要求不高,数据容错高(副本机制),适合大数据量批处理
缺点:不适合低延时的数据访问,不可以修改数据(一次写入,多次读取),不适合大量小文件(可以通过压缩合并解决)
NameNode
功能:管理hdfs的名称空间,配置副本策略,管理数据块映射信息,处理客户端读写请求 fsimage:元数据镜像文件
edits:元数据操作日志 可通过zoookeeper实现HA多个namenode容灾切换
secondary NameNode
功能:定时合并edits,Namenode故障时可通过fsimage文件来恢复,并不是NameNode的热备
DataNode
功能:物理存储数据,执行数据块的读写操作 数据文件的物理存储,1.x 64M,2.x 3.x 128M, 根据硬盘传输效率来定
1s*100mb.s
Client
HDFS读写流程功能:与namenode交互获取位置信息,与datanode交互读写数据,命令访问hdfs,上传文件切分
写流程:
(1)客户端通过 Distributed FileSystem 模块向 NameNode 请求上传文件,NameNode 检查目标文件是否已存在,父目录是否存在。 (2)NameNode 返回是否可以上传。 (3)客户端请求第一个 Block 上传到哪几个 DataNode 服务器上。 (4)NameNode 返回 3 个 DataNode 节点,分别为 dn1、dn2、dn3。 (5)客户端通过 FSDataOutputStream 模块请求 dn1 上传数据,dn1 收到请求会继续调用 dn2,然后 dn2 调用 dn3,将这个通信管道建立完成。 (6)dn1、dn2、dn3 逐级应答客户端。 (7)客户端开始往 dn1 上传第一个 Block(先从磁盘读取数据放到一个本地内存缓存), 以 Packet 为单位,dn1 收到一个 Packet 就会传给 dn2,dn2 传给 dn3;dn1 每传一个 packet会放入一个应答队列等待应答。 (8)当一个 Block 传输完成之后,客户端再次请求 NameNode 上传第二个 Block 的服务 器。(重复执行 3-7 步)。
读流程:
(1)客户端通过 DistributedFileSystem 向 NameNode 请求下载文件,NameNode 通过查询元数据,找到文件块所在的 DataNode 地址。 (2)挑选一台 DataNode(就近原则,然后随机)服务器,请求读取数据。 (3)DataNode 开始传输数据给客户端(从磁盘里面读取数据输入流,以 Packet 为单位来做校验)。 (4)客户端以 Packet 为单位接收,先在本地缓存,然后写入目标文件
。
shell操作hdfshdfs dfs 或者hadoop fs # 查看命令帮助 hadoop fs -help rm # 上传 ## 本地文件剪切上传 hadoop fs -moveFromLocal /root/test.txt /test01 ## 本地文件复制上传 hadoop fs -copyFromLocal /root/test.txt /test01 ## 本地文件复制上传(等同于 copyFromLocal) hadoop fs -put /root/test.txt /test01 ## 追加本地文件到hdfs文件 hadoop fs -appendToFile /root/append.txt /test01/test.txt # 下载 ## 下载hdfs文件到本地 hadoop fs -copyToLocal /test01/test.txt ./ ## 下载hdfs文件到本地(等同于copyToLocal ) hadoop fs -get /test01/test.txt ./ # HDFS 直接操作 ## 创建hdfs目录 hadoop fs -mkdir /test01 ## 查看根目录结构 hadoop fs -ls / ## 查看文件内容 hadoop fs -cat /test01/test.txt ## 修改权限 hadoop fs -chmod 666 /test01/test.txt ## 修改文件归属 hadoop fs -chown hadoop:hadoop /test01/test.txt ## 复制文件到指定目录 hadoop fs -cp /test01/test.txt /test02 ## 移动文件到指定目录 hadoop fs -mv /test01/test.txt /test02 ## 显示一个文件的末尾数据 hadoop fs -tail /test01/test.txt ## 删除文件或文件夹 (-r 递归删除) hadoop fs -rm -r /test02 ## 统计文件夹的大小信息 hadoop fs -du -s -h /test01 ## 设置 HDFS 中文件的副本数量(具体看DataNode节点数) hadoop fs -setrep 3 /test01/test.txt 其他命令自行百度或参考http://hadoop.apache.org/docs/r1.0.4/cn/hdfs_shell.htmljava操作hdfs
HdfsUtils.java
package com.wxl.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import java.net.URI;
public class HdfsUtils {
private static final Configuration conf;
static {
conf = new Configuration();
// 默认文件系统的名称
conf.set("fs.defaultFS", "hdfs://mycluster");
// namenode 集群的名字
conf.set("dfs.nameservices", "mycluster");
// mycluster 下有两个 NameNode,逻辑地址分别是 nn1,nn2
conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
// nn1 的 http 通信地址
conf.set("dfs.namenode.rpc-address.mycluster.nn1", "192.168.100.100:9000");
// nn2 的 http 通信地址
conf.set("dfs.namenode.rpc-address.mycluster.nn2", "192.168.100.101:9000");
// 配置读取失败自动切换的实现方式
conf.set("dfs.client.failover.proxy.provider.mycluster",
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
conf.set("dfs.replication", "2");
System.setProperty("HADOOP_USER_NAME", "root");
}
public static LocalFileSystem getLocalFileSystem() throws Exception {
LocalFileSystem fileSystem = FileSystem.getLocal(conf);
return fileSystem;
}
public static FileSystem getFileSystem() throws Exception {
FileSystem fileSystem = FileSystem.get(conf);
return fileSystem;
}
public static FileSystem getFileSystemUser(String user) throws Exception {
FileSystem fileSystem = FileSystem.get(new URI("hdfs://mycluster"), conf, user);
return fileSystem;
}
public static void main(String[] args) throws Exception {
System.out.println(getFileSystemUser("root"));
}
}
HDFSApi.java
package com.wxl.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.*;
import java.net.URI;
import java.util.Arrays;
public class HDFSApi {
Configuration configuration = null;
FileSystem fs = null;
@Before
public void init() {
try {
// 参数优先级排序:(1)客户端代码中设置的值 >(2)ClassPath 下的用户自定义配置文件 >(3)然后是服务器的自定义配置(xxx-site.xml)>(4)服务器的默认配置(xxx-default.xml)
configuration = new Configuration();
// 默认文件系统的名称
configuration.set("fs.defaultFS", "hdfs://mycluster");
// namenode 集群的名字
configuration.set("dfs.nameservices", "mycluster");
// mycluster 下有两个 NameNode,逻辑地址分别是 nn1,nn2
configuration.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
// nn1 的 http 通信地址
configuration.set("dfs.namenode.rpc-address.mycluster.nn1", "192.168.100.100:9000");
// nn2 的 http 通信地址
configuration.set("dfs.namenode.rpc-address.mycluster.nn2", "192.168.100.101:9000");
// 配置读取失败自动切换的实现方式
configuration.set("dfs.client.failover.proxy.provider.mycluster",
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
configuration.set("dfs.replication", "2");
System.setProperty("HADOOP_USER_NAME", "root");
// FileSystem.get(new URI("hdfs://hadoop102:9000"), conf , "root");
fs = FileSystem.get(configuration);
} catch (IOException e) {
e.printStackTrace();
}
}
@After
public void end() {
try {
fs.close();
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("------end------");
}
// 创建目录
@Test
public void mkdir() throws IOException {
Path path = new Path("/test01");
if (!fs.exists(path)) {
if (fs.mkdirs(path)) {
System.out.println("创建成功!");
} else {
System.out.println("创建失败!");
}
} else {
System.out.println("文件已存在,创建失败!");
}
}
// 文件上传
@Test
public void copyFromLocal() throws IOException {
Path localPath = new Path("D:\ideawork\bigdata\apache-hadoop\src\main\resources\text.txt");
Path hdfsPath = new Path("/");
// fs.copyFromLocalFile(localPath, hdfsPath);//默认同名覆盖
fs.copyFromLocalFile(false, true, localPath, hdfsPath);
// boolean delSrc 是否将原文件删除
// boolean overwrite 是否覆盖
// Path src 指要下载的文件路径
// Path dst 指hdfs的路径
// copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
System.out.println("上传成功");
}
// 文件下载
@Test
public void copyToLocalFile() throws IOException {
Path localPath = new Path("D:\ideawork\bigdata\apache-hadoop\src\main\resources\");
Path hdfsPath = new Path("/test01/text.txt");
fs.copyToLocalFile(hdfsPath, localPath);
// boolean delSrc 指是否将原文件删除
// Path src 指要下载的文件路径
// Path dst 指将文件下载到的路径
// boolean useRawLocalFileSystem 是否开启文件校验
// copyToLocalFile(boolean delSrc, Path src, Path dst, boolean useRawLocalFileSystem)
System.out.println("下载完成");
}
// 移动&修改文件名
@Test
public void rename() throws IOException {
Path oldHdfsPath = new Path("/test01/text.txt");
Path newHdfsPath = new Path("/test01/text01.txt");
fs.rename(oldHdfsPath, newHdfsPath);
System.out.println("移动完成");
}
// 删除文件和目录
@Test
public void delete() throws IOException {
Path hdfsPath = new Path("/ok");
fs.delete(hdfsPath, true);//强制删除
System.out.println("删除完成");
}
// 查看目录/文件文件名称、权限、长度、块信息
@Test
public void file() throws IOException {
// 2 获取文件详情
RemoteIterator listFiles = fs.listFiles(new Path("/"),
true);
while (listFiles.hasNext()) {
LocatedFileStatus fileStatus = listFiles.next();
System.out.println("========" + fileStatus.getPath() + "=========");
System.out.println(fileStatus.getPermission());
System.out.println(fileStatus.getOwner());
System.out.println(fileStatus.getGroup());
System.out.println(fileStatus.getLen());
System.out.println(fileStatus.getModificationTime());
System.out.println(fileStatus.getReplication());
System.out.println(fileStatus.getBlockSize());
System.out.println(fileStatus.getPath().getName());
// 获取块信息
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
System.out.println(Arrays.toString(blockLocations));
}
}
@Test
public void listFiles() throws IOException {
//指定目录下的文件和目录
FileStatus[] listStatus = fs.listStatus(new Path("/"));
for (FileStatus fileStatus : listStatus) {
if (fileStatus.isDirectory()) {
System.out.println("目录:" + fileStatus.getPath().getName());
} else {
System.out.println("文件:" + fileStatus.getPath().getName());
}
}
}
@Test
public void listAllFiles() throws IOException {
//指定目录下的所有文件
RemoteIterator listFiles = fs.listFiles(new Path("/"), true);
while (listFiles.hasNext()) {
LocatedFileStatus locatedFileStatus = listFiles.next();
System.out.println(locatedFileStatus.getPath().getName());
}
}
}
HDFSIO.java
package com.wxl.hadoop.hdfs;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
import java.io.*;
public class HDFSIO {
// 把本地文件上传到HDFS根目录
@Test
public void putFileToHDFS() throws Exception {
// 1 获取对象
FileSystem fileSystem = HdfsUtils.getFileSystem();
// 2 获取输入流
FileInputStream fis = new FileInputStream(new File("D:\ideawork\bigdata\apache-hadoop\src\main\resources\test.txt"));
// 3 获取输出流
FSDataOutputStream fos = fileSystem.create(new Path("/hdfs/test.txt"));
// 4 流的对拷
IOUtils.copyBytes(fis, fos, fileSystem.getConf());
// 5 关闭资源
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
fileSystem.close();
}
// 从HDFS上下载文件到本地上
@Test
public void getFileFromHDFS() throws Exception {
// 1 获取对象
FileSystem fileSystem = HdfsUtils.getFileSystem();
// 2 获取输入流
FSDataInputStream fis = fileSystem.open(new Path("/hdfs/test.txt"));
// 3 获取输出流
FileOutputStream fos = new FileOutputStream(new File("D:\ideawork\bigdata\apache-hadoop\src\main\resources\test下载.txt"));
// 4 流的对拷
IOUtils.copyBytes(fis, fos, fileSystem.getConf());
// 5 关闭资源
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
fileSystem.close();
}
// 下载第一块
@Test
public void readFileSeek1() throws Exception {
// 1 获取对象
FileSystem fileSystem = HdfsUtils.getFileSystem();
// 2 获取输入流
FSDataInputStream fis = fileSystem.open(new Path("/hadoop-3.2.2.tar.gz"));
// 3 获取输出流
FileOutputStream fos = new FileOutputStream(new File("D:\ideawork\bigdata\apache-hadoop\src\main\resources\hadoop-3.2.2.tar.gz.part1"));
// 4 流的对拷(只拷贝128m)
byte[] buf = new byte[1024];
for (int i = 0; i < 1024 * 128; i++) {
fis.read(buf);
fos.write(buf);
}
// 5 关闭资源
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
fileSystem.close();
}
// 下载第二块
@Test
public void readFileSeek2() throws Exception {
// 1 获取对象
FileSystem fileSystem = HdfsUtils.getFileSystem();
// 2 获取输入流
FSDataInputStream fis = fileSystem.open(new Path("/hadoop-3.2.2.tar.gz"));
// 3 获取输出流
FileOutputStream fos = new FileOutputStream(new File("D:\ideawork\bigdata\apache-hadoop\src\main\resources\hadoop-3.2.2.tar.gz.part2"));
// 4 流的对拷(只拷贝128m)
byte[] buf = new byte[1024];
for (int i = 0; i < 1024 * 128; i++) {
fis.read(buf);
fos.write(buf);
}
// 5 关闭资源
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
fileSystem.close();
}
@Test
public void cat() throws Exception {
FileSystem fileSystem = HdfsUtils.getFileSystem();
String fileUri = "/hdfs/merge.log";
if (!fileSystem.exists(new Path(fileUri))) {
throw new RuntimeException("文件不存在");
}
FSDataInputStream inStream = fileSystem.open(new Path(fileUri));
try {
IOUtils.copyBytes(inStream, System.out, 4096, false);
} catch (Exception e) {
} finally {
IOUtils.closeStream(inStream);
}
}
}
PutMerge.java
package com.wxl.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
public class PutMerge {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
// 默认文件系统的名称
configuration.set("fs.defaultFS", "hdfs://mycluster");
// namenode 集群的名字
configuration.set("dfs.nameservices", "mycluster");
// mycluster 下有两个 NameNode,逻辑地址分别是 nn1,nn2
configuration.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
// nn1 的 http 通信地址
configuration.set("dfs.namenode.rpc-address.mycluster.nn1", "192.168.100.100:9000");
// nn2 的 http 通信地址
configuration.set("dfs.namenode.rpc-address.mycluster.nn2", "192.168.100.101:9000");
// 配置读取失败自动切换的实现方式
configuration.set("dfs.client.failover.proxy.provider.mycluster",
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
configuration.set("dfs.replication", "2");
System.setProperty("HADOOP_USER_NAME", "root");
FileSystem hdfs = FileSystem.get(configuration); // get HDFS FileSystem
FileSystem local = FileSystem.getLocal(configuration); // get Local FileSystem
Path inputDir = new Path("D:\ideawork\bigdata\apache-hadoop\src\main\resources\hdfs\merge");
Path hdfsFile = new Path("/hdfs/merge.log");
FileStatus[] inputFiles = local.listStatus(inputDir); // 得到本地文件列表
FSDataOutputStream out = hdfs.create(hdfsFile); // 生成HDFS输出流
try {
for (int i = 0; i < inputFiles.length; i++) {
System.out.println("-------------" + inputFiles[i].getPath().getName() + "---------------");
FSDataInputStream in = local.open(inputFiles[i].getPath()); // 打开本地输入流
byte buffer[] = new byte[1024];
int bytesRead = 0;
while ((bytesRead = in.read(buffer)) > 0) {
out.write(buffer, 0, bytesRead);
}
in.close();
}
out.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}



