栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark作业序列化问题解决方案

spark作业序列化问题解决方案

问题现象及原因

开发spark作业过程中经常会遇到序列化问题,并出现Task not serializable、java.io.NotSerializableException这样的报错。

org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimetaken$(ProgressReporter.scala:350)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimetaken(StreamExecution.scala:69)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
Caused by: java.io.NotSerializableException: okhttp3.Request$Builder

首先来说一下产生这个问题的原因,spark作为分布式计算引擎,当你提交作业后:

  1. driver节点会生成执行计划,并将任务(transformation)序列化
  2. 将序列化后的任务分发到集群上适合的工作节点
  3. 工作节点对任务进行反序列化,并执行任务
  4. 将任务执行结果返回给driver节点

在step1,driver节点会尝试序列化要发送到worker节点的对象,如果对象无法序列化,作业就会失败并抛出上面的异常。

解决方案
  • 把对象序列化
    此方法仅适用于内部对象,将对象 implements Serializable接口即可
    对于一些无法序列化的对象,可以尝试使用以下方法

  • 把对象实例化放到transformation的lambda表达式或udf中 
    JavaRDD rdd = sc.textFile("/tmp/myfile");
    rdd.map(s -> {
        NotSerializable notSerializable = new NotSerializable();
        return notSerializable.run(s);
    }).collect();
  • 将无法序列化的对象置为静态变量
    private static NotSerializable notSerializable = new NotSerializable();
    
    public static void main( String[] args ) throws Exception {
        JavaRDD rdd = sc.textFile("/tmp/myfile");
    
        rdd.map(s -> notSerializable.run(s)).collect();
    
    }
    
  • 调用rdd.forEachPartition, 将对象示例化放入其中
    JavaRDD rdd = sc.textFile("/tmp/myfile");
    rdd.forEachPartition(iter -> {
        NotSerializable notSerializable = new NotSerializable();
        
        // 业务处理代码
    
    });

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/601316.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号