栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

用scala实现spark读取并处理数据然后提交到mongodb案例,包含远程调试spark总结分享

用scala实现spark读取并处理数据然后提交到mongodb案例,包含远程调试spark总结分享

包含spark远程调试总结demo分享

用scala实现spark读取并处理数据然后提交到mongodb案例

linux和windows环境:hadoop-2.7.5、spark2.1.2、jdk1.8、scala2.11、mongodb2.0.3 (linux和windows版本要保持一致如果不不一致,会报ClassNotFound等异常)

该文章将详细叙述单机测试,在spark运行,IDEA-spark调试,三个部分。

1.环境简述

这里我在开了三台虚拟机,hadoop01、02、03。
hadoop01为namenode,02和03为datanode。
hadoop01为master和worker。02和03为worker。

2.集群开启

首先需要开启hadoop集群,然后开启spark集群。这里要保证自己的集群节点都工作正常。并且你的windowsPC内存要大最好是16G要不然集群跑起来很卡。且如果不够大的话在运行过程中会报内存溢出异常和outOfIndex,或者StackOverflow异常。然后开启hadoop和spark的history服务。jps检查一下是否都开起正常。

3.IDEA完成demo代码本地调试
// An highlighted block

package com.ynu.recommender

import com.mongodb.casbah.{MongoClient, MongoClientURI}
import com.mongodb.casbah.commons.MongoDBObject
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataframe, SparkSession}
case class Product( productId: Int, name: String, imageUrl: String, categories: String, tags: String )


case class Rating( userId: Int, productId: Int, score: Double, timestamp: Int )


case class MongoConfig( uri: String, db: String )
object DataLoader {
   //在本地调试的时候注意两点
   //1.必须配置和你pom文件相同版本的hadoop环境
   //2.在环境目录下(hadoop-2.7.5/bin/)看是否有winutils.exe。如果没有自己在Github上免费下载一个和自己版本最接近的。
  System.setProperty("hadoop.home.dir", "你本机的hadoop环境路径")
  // 定义数据文件路径
  val PRODUCT_DATA_PATH = "windows上文件地址"
  val RATING_DATA_PATH = "windows上文件地址"
  // 定义mongodb中存储的表名
  val MONGODB_PRODUCT_COLLECTION = "Product"
  val MONGODB_RATING_COLLECTION = "Rating"

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local",
      "mongo.uri" -> "mongodb://mongodb主机的ip:27017/recommender",
      "mongo.db" -> "recommender"
    )
    // 创建一个spark config
    val sparkConf = new SparkConf().setMaster("spark.cores").setAppName("DataLoader").setJars(List("file:///D:/BigDataShow/XXX/target/DataLoader-1.0-SNAPSHOT.jar"))
    //这里的setjars是windows上jar包的绝对路径,加file表示绝对路径。
    // 创建spark session
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._

    // 加载数据
    val productRDD = spark.sparkContext.textFile(PRODUCT_DATA_PATH)
    val productDF = productRDD.map( item => {
      // product数据通过^分隔,切分出来
      val attr = item.split("\^")
      // 转换成Product
      Product( attr(0).toInt, attr(1).trim, attr(4).trim, attr(5).trim, attr(6).trim )

    } ).toDF()
    println("333")
    val ratingRDD = spark.sparkContext.textFile(RATING_DATA_PATH)
    val ratingDF = ratingRDD.map( item => {
      val attr = item.split(",")
      Rating( attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt )
    } ).toDF()

    implicit val mongoConfig = MongoConfig( config("mongo.uri"), config("mongo.db") )
    storeDataInMongoDB( productDF, ratingDF )

    spark.stop()
  }
  def storeDataInMongoDB( productDF: Dataframe, ratingDF: Dataframe )(implicit mongoConfig: MongoConfig): Unit ={
    // 新建一个mongodb的连接,客户端
    val mongoClient = MongoClient( MongoClientURI(mongoConfig.uri) )
    // 定义要操作的mongodb表,可以理解为 db.Product
    val productCollection = mongoClient( mongoConfig.db )( MONGODB_PRODUCT_COLLECTION )
    val ratingCollection = mongoClient( mongoConfig.db )( MONGODB_RATING_COLLECTION )

    // 如果表已经存在,则删掉
    productCollection.dropCollection()
    ratingCollection.dropCollection()
    // 将当前数据存入对应的表中
    productDF.write
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_PRODUCT_COLLECTION)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()
    ratingDF.write
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    // 对表创建索引
    productCollection.createIndex( MongoDBObject( "productId" -> 1 ) )
    ratingCollection.createIndex( MongoDBObject( "productId" -> 1 ) )
    ratingCollection.createIndex( MongoDBObject( "userId" -> 1 ) )

    mongoClient.close()
  }
}
// An highlighted block

写好之后就可以直接点击右键然后run了。

4.IDEA完成demo代码提交到集群运行

由于是提交到集群,我们有很多的依赖,所以必须要打包。怎么打包?
我这里是用maven-plugin打包。需要在pom中添加如下的依赖

这样的话你打包的话就可以在IDEA 用maven命令:mvn package。把demo用到的依赖都打进去。大概有100多M的样子。否则你运行就会报类找不到异常。下面是代码:

// An highlighted block

package com.ynu.recommender

import com.mongodb.casbah.{MongoClient, MongoClientURI}
import com.mongodb.casbah.commons.MongoDBObject
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataframe, SparkSession}
case class Product( productId: Int, name: String, imageUrl: String, categories: String, tags: String )


case class Rating( userId: Int, productId: Int, score: Double, timestamp: Int )

case class MongoConfig( uri: String, db: String )
object DataLoader {
   //在本地调试的时候注意两点
   //1.必须配置和你pom文件相同版本的hadoop环境
   //2.在环境目录下(hadoop-2.7.5/bin/)看是否有winutils.exe。如果没有自己在Github上免费下载一个和自己版本最接近的。
  System.setProperty("hadoop.home.dir", "你虚拟机的hadoop环境路径")
  // 定义数据文件路径
  //注意这些csv文件每个节点都要有,要不然filenotfound异常
  val PRODUCT_DATA_PATH = "hdfs://hadoop01:9000/user/XXX.CSV"
  val RATING_DATA_PATH = "hdfs://hadoop01:9000/user/XXXX.CSV"
  // 定义mongodb中存储的表名
  val MONGODB_PRODUCT_COLLECTION = "Product"
  val MONGODB_RATING_COLLECTION = "Rating"

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "spark://hadoop01:7077",
      "mongo.uri" -> "mongodb://mongodb主机的ip:27017/recommender",
      "mongo.db" -> "recommender"
    )
    // 创建一个spark config
    val sparkConf = new SparkConf().setMaster("spark.cores").setAppName("DataLoader").setJars(List("/opt/XXX/test/DataLoader-1.0-SNAPSHOT.jar"))
    //这里的setjars改为master主机(hadoop01)上的绝对路径
    //jar包只要在master节点有就行,他会分发到worker中。当然你也可以都放。
    // 创建spark session
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._

    // 加载数据
    val productRDD = spark.sparkContext.textFile(PRODUCT_DATA_PATH)
    val productDF = productRDD.map( item => {
      // product数据通过^分隔,切分出来
      val attr = item.split("\^")
      // 转换成Product
      Product( attr(0).toInt, attr(1).trim, attr(4).trim, attr(5).trim, attr(6).trim )

    } ).toDF()
    val ratingRDD = spark.sparkContext.textFile(RATING_DATA_PATH)
    val ratingDF = ratingRDD.map( item => {
      val attr = item.split(",")
      Rating( attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt )
    } ).toDF()

    implicit val mongoConfig = MongoConfig( config("mongo.uri"), config("mongo.db") )
    storeDataInMongoDB( productDF, ratingDF )

    spark.stop()
  }
  def storeDataInMongoDB( productDF: Dataframe, ratingDF: Dataframe )(implicit mongoConfig: MongoConfig): Unit ={
    // 新建一个mongodb的连接,客户端
    val mongoClient = MongoClient( MongoClientURI(mongoConfig.uri) )
    // 定义要操作的mongodb表,可以理解为 db.Product
    val productCollection = mongoClient( mongoConfig.db )( MONGODB_PRODUCT_COLLECTION )
    val ratingCollection = mongoClient( mongoConfig.db )( MONGODB_RATING_COLLECTION )

    // 如果表已经存在,则删掉
    productCollection.dropCollection()
    ratingCollection.dropCollection()
    // 将当前数据存入对应的表中
    productDF.write
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_PRODUCT_COLLECTION)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()
    ratingDF.write
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    // 对表创建索引
    productCollection.createIndex( MongoDBObject( "productId" -> 1 ) )
    ratingCollection.createIndex( MongoDBObject( "productId" -> 1 ) )
    ratingCollection.createIndex( MongoDBObject( "userId" -> 1 ) )

    mongoClient.close()
  }
}
// An highlighted block

最后在linux上我们就可以spark-submit了:
spark-submit --master spark://192.168.125.129:7077 --class com.ynu.recommender.DataLoader /opt/test/DataLoader-1.0-SNAPSHOT-jar-with-dependencies.jar
只不过这里的–master设不设置都一样,因为代码里面已经写好了。

scala的demo和spark远程调试。

这里给大家放一篇参考配置的blog:
https://blog.csdn.net/zgjdzwhy/article/details/81632978
这里叙述三点:
1.如果按照上述blog老哥的来无法使用spark-submit。那么恢复spark-class之前的内容。转而在spark-env.sh添加jvm的参数。
2.集群调试时出现一直accetped,则将hadoop的capacity-scheduler.xml里面的yarn.scheduler.capacity.maximum-am-resource-percent修改如下:
3.需要读取的文件要么以args参数传入,要么是hdfs路径(所有节点都要有该文件)。setjars方法里面的路径是windows本地jar包的路径,且master上的jar包要和本地的相同。若果出现ClassNotFound则在打包的时候把所有依赖排除

运行结果

可以看到数据已经导入到mongodb中了。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/487805.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号