包含spark远程调试总结demo分享
用scala实现spark读取并处理数据然后提交到mongodb案例linux和windows环境:hadoop-2.7.5、spark2.1.2、jdk1.8、scala2.11、mongodb2.0.3 (linux和windows版本要保持一致如果不不一致,会报ClassNotFound等异常)
该文章将详细叙述单机测试,在spark运行,IDEA-spark调试,三个部分。
1.环境简述这里我在开了三台虚拟机,hadoop01、02、03。
hadoop01为namenode,02和03为datanode。
hadoop01为master和worker。02和03为worker。
首先需要开启hadoop集群,然后开启spark集群。这里要保证自己的集群节点都工作正常。并且你的windowsPC内存要大最好是16G要不然集群跑起来很卡。且如果不够大的话在运行过程中会报内存溢出异常和outOfIndex,或者StackOverflow异常。然后开启hadoop和spark的history服务。jps检查一下是否都开起正常。
3.IDEA完成demo代码本地调试// An highlighted block
package com.ynu.recommender
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import com.mongodb.casbah.commons.MongoDBObject
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataframe, SparkSession}
case class Product( productId: Int, name: String, imageUrl: String, categories: String, tags: String )
case class Rating( userId: Int, productId: Int, score: Double, timestamp: Int )
case class MongoConfig( uri: String, db: String )
object DataLoader {
//在本地调试的时候注意两点
//1.必须配置和你pom文件相同版本的hadoop环境
//2.在环境目录下(hadoop-2.7.5/bin/)看是否有winutils.exe。如果没有自己在Github上免费下载一个和自己版本最接近的。
System.setProperty("hadoop.home.dir", "你本机的hadoop环境路径")
// 定义数据文件路径
val PRODUCT_DATA_PATH = "windows上文件地址"
val RATING_DATA_PATH = "windows上文件地址"
// 定义mongodb中存储的表名
val MONGODB_PRODUCT_COLLECTION = "Product"
val MONGODB_RATING_COLLECTION = "Rating"
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local",
"mongo.uri" -> "mongodb://mongodb主机的ip:27017/recommender",
"mongo.db" -> "recommender"
)
// 创建一个spark config
val sparkConf = new SparkConf().setMaster("spark.cores").setAppName("DataLoader").setJars(List("file:///D:/BigDataShow/XXX/target/DataLoader-1.0-SNAPSHOT.jar"))
//这里的setjars是windows上jar包的绝对路径,加file表示绝对路径。
// 创建spark session
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
// 加载数据
val productRDD = spark.sparkContext.textFile(PRODUCT_DATA_PATH)
val productDF = productRDD.map( item => {
// product数据通过^分隔,切分出来
val attr = item.split("\^")
// 转换成Product
Product( attr(0).toInt, attr(1).trim, attr(4).trim, attr(5).trim, attr(6).trim )
} ).toDF()
println("333")
val ratingRDD = spark.sparkContext.textFile(RATING_DATA_PATH)
val ratingDF = ratingRDD.map( item => {
val attr = item.split(",")
Rating( attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt )
} ).toDF()
implicit val mongoConfig = MongoConfig( config("mongo.uri"), config("mongo.db") )
storeDataInMongoDB( productDF, ratingDF )
spark.stop()
}
def storeDataInMongoDB( productDF: Dataframe, ratingDF: Dataframe )(implicit mongoConfig: MongoConfig): Unit ={
// 新建一个mongodb的连接,客户端
val mongoClient = MongoClient( MongoClientURI(mongoConfig.uri) )
// 定义要操作的mongodb表,可以理解为 db.Product
val productCollection = mongoClient( mongoConfig.db )( MONGODB_PRODUCT_COLLECTION )
val ratingCollection = mongoClient( mongoConfig.db )( MONGODB_RATING_COLLECTION )
// 如果表已经存在,则删掉
productCollection.dropCollection()
ratingCollection.dropCollection()
// 将当前数据存入对应的表中
productDF.write
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_PRODUCT_COLLECTION)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
ratingDF.write
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
// 对表创建索引
productCollection.createIndex( MongoDBObject( "productId" -> 1 ) )
ratingCollection.createIndex( MongoDBObject( "productId" -> 1 ) )
ratingCollection.createIndex( MongoDBObject( "userId" -> 1 ) )
mongoClient.close()
}
}
// An highlighted block
写好之后就可以直接点击右键然后run了。
4.IDEA完成demo代码提交到集群运行由于是提交到集群,我们有很多的依赖,所以必须要打包。怎么打包?
我这里是用maven-plugin打包。需要在pom中添加如下的依赖
这样的话你打包的话就可以在IDEA 用maven命令:mvn package。把demo用到的依赖都打进去。大概有100多M的样子。否则你运行就会报类找不到异常。下面是代码:
// An highlighted block
package com.ynu.recommender
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import com.mongodb.casbah.commons.MongoDBObject
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataframe, SparkSession}
case class Product( productId: Int, name: String, imageUrl: String, categories: String, tags: String )
case class Rating( userId: Int, productId: Int, score: Double, timestamp: Int )
case class MongoConfig( uri: String, db: String )
object DataLoader {
//在本地调试的时候注意两点
//1.必须配置和你pom文件相同版本的hadoop环境
//2.在环境目录下(hadoop-2.7.5/bin/)看是否有winutils.exe。如果没有自己在Github上免费下载一个和自己版本最接近的。
System.setProperty("hadoop.home.dir", "你虚拟机的hadoop环境路径")
// 定义数据文件路径
//注意这些csv文件每个节点都要有,要不然filenotfound异常
val PRODUCT_DATA_PATH = "hdfs://hadoop01:9000/user/XXX.CSV"
val RATING_DATA_PATH = "hdfs://hadoop01:9000/user/XXXX.CSV"
// 定义mongodb中存储的表名
val MONGODB_PRODUCT_COLLECTION = "Product"
val MONGODB_RATING_COLLECTION = "Rating"
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "spark://hadoop01:7077",
"mongo.uri" -> "mongodb://mongodb主机的ip:27017/recommender",
"mongo.db" -> "recommender"
)
// 创建一个spark config
val sparkConf = new SparkConf().setMaster("spark.cores").setAppName("DataLoader").setJars(List("/opt/XXX/test/DataLoader-1.0-SNAPSHOT.jar"))
//这里的setjars改为master主机(hadoop01)上的绝对路径
//jar包只要在master节点有就行,他会分发到worker中。当然你也可以都放。
// 创建spark session
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
// 加载数据
val productRDD = spark.sparkContext.textFile(PRODUCT_DATA_PATH)
val productDF = productRDD.map( item => {
// product数据通过^分隔,切分出来
val attr = item.split("\^")
// 转换成Product
Product( attr(0).toInt, attr(1).trim, attr(4).trim, attr(5).trim, attr(6).trim )
} ).toDF()
val ratingRDD = spark.sparkContext.textFile(RATING_DATA_PATH)
val ratingDF = ratingRDD.map( item => {
val attr = item.split(",")
Rating( attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt )
} ).toDF()
implicit val mongoConfig = MongoConfig( config("mongo.uri"), config("mongo.db") )
storeDataInMongoDB( productDF, ratingDF )
spark.stop()
}
def storeDataInMongoDB( productDF: Dataframe, ratingDF: Dataframe )(implicit mongoConfig: MongoConfig): Unit ={
// 新建一个mongodb的连接,客户端
val mongoClient = MongoClient( MongoClientURI(mongoConfig.uri) )
// 定义要操作的mongodb表,可以理解为 db.Product
val productCollection = mongoClient( mongoConfig.db )( MONGODB_PRODUCT_COLLECTION )
val ratingCollection = mongoClient( mongoConfig.db )( MONGODB_RATING_COLLECTION )
// 如果表已经存在,则删掉
productCollection.dropCollection()
ratingCollection.dropCollection()
// 将当前数据存入对应的表中
productDF.write
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_PRODUCT_COLLECTION)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
ratingDF.write
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
// 对表创建索引
productCollection.createIndex( MongoDBObject( "productId" -> 1 ) )
ratingCollection.createIndex( MongoDBObject( "productId" -> 1 ) )
ratingCollection.createIndex( MongoDBObject( "userId" -> 1 ) )
mongoClient.close()
}
}
// An highlighted block
最后在linux上我们就可以spark-submit了:
spark-submit --master spark://192.168.125.129:7077 --class com.ynu.recommender.DataLoader /opt/test/DataLoader-1.0-SNAPSHOT-jar-with-dependencies.jar
只不过这里的–master设不设置都一样,因为代码里面已经写好了。
这里给大家放一篇参考配置的blog:
https://blog.csdn.net/zgjdzwhy/article/details/81632978
这里叙述三点:
1.如果按照上述blog老哥的来无法使用spark-submit。那么恢复spark-class之前的内容。转而在spark-env.sh添加jvm的参数。
2.集群调试时出现一直accetped,则将hadoop的capacity-scheduler.xml里面的yarn.scheduler.capacity.maximum-am-resource-percent修改如下:
3.需要读取的文件要么以args参数传入,要么是hdfs路径(所有节点都要有该文件)。setjars方法里面的路径是windows本地jar包的路径,且master上的jar包要和本地的相同。若果出现ClassNotFound则在打包的时候把所有依赖排除
运行结果
可以看到数据已经导入到mongodb中了。



