您可以使用
mapPartition或
foreachPartition。这是从Learning
Spark摘录的片段
通过使用基于分区的操作,我们可以与该数据库共享一个连接池,以避免建立许多连接,并重用我们的JSON解析器。如示例6-10至6-12所示,我们使用mapPartitions()函数,该函数为我们提供了对输入RDD的每个分区中的元素的迭代器,并期望我们返回结果的迭代器。
这使我们可以为每个执行程序初始化一个连接,然后根据需要迭代分区中的元素。这对于将数据保存到某些外部数据库或创建昂贵的可重用对象非常有用。
这是从链接的书中获取的一个简单的scala示例。如果需要,可以将其翻译为java。此处只是显示mapPartition和foreachPartition的简单用例。
ipAddressRequestCount.foreachRDD { rdd => rdd.foreachPartition { partition => // Open connection to storage system (e.g. a database connection) partition.foreach { item => // Use connection to push item to system } // Close connection } }这是一个指向
Java示例的链接。



