
55. PySpark Exception Analysis: Data Type Conversion



55.1 Exceptions
  • Exception 1:
NameError: name 'DoubleType' is not defined
NameErrorTraceback (most recent call last)
in engine
      1 schema = StructType([StructField("person_name", StringType(), False),
----> 2                      StructField("person_age", DoubleType(), False)])

NameError: name 'DoubleType' is not defined

Exception 2:

Py4JJavaError: An error occurred while calling o152.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-172-31-26-102.ap-southeast-1.compute.internal, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/cloudera/parcels/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
process()
File "/opt/cloudera/parcels/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/cloudera/parcels/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/sql/session.py", line 509, in prepare
verify_func(obj, schema)
File "/opt/cloudera/parcels/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904/lib/spark2/python/lib/pyspark.zip/pyspark/sql/types.py", line 1360, in _verify_type
_verify_type(v, f.dataType, f.nullable)
File "/opt/cloudera/parcels/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904/lib/spark2/python/lib/pyspark.zip/pyspark/sql/types.py", line 1324, in _verify_type
raise TypeError("%s can not accept object %r in type %s" % (dataType, obj, type(obj)))
TypeError: DoubleType can not accept object u'23' in type <type 'unicode'>

at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  • When using PySpark's SparkSQL to read a text file from HDFS and create a DataFrame, the following exceptions can occur during data type conversion:
    • Setting a schema field's type to DoubleType throws "name 'DoubleType' is not defined";
    • Converting a field read from the file to DoubleType throws "DoubleType can not accept object u'23' in type <type 'unicode'>";
    • If the field is instead defined as StringType, SparkSQL can still compute aggregations such as sum over it; non-numeric values are simply excluded from the result.
55.2 Resolving the Exceptions

Exception 1:

NameError: name 'DoubleType' is not defined
  • Cause:
    • The DoubleType data type was never imported from pyspark.sql.types in the Python code.
  • Solution:
from pyspark.sql.types import *

or

from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType, DoubleType

Exception 2:

TypeError: DoubleType can not accept object u'23' in type <type 'unicode'>
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
  • Cause:
    • Python reads the text fields as unicode strings by default; a field destined for a Double column must be converted explicitly.
  • Solution:
    • Add .map(lambda x: (x[0], float(x[1]))) to the pipeline to convert the target field to float.
# Schema with two fields - person_name and person_age
schema = StructType([StructField("person_name", StringType(), False),
                     StructField("person_age", DoubleType(), False)])

lines = spark.read.text("/tmp/test/").rdd \
    .map(lambda x: x[0].split(",")) \
    .map(lambda x: (x[0], float(x[1])))
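To see why this fix works without spinning up Spark: DoubleType's schema verification only accepts actual Python floats, while spark.read.text() hands every field back as a (unicode) string; float() bridges the two. A minimal plain-Python check:

```python
# spark.read.text() returns each field as a string (unicode under Python 2),
# which DoubleType's verifier rejects; float() produces a value it accepts.
raw_age = u"23"                 # what the text reader hands back
converted = float(raw_age)      # what the added .map(...) produces

print(isinstance(raw_age, float))    # False -> rejected by DoubleType
print(isinstance(converted, float))  # True  -> accepted by DoubleType
print(converted)                     # 23.0
```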

Summary

  • If column x[1] contains empty strings or non-numeric strings, the float() conversion fails. So when assigning a numeric type to a field, any "illegal" values in the data must be removed first, or the job will not run. With such test data, the code fails as follows:
Py4JJavaError: An error occurred while calling o291.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-172-31-26-102.ap-southeast-1.compute.internal, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/cloudera/parcels/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
process()
File "/opt/cloudera/parcels/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/cloudera/parcels/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "", line 1, in 
    ValueError: invalid literal for float(): 23q

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  • If the "illegal" values are not removed, define the field as StringType instead; aggregations over the field still work, and non-numeric values are simply not counted.
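One way to drop the illegal rows before the conversion is a small guard used as an RDD filter. This is a sketch: the helper name is_numeric_row is ours, and the sample runs in plain Python; in the PySpark pipeline it would slot in as .filter(is_numeric_row) just ahead of the float() map:

```python
def is_numeric_row(fields):
    """Keep only rows whose age field parses as a number."""
    try:
        float(fields[1])
        return True
    except (ValueError, IndexError):
        return False

# Simulated rows, as produced by .map(lambda x: x[0].split(","))
rows = [["tom", "23"], ["jerry", "23q"], ["mary", ""]]
clean = [(f[0], float(f[1])) for f in rows if is_numeric_row(f)]
print(clean)  # [('tom', 23.0)]
```

The row ["jerry", "23q"] is the kind of value that raised "invalid literal for float(): 23q" above; the filter discards it instead of crashing the task.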

Original article: https://www.mshxw.com/it/321903.html (reprinted from www.mshxw.com)