栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark操作MongoDB案例

Spark操作MongoDB案例

一、引入依赖

        pom文件中引入mongo-spark依赖,如下所示:

 
            org.mongodb.spark
            mongo-spark-connector_2.11
            ${spark.version}
 

二、spark写数据到MongoDB
object SparkMongoWriteJob {
  def main(args: Array[String]): Unit = {
    //Spark配置项
    val conf = new SparkConf()
      .setAppName("SparkMongoWriteJob")
      .setMaster("local[2]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val ss = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    val testList = List(
      Test(1, "张三"),
      Test(2, "李四"),
      Test(3, "王五"),
      Test(4, "赵六"),
      Test(5, "秦七")
    )

    val dataDF = ss.createDataframe(testList)
    dataDF.show()

    //数据写入Mongo
    val config = WriteConfig.create("test",  //数据库名
      "test",   //表名
      "mongodb://localhost",  //uri
      2,   //线程数
      WriteConcern.ACKNOWLEDGED)

    MongoSpark.save(dataDF, config)

    //关闭资源
    ss.stop()


  }

  //样例测试类
  case class Test(_id: Int, name: String)

}
三、spark从MongoDB读取数据
object SparkMongoReadJob {
  def main(args: Array[String]): Unit = {
    //Spark配置项
    val conf = new SparkConf()
      .setAppName("SparkMongoReadJob")
      .setMaster("local[2]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val ss = SparkSession.builder()
      .config(conf)
      .getOrCreate()

    val map = new util.HashMap[String,String]()
    map.put("spark.mongodb.input.database","test")  //数据库
    map.put("spark.mongodb.input.collection","test")  //表
    map.put("spark.mongodb.input.uri","mongodb://localhost")  //uri


    val readConfig = ReadConfig.create(map)
    val dataDF = MongoSpark.load(ss,readConfig)
    dataDF.show()


    //关闭资源
    ss.stop()


  }

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/650921.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号