In a real project, after benchmarking queries against ORC and Parquet data with Hive, SparkSQL, and Impala (Impala only supports ORC from version 3.1 onward), we found that Impala delivered the highest aggregation-query performance on Parquet data, which created a requirement to store subsequent data in Parquet format.
1. Dynamically generating Parquet files in Java
This example generates Parquet files locally (they can also be written directly to HDFS) from dynamically passed parameters. The approach works as follows:
1. Assemble the Parquet schema from the incoming parameters;
// Builds a Parquet schema string such as "message schema {optional binary id; ...}"
// from an ordered column-name -> column-type map.
private static String getFinalSchemaStr(Map<String, String> columnTypeMap) {
    StringBuilder stringBuilder = new StringBuilder();
    stringBuilder.append("message schema {");
    for (Map.Entry<String, String> entry : columnTypeMap.entrySet()) {
        stringBuilder.append("optional ");
        String type = getParquetType(entry.getValue());
        stringBuilder.append(type).append(" ").append(entry.getKey()).append(";");
    }
    stringBuilder.append("}");
    return stringBuilder.toString();
}
// Maps a logical column type to its Parquet primitive type.
private static String getParquetType(String value) {
    switch (value) {
        case "string":
            return "binary";
        case "int":
            return "int32";
        case "double":
            return "double";
        case "date":
            // Dates are written as epoch milliseconds via Group.append(String, long),
            // which produces INT64; appending a long to an INT96 field would fail at runtime.
            return "int64";
        default:
            return "binary";
    }
}
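As a quick sanity check, a hypothetical column map (the column names below are illustrative) yields the following schema string, which MessageTypeParser can parse directly:
// Hypothetical usage of getFinalSchemaStr defined above.
LinkedHashMap<String, String> columnTypeMap = new LinkedHashMap<>();
columnTypeMap.put("id", "string");
columnTypeMap.put("name", "string");
columnTypeMap.put("score", "double");
String schemaStr = getFinalSchemaStr(columnTypeMap);
// schemaStr: "message schema {optional binary id;optional binary name;optional double score;}"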
2. Write the data out as a Parquet file on the local filesystem (see the inline comments for the parameters). Exceptions are simply thrown here rather than handled, and the supported value types can be extended;
// Imports used by these snippets:
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

// targetPath:    output file path (local or HDFS)
// resultDatas:   rows of data, each row an array of string values
// columns:       column names, in the same order as the values in each row
// columnTypeMap: ordered map of column name -> logical type ("string"/"int"/"double"/"date")
public static void parquetWriter(String targetPath, List<String[]> resultDatas,
                                 List<String> columns,
                                 LinkedHashMap<String, String> columnTypeMap) throws Exception {
    String schemaStr = getFinalSchemaStr(columnTypeMap);
    MessageType schema = MessageTypeParser.parseMessageType(schemaStr);
    // Path file = new Path("C:\\Temp\\test3.parq");
    Path file = new Path(targetPath);
    ExampleParquetWriter.Builder builder = ExampleParquetWriter
            .builder(file).withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
            .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
            .withCompressionCodec(CompressionCodecName.SNAPPY)
            // .withConf(configuration) // pass a Hadoop Configuration here to write to HDFS
            .withType(schema);
    ParquetWriter<Group> writer = builder.build();
    SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
    for (String[] resultData : resultDatas) {
        Group group = groupFactory.newGroup();
        for (int i = 0; i < resultData.length; i++) {
            String columnType = columnTypeMap.get(columns.get(i));
            String value = resultData[i];
            if (columnType.equals("string")) {
                group.append(columns.get(i), value);
            }
            if (columnType.equals("int")) {
                group.append(columns.get(i), Integer.parseInt(value));
            }
            if (columnType.equals("double")) {
                group.append(columns.get(i), Double.parseDouble(value));
            }
            if (columnType.equals("date")) {
                // Epoch milliseconds written as INT64 (see getParquetType above).
                DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                group.append(columns.get(i), sdf.parse(value).getTime());
            }
        }
        writer.write(group);
    }
    writer.close();
}
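A minimal, hypothetical invocation of parquetWriter might look like the following (the output path and sample rows are made up for illustration; writing to HDFS would additionally need a Hadoop Configuration passed via withConf, as noted in the comment above):
// Also requires java.util.ArrayList in addition to the imports above.
LinkedHashMap<String, String> columnTypeMap = new LinkedHashMap<>();
columnTypeMap.put("id", "string");
columnTypeMap.put("name", "string");
columnTypeMap.put("score", "double");
List<String> columns = new ArrayList<>(columnTypeMap.keySet());
List<String[]> resultDatas = new ArrayList<>();
resultDatas.add(new String[]{"1", "alice", "90.5"});
resultDatas.add(new String[]{"2", "bob", "88.0"});
parquetWriter("/tmp/test.parquet", resultDatas, columns, columnTypeMap);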
2. Loading the generated Parquet files into the Hive warehouse
In step 1 the files could simply have been written to the table's storage location on HDFS; this step records an alternative approach, namely creating a table in Hive and loading the data into it with LOAD DATA. The whole process can also be driven directly from Java (a minimal JDBC sketch follows the load statements below).
1. Create a table in Hive to store data in Parquet format:
create table table_name1 (id string, name string, score double) STORED AS PARQUET;
or, with a partition column:
create table table_name1 (id string, name string, score double) partitioned by (dt string) STORED AS PARQUET;
2. Load the data into Hive:
<1> Load from the local filesystem:
load data local inpath '/path/test.parquet' into table default.table_name1;
<2> Load from an HDFS path:
load data inpath '/path/test.parquet' into table default.table_name1;
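Note that for the partitioned variant of the table, LOAD DATA requires an explicit partition clause (the dt value here is illustrative):
load data inpath '/path/test.parquet' into table default.table_name1 partition (dt='2021-01-01');
As mentioned above, the same statements can also be issued from Java over Hive JDBC. A minimal sketch, assuming a HiveServer2 at the illustrative address below and the org.apache.hive:hive-jdbc dependency on the classpath:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class HiveLoader {
    public static void main(String[] args) throws Exception {
        // Register the Hive JDBC driver (optional on JDBC 4+, shown for clarity).
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:hive2://localhost:10000/default", "hive", "");
             Statement stmt = conn.createStatement()) {
            // Issue the same LOAD statement as above.
            stmt.execute("load data inpath '/path/test.parquet' into table default.table_name1");
        }
    }
}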