栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

一种处理Hive元数据与文件类型不同时SQL查询失败的方法(二)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

一种处理Hive元数据与文件类型不同时SQL查询失败的方法(二)

文章目录

一、异常触发SQL
二、异常处理
三、Hive on Spark依赖的Hive jar包部署

继上一篇之后,又发现了一种新的报错位置。本篇对这种情况进行处理,并验证这种处理方式是否适用于Hive on Spark环境。

一、异常触发SQL

构造测试数据
(1) 建表,插入数据

-- Source table: id is declared FLOAT, so the Parquet data file written by the insert holds float values.
create table t1(id float,content string) stored as parquet;
insert into t1 values(1.1,'content1'),(2.2,'content2');
-- Target table: id is declared INT; t1's data file is copied under it in the next step,
-- so the table metadata and the file's physical type disagree.
create table error_type(id int,content string) stored as parquet;

(2) 拷贝文件到类型不兼容的表

hdfs dfs -cp /user/hive/warehouse/testdb.db/t1/000000_0 /user/hive/warehouse/testdb.db/error_type/

在前面两步之后,执行如下SQL:

select * from error_type where content='content1';

报错并有如下错误日志:

Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row [Error getting row data with exception java.lang.ClassCastException: org.apache.hadoop.io.FloatWritable cannot be cast to org.apache.hadoop.io.IntWritable
	at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector.get(WritableIntObjectInspector.java:36)
	at org.apache.hadoop.hive.serde2.SerDeUtils.buildJSonString(SerDeUtils.java:227)
	at org.apache.hadoop.hive.serde2.SerDeUtils.buildJSonString(SerDeUtils.java:364)
	at org.apache.hadoop.hive.serde2.SerDeUtils.getJSonString(SerDeUtils.java:200)
	at org.apache.hadoop.hive.serde2.SerDeUtils.getJSonString(SerDeUtils.java:186)
	at org.apache.hadoop.hive.ql.exec.MapOperator.toErrorMessage(MapOperator.java:520)
	at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:489)
	at org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:133)
	at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:48)
	at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:27)
	at org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:85)
	at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127)
	at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127)
	at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232)
	at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
 ]
	at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:494) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:133) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:48) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:27) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:85) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42) ~[scala-library-2.11.12.jar:?]
	at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[scala-library-2.11.12.jar:?]
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[scala-library-2.11.12.jar:?]
	at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
	at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
	at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
	at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
	at org.apache.spark.scheduler.Task.run(Task.scala:121) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
	... 3 more
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.FloatWritable cannot be cast to org.apache.hadoop.io.IntWritable
	at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector.get(WritableIntObjectInspector.java:36) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at org.apache.hadoop.hive.serde2.lazy.LazyUtils.writePrimitiveUTF8(LazyUtils.java:251) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serialize(LazySimpleSerDe.java:292) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serializeField(LazySimpleSerDe.java:247) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.doSerialize(LazySimpleSerDe.java:231) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at org.apache.hadoop.hive.serde2.AbstractEncodingAwareSerDe.serialize(AbstractEncodingAwareSerDe.java:55) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:732) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:882) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:95) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:882) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at org.apache.hadoop.hive.ql.exec.FilterOperator.process(FilterOperator.java:126) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:882) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at org.apache.hadoop.hive.ql.exec.TableScanOperator.process(TableScanOperator.java:130) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at org.apache.hadoop.hive.ql.exec.MapOperator$MapOpCtx.forward(MapOperator.java:146) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:484) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:133) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:48) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:27) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:85) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
	at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42) ~[scala-library-2.11.12.jar:?]
	at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[scala-library-2.11.12.jar:?]
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[scala-library-2.11.12.jar:?]
	at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
	at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
	at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
	at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
	at org.apache.spark.scheduler.Task.run(Task.scala:121) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
	... 3 more
二、异常处理

其中org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.doSerialize(LazySimpleSerDe.java:231)函数中有序列化每个字段的逻辑:

  
  /**
   * Serializes one row into the reusable {@code serializeCache} buffer.
   *
   * <p>The row must be a struct. Its fields are written to {@code serializeStream} in order,
   * with {@code serdeParams.getSeparators()[0]} between consecutive fields; each field is
   * encoded by {@code serializeField}.
   *
   * @param obj the row object to serialize
   * @param objInspector inspector describing {@code obj}; its category must be STRUCT
   * @return the shared {@code serializeCache}, overwritten with this row's bytes
   * @throws SerDeException if {@code objInspector} is not a struct inspector, or if the row
   *     has more fields than the declared row type
   */
  @Override
  public Writable doSerialize(Object obj, ObjectInspector objInspector)
      throws SerDeException {

    if (objInspector.getCategory() != Category.STRUCT) {
      throw new SerDeException(getClass().toString()
          + " can only serialize struct types, but we got: "
          + objInspector.getTypeName());
    }

    // Prepare the field ObjectInspectors
    StructObjectInspector soi = (StructObjectInspector) objInspector;
    List<? extends StructField> fields = soi.getAllStructFieldRefs();
    List<Object> list = soi.getStructFieldsDataAsList(obj);
    // Declared fields are only consulted when the SerDe was configured with a non-empty row type.
    List<? extends StructField> declaredFields = (serdeParams.getRowTypeInfo() != null && ((StructTypeInfo) serdeParams.getRowTypeInfo())
        .getAllStructFieldNames().size() > 0) ? ((StructObjectInspector) getObjectInspector())
        .getAllStructFieldRefs()
        : null;

    // The stream is reused across rows, so clear what the previous row left behind.
    serializeStream.reset();
    serializedSize = 0;

    // Serialize each field
    for (int i = 0; i < fields.size(); i++) {
      // Append the separator if needed.
      if (i > 0) {
        serializeStream.write(serdeParams.getSeparators()[0]);
      }
      // Get the field objectInspector and the field object.
      ObjectInspector foi = fields.get(i).getFieldObjectInspector();
      Object f = (list == null ? null : list.get(i));

      // The incoming row carries more columns than the table declares.
      if (declaredFields != null && i >= declaredFields.size()) {
        throw new SerDeException("Error: expecting " + declaredFields.size()
            + " but asking for field " + i + "\n" + "data=" + obj + "\n"
            + "tableType=" + serdeParams.getRowTypeInfo().toString() + "\n"
            + "dataType="
            + TypeInfoUtils.getTypeInfoFromObjectInspector(objInspector));
      }

      serializeField(serializeStream, f, foi, serdeParams);
    }

    // TODO: The copy of data is unnecessary, but there is no work-around
    // since we cannot directly set the private byte[] field inside Text.
    serializeCache
        .set(serializeStream.getData(), 0, serializeStream.getLength());
    serializedSize = serializeStream.getLength();
    lastOperationSerialize = true;
    lastOperationDeserialize = false;
    return serializeCache;
  }
 

for循环中的最后一行serializeField(serializeStream, f, foi, serdeParams);调用的即是异常堆栈中的org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serializeField(LazySimpleSerDe.java:247)函数:

  /**
   * Writes a single field to {@code out} by delegating to the static {@code serialize}
   * routine, starting at nesting level 1 and taking separators, null sequence and escaping
   * settings from {@code serdeParams}.
   *
   * @param out destination stream for the encoded field
   * @param obj the field value to encode
   * @param objInspector inspector describing {@code obj}
   * @param serdeParams source of the separators, null sequence and escape configuration
   * @throws SerDeException if serialization fails; an {@link IOException} from the delegate
   *     is rethrown wrapped in a {@code SerDeException}
   */
  protected void serializeField(ByteStream.Output out, Object obj, ObjectInspector objInspector,
      LazySerDeParameters serdeParams) throws SerDeException {
    try {
      final byte[] separators = serdeParams.getSeparators();
      serialize(
          out,
          obj,
          objInspector,
          separators,
          1,
          serdeParams.getNullSequence(),
          serdeParams.isEscaped(),
          serdeParams.getEscapeChar(),
          serdeParams.getNeedsEscape());
    } catch (IOException ioFailure) {
      throw new SerDeException(ioFailure);
    }
  }

serializeField中调用的serialize函数为异常堆栈中的org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serialize(LazySimpleSerDe.java:292)函数:

  
  /**
   * Serializes {@code obj} to {@code out}, dispatching on the category reported by
   * {@code objInspector}. (Abridged excerpt: the bodies of the non-primitive cases are
   * elided with "......".)
   *
   * <p>A {@code null} object is written as {@code nullSequence}. A PRIMITIVE is handed to
   * {@code LazyUtils.writePrimitiveUTF8}.
   *
   * @param out destination stream
   * @param obj value to serialize; may be {@code null}
   * @param objInspector inspector describing {@code obj}
   * @param separators separator bytes
   * @param level current nesting level
   * @param nullSequence bytes written in place of a {@code null} value
   * @param escaped passed through to the primitive writer
   * @param escapeChar passed through to the primitive writer
   * @param needsEscape passed through to the primitive writer
   * @throws IOException declared by this method; presumably raised by stream writes
   * @throws SerDeException declared by this method; the throwing code is in the elided parts
   * @throws RuntimeException if the inspector reports a category not handled by the switch
   */
  public static void serialize(ByteStream.Output out, Object obj,
      ObjectInspector objInspector, byte[] separators, int level,
      Text nullSequence, boolean escaped, byte escapeChar, boolean[] needsEscape)
      throws IOException, SerDeException {

    // Null values never reach the type-specific writers; they become the null sequence.
    if (obj == null) {
      out.write(nullSequence.getBytes(), 0, nullSequence.getLength());
      return;
    }

    char separator;
    List<?> list;
    switch (objInspector.getCategory()) {
    case PRIMITIVE:
      LazyUtils.writePrimitiveUTF8(out, obj,
          (PrimitiveObjectInspector) objInspector, escaped, escapeChar,
          needsEscape);
      return;
    case LIST:
      ......
      return;
    case MAP:
      ......
      return;
    case STRUCT:
      ......
      return;
    case UNION:
      ......
      return;
    default:
      break;
    }

    throw new RuntimeException("Unknown category type: "
        + objInspector.getCategory());
  }

在这个函数中,调用后续针对特定类型的函数对特定类型进行序列化,类型不兼容时则抛出异常。可以看到当前字段数据为空时,有如下逻辑:

if (obj == null) {
      out.write(nullSequence.getBytes(), 0, nullSequence.getLength());
      return;
    }

所以还是可以在LazySimpleSerDe.doSerialize函数中处理每个字段的逻辑中,捕获ClassCastException,并参考serialize函数这种逻辑写入空值,将LazySimpleSerDe.doSerialize中

serializeField(serializeStream, f, foi, serdeParams);

改成

// Guard each field individually so that one incompatible column does not fail the whole row.
try {
    serializeField(serializeStream, f, foi, serdeParams);
} catch (ClassCastException | UnsupportedOperationException e) {
    // Write the configured null sequence in place of the unreadable value; these are the
    // same bytes serialize() emits when the field object is null.
    // NOTE(review): anything serializeField wrote to the stream before throwing is not
    // rolled back here — confirm this cannot leave partial bytes for nested types.
    serializeStream.write(serdeParams.getNullSequence().getBytes(), 0, serdeParams.getNullSequence().getLength());
}
三、Hive on Spark依赖的Hive jar包部署

上面代码修改后,用前一篇文章中的copy_jars.sh脚本将hive*.jar部署后,Hive默认的MR执行引擎已经可以执行本文开始提到的会报错的SQL,但是当Hive使用Spark作为执行引擎时(如beeline中可通过set hive.execution.engine=spark;设置),仍然会报错,猜测Spark使用的Hive依赖包在另外的位置也存放了一份。

从前面的日志可以看出,一部分日志后面都显示了hive-exec-2.1.1-cdh6.3.0.jar这个jar包名,在部署了CDH的主机上搜索这个jar包:

[root@dev-master2 tmp]# find / -name hive-exec-2.1.1-cdh6.3.0.jar
/opt/cloudera/cm/cloudera-navigator-server/libs/cdh6/hive-exec-2.1.1-cdh6.3.0.jar
/opt/cloudera/cm/cloudera-scm-telepub/libs/cdh6/hive-exec-2.1.1-cdh6.3.0.jar
/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/jars/hive-exec-2.1.1-cdh6.3.0.jar
/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/hive/hive-exec-2.1.1-cdh6.3.0.jar
/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hive/lib/hive-exec-2.1.1-cdh6.3.0.jar
/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/oozie/embedded-oozie-server/webapp/WEB-INF/lib/hive-exec-2.1.1-cdh6.3.0.jar

看起来/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/hive/hive-exec-2.1.1-cdh6.3.0.jar这个就是Spark使用的Hive依赖包存放位置,且这个目录下只有一个jar包:

[root@dev-master2 tmp]# ls -lh /opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/hive/
total 35M
-rw-r--r--. 1 root root 35M Jul 19  2019 hive-exec-2.1.1-cdh6.3.0.jar

所以在copy_jars.sh中添加一句

cp $current_dir/hive-exec-2.1.1-cdh6.3.0.jar /opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/hive/

再重新部署,经测试,Hive on Spark已经可以查询元数据与文件类型不兼容的表,类型不兼容的字段结果显示为空值。

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号