栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

datax开发hdfswriter写入目录不存在时自动创建

datax开发hdfswriter写入目录不存在时自动创建

1.背景

在使用datax导数据到hive分区表中,配置写入目录为hive表分区为dt=datax,如果不提前创建该分区,会报目录不存在错误,于是希望二次开发,在写入时检测目录不存在自动创建此分区目录

2.开发

在HdfsHelper.java 类中添加在hdfs上创建目录的方法,并在HdfsWriter.java类中,当目录不存在时调用此方法即可

    
    public boolean createPath(String filePath) {
        Path path = new Path(filePath);
        boolean exist = false;
        try {
            if (fileSystem.exists(path)) {
                String message = String.format("文件路径[%s]已存在,无需创建!",
                        "message:filePath =" + filePath);
                LOG.info(message);
                exist = true;
            } else {
                exist = fileSystem.mkdirs(path);
            }
        } catch (IOException e) {
            String message = String.format("创建文件路径[%s]时发生网络IO异常,请检查您的网络是否正常!",
                    "message:filePath =" + filePath);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
        }
        return exist;
    }
        @Override
        public void prepare() {
            //若路径已经存在,检查path是否是目录
            if(hdfsHelper.isPathexists(path)){
                if(!hdfsHelper.isPathDir(path)){
                    throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
                            String.format("您配置的path: [%s] 不是一个合法的目录, 请您注意文件重名, 不合法目录名等情况.",
                                    path));
                }
                //根据writeMode对目录下文件进行处理
                Path[] existFilePaths = hdfsHelper.hdfsDirList(path,fileName);
                boolean isExistFile = false;
                if(existFilePaths.length > 0){
                    isExistFile = true;
                }
                
                if ("append".equalsIgnoreCase(writeMode)) {
                    LOG.info(String.format("由于您配置了writeMode append, 写入前不做清理工作, [%s] 目录下写入相应文件名前缀  [%s] 的文件",
                            path, fileName));
                } else if ("nonconflict".equalsIgnoreCase(writeMode) && isExistFile) {
                    LOG.info(String.format("由于您配置了writeMode nonConflict, 开始检查 [%s] 下面的内容", path));
                    List allFiles = new ArrayList();
                    for (Path eachFile : existFilePaths) {
                        allFiles.add(eachFile.toString());
                    }
                    LOG.error(String.format("冲突文件列表为: [%s]", StringUtils.join(allFiles, ",")));
                    throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
                            String.format("由于您配置了writeMode nonConflict,但您配置的path: [%s] 目录不为空, 下面存在其他文件或文件夹.", path));
                }else if ("truncate".equalsIgnoreCase(writeMode) && isExistFile) {
                    LOG.info(String.format("由于您配置了writeMode truncate,  [%s] 下面的内容将被覆盖重写", path));
                    hdfsHelper.deleteFiles(existFilePaths);
                }
            }else{
                LOG.info(String.format("您配置的路径: [%s] 不存在,自动为您创建此路径",path));
                hdfsHelper.createPath(path);
            }
        }
3.打包

打包并替换掉集群的datax安装目录datax/plugin/writer/hdfswriter/hdfswriter-0.0.1-SNAPSHOT.jar即可

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/671575.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号