栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

JAVA HDFS API编程

JAVA HDFS API编程

hdfs有很多命令,比如说hdfs dfs -ls,hdfs dfs -put, hdfs dfs -mkdir,hdfs dfs -cat,hdfs dfs -rm等等,一般操作hdfs都是通过hdfs的命令行进行操作。其实这些命令行的底层都是通过java代码来支撑的,再比如启动和关闭hdfs都是通过脚本来启动,这些脚本底层肯定都是通过代码来支撑的。
举例:有时候需要把本地文件上传到hdfs,直接用hdfs的命令,先mkdir一个文件夹,然后用put命令即可上传。但是在工作当中,这些操作基本都是代码的。
下面就来看看这些命令的底层是如何实现的。
首先需要hadoop-client的依赖,关于hadoop开发只需要添加这个依赖即可,因为hadoop-client下面已经包含了相关的依赖了。

        
            org.apache.hadoop
            hadoop-client
            3.2.2
        

添加完之后,在右侧的Dependencies可以看到如下:

从上图可以看到,hdfs、yarn、mapreduce的依赖都是在hadoop-client下面。

如果进行单元测试则需要添加junit的依赖:

##JUnit is a unit testing framework for Java, created by Erich Gamma and Kent Beck.
##对于java来说JUnit是一个单元测试的框架
        
            junit
            junit
            4.13.1
            test
        


以下是在单元测试中进行开发的:

package com.ruozedata.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Test;

import java.net.URI;


public class HDFSApiApp {

    


    // API操作之从本地拷贝文件到hdfs
    //以及参数设置优先级问题
    
    @Test
    public void copyFromLocalFile() throws Exception{
        Configuration configuration = new Configuration();
        //上面的URI可以使用这种方式进行设置
        configuration.set("fs.defaultFS","hdfs://hadoop001:9000");
        //configuration.set("dfs.client.use.datanode.hostname","true");
        //在这里设置副本系数为1,不然的话它会默认走hdfs-default.xml。副本就为3
        //hdfs dfs ls或者在web页面上可以看到有几个副本
        configuration.set("dfs.replication","1");
        //上面的User可以使用这种方式进行设置
        System.setProperty("HADOOP_USER_NAME","ruoze");

        //获取HDFS文件系统的客户端
        FileSystem fileSystem = FileSystem.get(configuration);

        Path src = new Path("data/wc.data");
        Path dst = new Path("/hdfsapi/test");

        fileSystem.copyFromLocalFile(src,dst);

        //释放资源
        if(null != fileSystem){
            fileSystem.close();
        }
    }





    // API操作之创建文件夹
    @Test
    public void mkdir01() throws Exception{
        Configuration configuration = new Configuration();

        //获取HDFS文件系统的客户端
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop001:9000"),
                configuration,"ruoze");

        Path path = new Path("/hdfsapi/test");
        boolean success = fileSystem.mkdirs(path);
        //有没有创建成功,在这里断言一下,看看是不是成功的
        Assert.assertEquals(true,success);

        //释放资源
        if(null != fileSystem){
            fileSystem.close();
        }
    }


    // API操作之创建文件夹
    
    @Test
    public void mkdir02() throws Exception{
        Configuration configuration = new Configuration();
        //上面的URI可以使用这种方式进行设置
        configuration.set("fs.defaultFS","hdfs://hadoop001:9000");
        //上面的User可以使用这种方式进行设置
        System.setProperty("HADOOP_USER_NAME","ruoze");

        //获取HDFS文件系统的客户端
        FileSystem fileSystem = FileSystem.get(configuration);

        Path path = new Path("/hdfsapi/test2");
        boolean success = fileSystem.mkdirs(path);
        //有没有创建成功,在这里断言一下,看看是不是成功的
        Assert.assertEquals(true,success);

        //释放资源
        if(null != fileSystem){
            fileSystem.close();
        }
    }
}

以上configuration配置了好几遍,代码不精简。
下面进行重构:

package com.ruozedata.hadoop;


        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.*;
        import org.junit.After;
        import org.junit.Assert;
        import org.junit.Before;
        import org.junit.Test;

        import java.net.URI;


public class HDFSApiApp02 {

    FileSystem fileSystem;
    Configuration configuration;

    @Before
    public void setUp()throws Exception {
        System.out.println("---setUp----");
        configuration = new Configuration();
        configuration.set("fs.defaultFS","hdfs://hadoop001:9000");
        configuration.set("dfs.replication","1");
        System.setProperty("HADOOP_USER_NAME","ruoze");
        fileSystem = FileSystem.get(this.configuration);
    }

    @After
    public void tearDown()throws Exception {
        System.out.println("---tearDown----");
        if(null != fileSystem){
            fileSystem.close();
        }
    }


    //创建文件夹
    @Test
    public void mkdir() throws Exception {
        fileSystem.mkdirs(new Path("/hdfsapi/test3"));
    }

    //从本地拷贝到hdfs
    @Test
    public void copyFromLocalFile() throws Exception {
        Path src = new Path("data/wc.data");
        Path dst = new Path("/hdfsapi/test3");
        fileSystem.copyFromLocalFile(src,dst);
    }

    //从hdfs拷贝到本地
    @Test
    public void copyToLocalFile() throws Exception {
        Path dst = new Path("data/wc2.txt");
        Path src = new Path("/hdfsapi/test3/wc.data");
        fileSystem.copyToLocalFile(src,dst);
    }

    //从hdfs的一个地方移动到hdfs的另一个地方
    @Test
    public void rename() throws Exception {
        Path src = new Path("/hdfsapi/test3/wc.data");
        Path dst = new Path("/hdfsapi/test3/wc2.data");
        //改名和移动都可以 相当于Linux的move
        fileSystem.rename(src,dst);
    }

    //从hdfs的文件列表操作
    
    @Test
    public void listFiles() throws Exception {
        RemoteIterator files = fileSystem.listFiles(new Path("/hdfsapi/"), true);

        while (files.hasNext()){
            LocatedFileStatus fileStatus = files.next();
            String isDir = fileStatus.isDirectory() ? "文件夹" : "文件";
            String permission = fileStatus.getPermission().toString();
            short replication = fileStatus.getReplication();
            long len = fileStatus.getLen();
            String path = fileStatus.getPath().toString();

            System.out.println(isDir + "t" + permission + "t" + replication + "t" + len + "t" + path);

            //拿到block信息,是一个数组
            //然后用增强的for循环进行遍历
            //block有很多信息,比如该块文件所在的hostname、块名称等等
            BlockLocation[] blockLocations = fileStatus.getBlockLocations();
            for(BlockLocation blockLocation : blockLocations){
                //拿到block的host列表,它是一个数组,生产环境上,一个块会在不同的机器上
                String[] hosts = blockLocation.getHosts();
                for(String host: hosts){
                    System.out.println(host);
                }

            }

        }
    }

    //删除HDFS文件的操作
    @Test
    public void delete() throws Exception{
        fileSystem.delete(new Path("/hdfsapi/test3/wc2.data"),false);
    }
}

以上是关于mkdir、copyFromLocalFile、copyToLocalFile、rename、delete、listFiles等的操作。

API操作之IO流操作:
拷贝文件到HDFS上可以用上面的copyFromLocalFile,或者其他操作,进行操作也可以用IO流进行操作,不同的场景会用到不同的方式。

package com.ruozedata.hadoop;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;


public class HDFSApiApp03 {

    FileSystem fileSystem;
    Configuration configuration;

    @Before
    public void setUp()throws Exception {
        System.out.println("---setUp----");
        configuration = new Configuration();
        configuration.set("fs.defaultFS","hdfs://hadoop001:9000");
        configuration.set("dfs.replication","1");
        System.setProperty("HADOOP_USER_NAME","ruoze");
        fileSystem = FileSystem.get(this.configuration);
    }

    @After
    public void tearDown()throws Exception {
        System.out.println("---tearDown----");
        if(null != fileSystem){
            fileSystem.close();
        }
    }


    
    @Test
    public void copyFromLocalIO() throws Exception{

        BufferedInputStream in = new BufferedInputStream(new FileInputStream(new File("data/wc.data")));
        FSDataOutputStream out = fileSystem.create(new Path("/hdfsapi/test3/wc-io.data"));

        IOUtils.copyBytes(in,out,4096);
        IOUtils.closeStream(in);
        IOUtils.closeStream(out);
    }

    
    @Test
    public void copyToLocalIO() throws Exception {
        FSDataInputStream in = fileSystem.open(new Path("/hdfsapi/test3/wc-io.data"));
        FileOutputStream out = new FileOutputStream(new File("data/test.txt"));

        IOUtils.copyBytes(in, out, 4096);

        IOUtils.closeStream(out);
        IOUtils.closeStream(in);
    }

}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/676839.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号