hdfs有很多命令,比如说hdfs dfs -ls,hdfs dfs -put, hdfs dfs -mkdir,hdfs dfs -cat,hdfs dfs -rm等等,一般操作hdfs都是通过hdfs的命令行进行操作。其实这些命令行的底层都是通过java代码来支撑的,再比如启动和关闭hdfs都是通过脚本来启动,这些脚本底层肯定都是通过代码来支撑的。
举例:有时候需要把本地文件上传到hdfs,直接用hdfs的命令,先mkdir一个文件夹,然后用put命令即可上传。但是在工作当中,这些操作基本都是代码的。
下面就来看看这些命令的底层是如何实现的。
首先需要hadoop-client的依赖,关于hadoop开发只需要添加这个依赖即可,因为hadoop-client下面已经包含了相关的依赖了。
org.apache.hadoop hadoop-client3.2.2
添加完之后,在右侧的Dependencies可以看到如下:
从上图可以看到,hdfs、yarn、mapreduce的依赖都是在hadoop-client下面。
如果进行单元测试则需要添加junit的依赖:
##JUnit is a unit testing framework for Java, created by Erich Gamma and Kent Beck.
##对于java来说JUnit是一个单元测试的框架
junit
junit
4.13.1
test
以下是在单元测试中进行开发的:
package com.ruozedata.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Test;
import java.net.URI;
public class HDFSApiApp {
// API操作之从本地拷贝文件到hdfs
//以及参数设置优先级问题
@Test
public void copyFromLocalFile() throws Exception{
Configuration configuration = new Configuration();
//上面的URI可以使用这种方式进行设置
configuration.set("fs.defaultFS","hdfs://hadoop001:9000");
//configuration.set("dfs.client.use.datanode.hostname","true");
//在这里设置副本系数为1,不然的话它会默认走hdfs-default.xml。副本就为3
//hdfs dfs ls或者在web页面上可以看到有几个副本
configuration.set("dfs.replication","1");
//上面的User可以使用这种方式进行设置
System.setProperty("HADOOP_USER_NAME","ruoze");
//获取HDFS文件系统的客户端
FileSystem fileSystem = FileSystem.get(configuration);
Path src = new Path("data/wc.data");
Path dst = new Path("/hdfsapi/test");
fileSystem.copyFromLocalFile(src,dst);
//释放资源
if(null != fileSystem){
fileSystem.close();
}
}
// API操作之创建文件夹
@Test
public void mkdir01() throws Exception{
Configuration configuration = new Configuration();
//获取HDFS文件系统的客户端
FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop001:9000"),
configuration,"ruoze");
Path path = new Path("/hdfsapi/test");
boolean success = fileSystem.mkdirs(path);
//有没有创建成功,在这里断言一下,看看是不是成功的
Assert.assertEquals(true,success);
//释放资源
if(null != fileSystem){
fileSystem.close();
}
}
// API操作之创建文件夹
@Test
public void mkdir02() throws Exception{
Configuration configuration = new Configuration();
//上面的URI可以使用这种方式进行设置
configuration.set("fs.defaultFS","hdfs://hadoop001:9000");
//上面的User可以使用这种方式进行设置
System.setProperty("HADOOP_USER_NAME","ruoze");
//获取HDFS文件系统的客户端
FileSystem fileSystem = FileSystem.get(configuration);
Path path = new Path("/hdfsapi/test2");
boolean success = fileSystem.mkdirs(path);
//有没有创建成功,在这里断言一下,看看是不是成功的
Assert.assertEquals(true,success);
//释放资源
if(null != fileSystem){
fileSystem.close();
}
}
}
以上configuration配置了好几遍,代码不精简。
下面进行重构:
package com.ruozedata.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.net.URI;
public class HDFSApiApp02 {
FileSystem fileSystem;
Configuration configuration;
@Before
public void setUp()throws Exception {
System.out.println("---setUp----");
configuration = new Configuration();
configuration.set("fs.defaultFS","hdfs://hadoop001:9000");
configuration.set("dfs.replication","1");
System.setProperty("HADOOP_USER_NAME","ruoze");
fileSystem = FileSystem.get(this.configuration);
}
@After
public void tearDown()throws Exception {
System.out.println("---tearDown----");
if(null != fileSystem){
fileSystem.close();
}
}
//创建文件夹
@Test
public void mkdir() throws Exception {
fileSystem.mkdirs(new Path("/hdfsapi/test3"));
}
//从本地拷贝到hdfs
@Test
public void copyFromLocalFile() throws Exception {
Path src = new Path("data/wc.data");
Path dst = new Path("/hdfsapi/test3");
fileSystem.copyFromLocalFile(src,dst);
}
//从hdfs拷贝到本地
@Test
public void copyToLocalFile() throws Exception {
Path dst = new Path("data/wc2.txt");
Path src = new Path("/hdfsapi/test3/wc.data");
fileSystem.copyToLocalFile(src,dst);
}
//从hdfs的一个地方移动到hdfs的另一个地方
@Test
public void rename() throws Exception {
Path src = new Path("/hdfsapi/test3/wc.data");
Path dst = new Path("/hdfsapi/test3/wc2.data");
//改名和移动都可以 相当于Linux的move
fileSystem.rename(src,dst);
}
//从hdfs的文件列表操作
@Test
public void listFiles() throws Exception {
RemoteIterator files = fileSystem.listFiles(new Path("/hdfsapi/"), true);
while (files.hasNext()){
LocatedFileStatus fileStatus = files.next();
String isDir = fileStatus.isDirectory() ? "文件夹" : "文件";
String permission = fileStatus.getPermission().toString();
short replication = fileStatus.getReplication();
long len = fileStatus.getLen();
String path = fileStatus.getPath().toString();
System.out.println(isDir + "t" + permission + "t" + replication + "t" + len + "t" + path);
//拿到block信息,是一个数组
//然后用增强的for循环进行遍历
//block有很多信息,比如该块文件所在的hostname、块名称等等
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
for(BlockLocation blockLocation : blockLocations){
//拿到block的host列表,它是一个数组,生产环境上,一个块会在不同的机器上
String[] hosts = blockLocation.getHosts();
for(String host: hosts){
System.out.println(host);
}
}
}
}
//删除HDFS文件的操作
@Test
public void delete() throws Exception{
fileSystem.delete(new Path("/hdfsapi/test3/wc2.data"),false);
}
}
以上是关于mkdir、copyFromLocalFile、copyToLocalFile、rename、delete、listFiles等的操作。
API操作之IO流操作:
拷贝文件到HDFS上可以用上面的copyFromLocalFile,或者其他操作,进行操作也可以用IO流进行操作,不同的场景会用到不同的方式。
package com.ruozedata.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
public class HDFSApiApp03 {
FileSystem fileSystem;
Configuration configuration;
@Before
public void setUp()throws Exception {
System.out.println("---setUp----");
configuration = new Configuration();
configuration.set("fs.defaultFS","hdfs://hadoop001:9000");
configuration.set("dfs.replication","1");
System.setProperty("HADOOP_USER_NAME","ruoze");
fileSystem = FileSystem.get(this.configuration);
}
@After
public void tearDown()throws Exception {
System.out.println("---tearDown----");
if(null != fileSystem){
fileSystem.close();
}
}
@Test
public void copyFromLocalIO() throws Exception{
BufferedInputStream in = new BufferedInputStream(new FileInputStream(new File("data/wc.data")));
FSDataOutputStream out = fileSystem.create(new Path("/hdfsapi/test3/wc-io.data"));
IOUtils.copyBytes(in,out,4096);
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
@Test
public void copyToLocalIO() throws Exception {
FSDataInputStream in = fileSystem.open(new Path("/hdfsapi/test3/wc-io.data"));
FileOutputStream out = new FileOutputStream(new File("data/test.txt"));
IOUtils.copyBytes(in, out, 4096);
IOUtils.closeStream(out);
IOUtils.closeStream(in);
}
}



