栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SparkSQL 算子接口调用,数据编码报错

SparkSQL 算子接口调用,数据编码报错

Filter算子接口调用有null的列报错

问题背景

具体信息分析过程

问题背景

编写spark-sql 算子ut,自定构造数据,调用列式filter验证对null处理。

具体信息

1、方法调用栈

org.apache.spark.sql.execution.ColumnarFilterExecSuite *** ABORTED ***
  java.lang.UnsupportedOperationException: No Encoder found for Any
- field (class: "java.lang.Object", name: "_4")
- root class: "scala.Tuple4"
  at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$1(ScalaReflection.scala:591)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
  at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:904)
  at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:903)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
  at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:432)
  at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$6(ScalaReflection.scala:577)
  at scala.collection.immutable.List.map(List.scala:297)
  at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$1(ScalaReflection.scala:562)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)


报错的数据(未指定数据类型)

inputDfWithNull = Seq(
  (null, "", 4, 2.0),
  (null, null, 1, 1.0),
  (" add", "World", 8, null),
  (" yeah  ", "yeah", 10, 8.0),
  (" yeah  ", "yeah", 10, 8.0)
).toDF("a", "b", "c", "d")

不报错的数据(指定数据类型)

inputDfWithNull = Seq[(String, String, java.lang.Integer, java.lang.Double)](
  (null, "", 4, 2.0),
  (null, null, 1, 1.0),
  (" add", "World", 8, null),
  (" yeah  ", "yeah", 10, 8.0),
  (" yeah  ", "yeah", 10, 8.0)
).toDF("a", "b", "c", "d")
分析过程

1、调用原生spark是否报错? 一样报错
2、报错是类型编码问题,可能是数据转换原因,指定编码? 指定编码比较麻烦
3、使用其他创建df的方式,在创建时指定每列数据类型(参考原生spark 算子ut代码)
创建方式1

sparkContext.parallelize(Seq(
      Row(1, 2.0),
      Row(1, 2.0),
      Row(2, 1.0),
      Row(2, 1.0),
      Row(3, 3.0),
      Row(null, null),
      Row(null, 5.0),
      Row(6, null)
    )), new StructType().add("a", IntegerType).add("b", DoubleType))

创建方式2

spark.createDataframe(
    sparkContext.parallelize(Seq(
      Row(1, 2.0),
      Row(1, 2.0),
      Row(2, 1.0),
      Row(2, 1.0),
      Row(3, 3.0),
      Row(null, null),
      Row(null, 5.0),
      Row(6, null)
    )), new StructType().add("a", IntegerType).add("b", DoubleType))
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/746167.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号