Because of business requirements the Hive tables have to be stored as Parquet files, but DataX cannot write Parquet when loading data into HDFS, so we add Parquet support to hdfswriter by hand.
Official documentation: https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md
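The writer parameters stay exactly as documented above; only fileType gains a new value. As a sketch of the end goal, a writer block using the new format might look like the following (defaultFS, path and the column list are placeholders for illustration, not values taken from this article):

"writer": {
    "name": "hdfswriter",
    "parameter": {
        "defaultFS": "hdfs://nameservice1",
        "fileType": "parquet",
        "path": "/user/hive/warehouse/demo.db/demo_table",
        "fileName": "demo_table",
        "column": [
            {"name": "id",   "type": "bigint"},
            {"name": "name", "type": "string"}
        ],
        "writeMode": "append",
        "fieldDelimiter": "\t",
        "compress": "SNAPPY"
    }
}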
2. Modify the code in the hdfswriter module
1. HdfsHelper.java: add a method that writes Parquet files
public void parquetFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,
TaskPluginCollector taskPluginCollector) {
List<Configuration> columns = config.getListConfiguration(Key.COLUMN);
String compress = config.getString(Key.COMPRESS, null);
List<String> columnNames = getColumnNames(columns);
List<ObjectInspector> columnTypeInspectors = getColumnTypeInspectors(columns);
StructObjectInspector inspector = (StructObjectInspector) ObjectInspectorFactory
.getStandardStructObjectInspector(columnNames, columnTypeInspectors);
ParquetHiveSerDe parquetHiveSerDe = new ParquetHiveSerDe();
MapredParquetOutputFormat outFormat = new MapredParquetOutputFormat();
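// map the configured compress value (anything other than NONE) to the matching Hadoop codec class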
if (!"NONE".equalsIgnoreCase(compress) && null != compress) {
Class<? extends CompressionCodec> codecClass = getCompressCodec(compress);
if (null != codecClass) {
outFormat.setOutputCompressorClass(conf, codecClass);
}
}
try {
Properties colProperties = new Properties();
colProperties.setProperty("columns", String.join(",", columnNames));
List<String> colType = Lists.newArrayList();
columns.forEach(c -> colType.add(c.getString(Key.TYPE)));
colProperties.setProperty("columns.types", String.join(",", colType));
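// "columns" and "columns.types" describe the target schema; the Parquet output format derives the file schema from them when it creates the record writer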
RecordWriter writer = (RecordWriter) outFormat.getHiveRecordWriter(conf, new Path(fileName), ObjectWritable.class, true, colProperties, Reporter.NULL);
Record record = null;
while ((record = lineReceiver.getFromReader()) != null) {
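// transportOneRecord converts the DataX record into a list of column values; the Boolean flag marks dirty records, which are skipped here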
MutablePair<List<Object>, Boolean> transportResult = transportOneRecord(record, columns, taskPluginCollector);
if (!transportResult.getRight()) {
writer.write(null, parquetHiveSerDe.serialize(transportResult.getLeft(), inspector));
}
}
writer.close(Reporter.NULL);
} catch (Exception e) {
String message = String.format("写文件[%s]时发生IO异常,请检查您的网络是否正常!", fileName);
LOG.error(message);
Path path = new Path(fileName);
deleteDir(path.getParent());
throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e);
}
}
2. HdfsWriter.java: adjust the existing code so that when "fileType": "parquet" is configured, the Parquet write path is taken
private void validateParameter() {
this.defaultFS = this.writerSliceConfig.getNecessaryValue(Key.DEFAULT_FS, HdfsWriterErrorCode.REQUIRED_VALUE);
//fileType check
this.fileType = this.writerSliceConfig.getNecessaryValue(Key.FILE_TYPE, HdfsWriterErrorCode.REQUIRED_VALUE);
if( !fileType.equalsIgnoreCase("ORC") && !fileType.equalsIgnoreCase("TEXT") && !fileType.equalsIgnoreCase("PARQUET")){
String message = "HdfsWriter插件目前只支持ORC和TEXT和PARQUET三种格式的文件,请将filetype选项的值配置为ORC或者TEXT或者PARQUET";
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, message);
}
//path
this.path = this.writerSliceConfig.getNecessaryValue(Key.PATH, HdfsWriterErrorCode.REQUIRED_VALUE);
if(!path.startsWith("/")){
String message = String.format("请检查参数path:[%s],需要配置为绝对路径", path);
LOG.error(message);
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, message);
}else if(path.contains("*") || path.contains("?")){
String message = String.format("请检查参数path:[%s],不能包含*,?等特殊字符", path);
LOG.error(message);
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, message);
}
//fileName
this.fileName = this.writerSliceConfig.getNecessaryValue(Key.FILE_NAME, HdfsWriterErrorCode.REQUIRED_VALUE);
//columns check
this.columns = this.writerSliceConfig.getListConfiguration(Key.COLUMN);
if (null == columns || columns.size() == 0) {
throw DataXException.asDataXException(HdfsWriterErrorCode.REQUIRED_VALUE, "您需要指定 columns");
}else{
for (Configuration eachColumnConf : columns) {
eachColumnConf.getNecessaryValue(Key.NAME, HdfsWriterErrorCode.COLUMN_REQUIRED_VALUE);
eachColumnConf.getNecessaryValue(Key.TYPE, HdfsWriterErrorCode.COLUMN_REQUIRED_VALUE);
}
}
//writeMode check
this.writeMode = this.writerSliceConfig.getNecessaryValue(Key.WRITE_MODE, HdfsWriterErrorCode.REQUIRED_VALUE);
writeMode = writeMode.toLowerCase().trim();
Set<String> supportedWriteModes = Sets.newHashSet("append", "nonconflict", "truncate");
if (!supportedWriteModes.contains(writeMode)) {
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
String.format("仅支持append, nonConflict, truncate三种模式, 不支持您配置的 writeMode 模式 : [%s]",
writeMode));
}
this.writerSliceConfig.set(Key.WRITE_MODE, writeMode);
//fieldDelimiter check
this.fieldDelimiter = this.writerSliceConfig.getString(Key.FIELD_DELIMITER,null);
if(null == fieldDelimiter){
throw DataXException.asDataXException(HdfsWriterErrorCode.REQUIRED_VALUE,
String.format("您提供配置文件有误,[%s]是必填参数.", Key.FIELD_DELIMITER));
}else if(1 != fieldDelimiter.length()){
// warn: if have, length must be one
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
String.format("仅仅支持单字符切分, 您配置的切分为 : [%s]", fieldDelimiter));
}
//compress check
this.compress = this.writerSliceConfig.getString(Key.COMPRESS,null);
if(fileType.equalsIgnoreCase("TEXT")){
Set<String> textSupportedCompress = Sets.newHashSet("GZIP", "BZIP2");
// the user may have configured compress:"" (an empty string); treat it as null
if(StringUtils.isBlank(compress) ){
this.writerSliceConfig.set(Key.COMPRESS, null);
}else {
compress = compress.toUpperCase().trim();
if(!textSupportedCompress.contains(compress) ){
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
String.format("目前TEXT FILE仅支持GZIP、BZIP2 两种压缩, 不支持您配置的 compress 模式 : [%s]",
compress));
}
}
}else if(fileType.equalsIgnoreCase("ORC")){
Set<String> orcSupportedCompress = Sets.newHashSet("NONE", "SNAPPY");
if(null == compress){
this.writerSliceConfig.set(Key.COMPRESS, "NONE");
}else {
compress = compress.toUpperCase().trim();
if(!orcSupportedCompress.contains(compress)){
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
String.format("目前ORC FILE仅支持SNAPPY压缩, 不支持您配置的 compress 模式 : [%s]",
compress));
}
}
}else if(fileType.equalsIgnoreCase("PARQUET")){
Set<String> parquetSupportedCompress = Sets.newHashSet("NONE", "SNAPPY");
if(null == compress){
this.writerSliceConfig.set(Key.COMPRESS, "NONE");
}else {
compress = compress.toUpperCase().trim();
if(!parquetSupportedCompress.contains(compress)){
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
String.format("目前PARQUET FILE仅支持SNAPPY压缩, 不支持您配置的 compress 模式 : [%s]",
compress));
}
}
}
} // end of validateParameter()
@Override
public void startWrite(RecordReceiver lineReceiver) {
LOG.info("begin do write...");
LOG.info(String.format("write to file : [%s]", this.fileName));
if(fileType.equalsIgnoreCase("TEXT")){
// write TEXT FILE
hdfsHelper.textFileStartWrite(lineReceiver,this.writerSliceConfig, this.fileName,
this.getTaskPluginCollector());
}else if(fileType.equalsIgnoreCase("ORC")){
// write ORC FILE
hdfsHelper.orcFileStartWrite(lineReceiver,this.writerSliceConfig, this.fileName,
this.getTaskPluginCollector());
}else if(fileType.equalsIgnoreCase("PARQUET")){
// write PARQUET FILE
hdfsHelper.parquetFileStartWrite(lineReceiver,this.writerSliceConfig, this.fileName,
this.getTaskPluginCollector());
}
LOG.info("end do write");
}
3. Package
Build the module and replace hdfswriter-0.0.1-SNAPSHOT.jar under the cluster's DataX installation directory, datax/plugin/writer/hdfswriter/, with the newly built jar.
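One possible build-and-deploy sequence, assuming a standard Maven checkout of the DataX source tree and using $DATAX_HOME as a stand-in for the installation directory (module name and jar path may differ in your tree):

cd DataX
mvn clean package -DskipTests -pl hdfswriter -am
cp hdfswriter/target/hdfswriter-0.0.1-SNAPSHOT.jar $DATAX_HOME/plugin/writer/hdfswriter/hdfswriter-0.0.1-SNAPSHOT.jar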



