栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink中使用Kryo序列化器的注意事项

Flink中使用Kryo序列化器的注意事项

你以为我要说的是在Flink中使用Kryo序列化吗?不是的,还记得上一篇关于Kryo序列化的问题的文章:Kryo序列化:Class Not Found的可能原因.

里面介绍了因为在Spark环境下由于类加载器原因导致Kryo反序列化时找不到类的问题。

没错,还有续集。这次是在Flink下,也出现了同样的问题。

问题复现

见如下代码,是Flink提交给YARN的主函数类,里面反序列化一个 StreamParam的参数类。这个类就在提交的jar包里。
(KryoSerializer是我们自己封装了下Kryo,里面还是Kryo实例。)

object MyFlinkDriver {

  def main(args: Array[String]): Unit = {
    Assert.paramMiss(args.length > 0, "StreamParam JsonString")

    val param = KryoSerializer.deserialize(
      EncodeUtil.base64DecodeBytes(args(0))
    ).asInstanceOf[StreamParam]

    val res = param.streamResource
    val env = getStreamExecutionEnv(res)
    new StreamGraphExecutor(param.streamGraph, param.config, env).execute()

    env.execute(res.getTaskId)
  }
}

在本地单测运行是没问题的,在服务器会发现类找不到。

Caused by: java.lang.ClassNotFoundException: com.jimo.sdk.core.analyze.stream.StreamParam
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_66]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_66]
at org.springframework.boot.loader.LaunchedURLClassLoader.loadClass(LaunchedURLClassLoader.java:151) ~[just-cmc.jar:2.2.3-SNAPSHOT]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_66]
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_66]
at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_66]
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[kryo-2.24.0.jar!/:?]
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[kryo-2.24.0.jar!/:?]
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[kryo-2.24.0.jar!/:?]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[kryo-2.24.0.jar!/:?]
at com.jimo.sdk.core.serialize.KryoSerializer.deserialize(KryoSerializer.java:59) ~[?:?]
at com.jimo.executor.stream.MyFlinkDriver$.main(MyFlinkDriver.scala:19) ~[?:?]

同一个地方,同一个报错,应该是同一个原因吧。

设置类加载器

所以我们就设置类加载器吧。

	KryoSerializer.setClassLoader(MyFlinkDriver.getClass.getClassLoader)
	
	val param = KryoSerializer.deserialize(
	  EncodeUtil.base64DecodeBytes(args(0))
	).asInstanceOf[StreamParam]

当然,问题解决了,就这么简单。不过事情还没结束。

用的什么类加载器?

用的是Flink的 FlinkUserCodeClassLoader。Flink的自定义类加载器分为 parent-first和child-first,可以通过配置文件配置。

在我们这个场景下,需要配成 parent-first,不然jar包里的类是Flink的类加载器加载的,而序列化时用的是 AppClassLoader,会导致反序列化的实例不是同一个,因为是不同的类加载器加载的。

总结

将flink的flink-conf.yaml的classloader.resolve-order改为parent-first,不然类加载器导致反序列化不是同一个类同时要将Kryo的ClassLoader设置成Flink的类加载器,否则找不到类

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/784959.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号