栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

将Spark Dataframe保存到Elasticsearch

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

将Spark Dataframe保存到Elasticsearch

这个问题的答案很棘手,但是由于samklr,我设法弄清了问题所在。

但是,该解决方案并非简单明了,可能会考虑一些“不必要的”转换。

首先让我们谈谈 序列化

在数据的Spark序列化和功能序列化中要考虑两个方面的序列化。在这种情况下,它与数据序列化以及反序列化有关。

从Spark的角度来看,唯一需要做的就是设置序列化-
默认情况下,Spark依赖Java序列化,这很方便,但是效率很低。这就是Hadoop本身引入自己的序列化机制和类型(即)的原因

Writables
。因此,
InputFormat
OutputFormats
要求返回
Writables
其,开箱即用,星火不明白。

使用elasticsearch-spark连接器,必须启用一种不同的序列化(Kryo),该序列化可以自动处理转换,并且还可以非常高效地完成转换。

conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")

即使因为Kryo不需要类实现要序列化的特定接口,这也意味着POJO可以在RDD中使用,而无需进行任何其他工作即可启用Kryo序列化。

就是说,@ samklr向我指出,Kryo需要在使用它们之前注册类。

这是因为Kryo编写了对要序列化的对象的类的引用(对于每个写入的对象,将写入一个引用),如果该类已注册,则它只是一个整数标识符,否则为完整的类名。Spark代表您注册Scala类和许多其他框架类(例如Avro
Generic或Thrift类)。

用Kryo注册课程很简单。创建KryoRegistrator的子类,并重写该

registerClasses()
方法:

public class MyKryoRegistrator implements KryoRegistrator, Serializable {    @Override    public void registerClasses(Kryo kryo) {        // Product POJO associated to a product Row from the Dataframe         kryo.register(Product.class);     }}

最后,在驱动程序中,将spark.kryo.registrator属性设置为KryoRegistrator实现的完全限定的类名:

conf.set("spark.kryo.registrator", "MyKryoRegistrator")

其次,甚至考虑到设置了Kryo序列化器并注册了类,并对Spark 1.5进行了更改,并且由于某种原因,Elasticsearch无法 反序列化
Dataframe,因为它无法

SchemaType
将Dataframe的内容推断到连接器中。

所以我不得不将数据框转换为JavaRDD

JavaRDD<Product> products = df.javaRDD().map(new Function<Row, Product>() {    public Product call(Row row) throws Exception {        long id = row.getLong(0);        String title = row.getString(1);        String description = row.getString(2);        int merchantId = row.getInt(3);        double price = row.getDecimal(4).doublevalue();        String keywords = row.getString(5);        long brandId = row.getLong(6);        int categoryId = row.getInt(7);        return new Product(id, title, description, merchantId, price, keywords, brandId, categoryId);    }});

现在可以准备将​​数据写入elasticsearch了:

JavaEsSpark.saveToEs(products, "test/test");

参考文献:

  • Elasticsearch的Apache Spark支持文档。
  • Hadoop最终指南,第19章。Spark,编辑。4 –汤姆·怀特。
  • 用户samklr。


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/380323.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号