栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

SparkCore的RDD算子mapPartitions使用案例

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SparkCore的RDD算子mapPartitions使用案例

SparkCore的RDD算子mapPartitions使用案例
mappartitions的实际应用场景
1、要对一份大体量的数据进行处理的时候,需要查询一个外部字典数据
2、如果用map算子来做,就意味着对外部字典数据查询的连接创建会被高频度地执行,效率低下
3、而用mappartitions则可以避免这个弊端: 一个分区执行一次自己的函数,那我们可以在函数中先创建一个连接,然后再去迭代处理每一行数据

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

object RDD算子_mapPartitions {
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.WARN)
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("mappartitions测试")

    val sc = new SparkContext(conf)

    val rdd: RDD[String] = sc.textFile("data/wordcount/input")

    val rdd2: RDD[String] = rdd.flatMap(s => s.split("\s+"))


    val rdd3 = rdd2.map(s => {
      println("f1我被执行了")
      (s, 1)
    })
    // rdd3.foreach(println)

    val rdd4 = rdd2.mapPartitions(iter => {
      println("f2被执行了")
      iter.map(s => (s, 1))
    })
    //rdd4.foreach(println)

    // mappartitions的理解测试
    rdd2.mapPartitions(iter => {
      List("haha", "heihei", "xixi").iterator
    }).foreach(println)

    
    //先用map算子来实现一遍
    val sanguo: RDD[String] = sc.textFile("data/sanguo/input")
    val res = sanguo.map(line => {
      val arr: Array[String] = line.split(",")
      val id = arr(0).toInt


      val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/abc", "root", "123456")
      val stmt: PreparedStatement = conn.prepareStatement("select name,role,battel from battel where id=?")
      stmt.setInt(1, id)
      val rs: ResultSet = stmt.executeQuery()
      rs.next()
      val name = rs.getString(1)
      val role = rs.getString(2)
      val battel = rs.getInt(3)


      line + "," + name + "," + role + "," + battel
    })


    res.foreach(println)

    // 再用MapPartitions将这个实现进行优化
    val res2 = sanguo.mapPartitions(iter=>{
      // 先创建连接(这段代码只会在一个分区上执行一次)
      val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/abc", "root", "123456")
      val stmt: PreparedStatement = conn.prepareStatement("select name,role,battel from battel where id=?")


      // 再去迭代数据进行处理
      iter.map(line=>{
        val arr: Array[String] = line.split(",")
        val id = arr(0).toInt
        stmt.setInt(1, id)


        val rs: ResultSet = stmt.executeQuery()
        rs.next()
        val name = rs.getString(1)
        val role = rs.getString(2)
        val battel = rs.getInt(3)


        line + "," + name + "," + role + "," + battel
      })
    })
    res2.foreach(println)
    sc.stop()
  }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/666272.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号