栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Java动态生成parquet格式数据并导入Hive

Java动态生成parquet格式数据并导入Hive

前言:

        在实际项目中,分别使用Hive、SparkSQL、Impala对ORC、Parquet格式数据进行性能查询测试后(Impala 3.1版本之后才可以使用ORC格式),发现Impala对Parquet数据进行查询聚合效率最高,就对后续的数据存储有了Parquet格式存储需求。

1.Java动态生成Parquet文件

本样例是根据动态传参在本地(也可以直接在HDFS上)生成Parquet文件,具体原理如下:

1.根据传入参数组合parquet的schema信息;

    
    private static String getFinalSchemaStr(Map columnTypeMap) {
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append("message schema {");
        for (Map.Entry stringStringEntry : columnTypeMap.entrySet()) {
            stringBuilder.append("optional ");
            String tpye = getParquetType(stringStringEntry.getValue());
            stringBuilder.append(tpye+" "+stringStringEntry.getKey()+";");
        }
        stringBuilder.append("}");
        return stringBuilder.toString();
    }


    
    private static String getParquetType(String value) {
        switch (value){
            case "string":
                return "binary";
            case "int":
                return "int32";
            case "double":
                return "double";
            case "date":
                return "int96";
            default:
                return "binary";
        }
    }

 2.生成Parquet格式文件写入本地,具体参数见注释,此处异常抛出不处理,数值类型可以扩展;

    
    public static void parquetWriter(String targetPath, List resultDatas, List columns, linkedHashMap columnTypeMap) throws Exception {
        String schemaStr = getFinalSchemaStr(columnTypeMap);
        MessageType schema = MessageTypeParser.parseMessageType(schemaStr);
//        Path file = new Path("C:\Temp\test3.parq");
        Path file = new Path(targetPath);
        ExampleParquetWriter.Builder builder = ExampleParquetWriter
                .builder(file).withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                //.withConf(configuration)
                .withType(schema);
        ParquetWriter writer = builder.build();
        SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
        for (String[] resultdata: resultDatas) {
            Group group = groupFactory.newGroup();
            for (int i = 0; i < resultData.length; i++) {
                String columnType = columnTypeMap.get(columns.get(i));
                String value = resultData[i];
                if (columnType.equals("string")){
                    group.append(columns.get(i),value);
                }
                if (columnType.equals("int")){
                    group.append(columns.get(i),Integer.parseInt(value));
                }
                if (columnType.equals("double")){
                    group.append(columns.get(i),Double.parseDouble(value));
                }
                if (columnType.equals("date")){
                    DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    group.append(columns.get(i),sdf.parse(value).getTime());
                }
            }
            writer.write(group);
        }
        writer.close();
    }
2.把生成的Parquet导入Hive数仓中

        在第一步中可以直接将数据写入Hive在HDFS上的存储位置,此步骤仅记录另一种实现方法过程,即在Hive中建表并直接将数据load到Hive中,整个过程也可以在java中直接实现,此处不再赘述。

1.在Hive中创建表存储Parquet格式数据:

create table table_name1
(id string, name string, score double) 
STORED AS PARQUET;

或

create table table_name1
(id string, name string, score double) 
partitioned by (dt string) 
STORED AS PARQUET;

2.将数据load到Hive中:

<1>load本地数据

load data local inpath '/path/test.parquet' into table default.table_name1;

<2>load HDFS路径数据

load data inpath '/path/test.parquet' into table default.table_name1;

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/720545.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号