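Environment setup:
Configuring all of this really is a hassle…
Both Windows 10 and Linux need the same environment: Java 1.8.0_152, Hadoop 2.7.3, Scala 2.10.6, Spark 1.6.0, IntelliJ IDEA 2016.3.4.
Contents
Running Spark programs
Running Spark in the development environment
Running Spark in a cluster environment
Packaging the project in IDEA (exporting a JAR)
Building the artifact
Persistence
Data partitioning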
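Running Spark programs
Running Spark in the development environment
import org.apache.spark.{SparkConf, SparkContext}

object WorldCount {
  def main(args: Array[String]): Unit = {
    // Hadoop install location; can be omitted once HADOOP_HOME is set as an environment variable.
    // Set it before the SparkContext is created, and escape the backslashes in the Scala string.
    System.setProperty("hadoop.home.dir", "D:\\ProgramFiles\\hadoop")
    // Run in local mode; a thread count can be given, e.g. "local[2]" or "local[*]"
    val conf = new SparkConf().setAppName("WorldCount").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR") // suppress the noisy INFO logging
    // The input can be a local Windows 10 file or an HDFS path
    val input = "words.txt"
    // Count how many times each word occurs
    val count = sc.textFile(input).flatMap(x => x.split(" ")).map(x => (x, 1)).reduceByKey((x, y) => x + y)
    count.foreach(x => println(x._1 + "," + x._2))
  }
}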
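To see what each step of the pipeline produces, here is a plain-Scala analogue that runs on an in-memory collection instead of an RDD. It is a sketch only. `WordCountSketch`, `count` and the sample lines are hypothetical and are not the author's code or words.txt. Here `groupBy` plus a sum stands in for `reduceByKey`.

```scala
object WordCountSketch {
  // Same steps as the RDD pipeline, but on a local Seq (no Spark involved):
  // flatMap splits lines into words, map pairs each word with 1,
  // and groupBy + sum plays the role of reduceByKey
  def count(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => word -> pairs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample input, not the original words.txt
    val lines = Seq("Hello World", "Hello Hadoop")
    count(lines).foreach { case (w, n) => println(w + "," + n) }
  }
}
```

In Spark, `reduceByKey` also combines values within each partition before the shuffle. The local `groupBy` has no shuffle, so it only mirrors the final result and not the execution.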
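Running Spark in a cluster environment
import org.apache.spark.{SparkConf, SparkContext}

object WorldCount {
  def main(args: Array[String]): Unit = {
    // No setMaster here: the master is supplied by spark-submit (--master)
    val conf = new SparkConf().setAppName("WorldCount")
    val sc = new SparkContext(conf)
    val input = args(0)  // input path, e.g. on HDFS
    val output = args(1) // output directory; it must not exist yet
    val count = sc.textFile(input).flatMap(x => x.split(" ")).map(x => (x, 1)).reduceByKey((x, y) => x + y)
    // repartition(1) collects the result into a single part-00000 file
    count.repartition(1).saveAsTextFile(output)
  }
}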
Packaging the project in IDEA (exporting a JAR)
File->Project Structure->Artifacts->"+"->JAR->Empty
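Name the JAR "word", then click OK.
Build -> Build Artifacts -> Build
Once the artifact is built, the project has an /out directory containing the generated JAR. Right-click the JAR and choose Show in Explorer to jump straight to its folder.
Upload the JAR and the local Windows 10 file words.txt to /opt on Linux, then upload /opt/words.txt to /user/root on HDFS.
You may need sudo chmod a+w /opt/ to grant write permission; otherwise the files cannot be copied to the VM.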
hadoop@master:/opt$ ls
word.jar  words.txt
Start Hadoop and Spark.
hdfs dfs -put words.txt /user/root/
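Pay close attention to the spark-submit command below! It caught me out. When the job failed, I assumed my environment was misconfigured, deleted the VM and reinstalled everything. If your program lives in a package, the source file starts with package xxx.xxx.xxx…. I created WorldCount directly under src, so it has no package. If yours does, for example demo.spark, you must change the submit command so that --class gets the fully qualified name: demo.spark.WorldCount instead of WorldCount. (Yes, I also noticed I wrote World instead of Word. I'm not changing it now that it's uploaded to Linux. After reinstalling the whole environment I'm completely numb…)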
cd spark/bin/
./spark-submit --master yarn-cluster --class WorldCount /opt/word.jar /user/root/words.txt /user/root/word_count
[root@master bin]# hdfs dfs -cat /user/root/word_count/part-00000
(Hello,3)
(Real,1)
(MapReduce,1)
(World,2)
(Our,1)
(BigData,2)
(Great,1)
(Hadoop,3)

Persistence
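Set the storage level to MEMORY_ONLY. If the RDD is not persisted, computing the sum and computing the mean each evaluate data from scratch. Once the RDD is persisted, data is computed during the sum and cached, and the mean then reads the data straight from memory.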
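Persistence pays off because RDDs are lazy: every action replays the lineage unless the result has been cached. The sketch below is a rough plain-Scala analogy, not Spark. `PersistAnalogy` and `run` are hypothetical names, and a lazy val stands in for `persist`. It counts how often the squaring step runs when the data is reused versus recomputed for two "actions".

```scala
object PersistAnalogy {
  // Simulates the squares RDD; the counter records how often x * x is evaluated
  def run(cache: Boolean): (Double, Double, Int) = {
    var evaluations = 0
    val source = List(1, 2, 3, 4, 5, 6)
    def compute(): List[Int] = source.map { x => evaluations += 1; x * x }

    // cache = true: materialize once and reuse (analogous to persist);
    // cache = false: recompute for every "action" (like an unpersisted RDD)
    lazy val cached = compute()
    def data(): List[Int] = if (cache) cached else compute()

    val sum = data().sum.toDouble                  // first "action"
    val squares = data()                           // second "action"
    val mean = squares.sum.toDouble / squares.size
    (sum, mean, evaluations)
  }

  def main(args: Array[String]): Unit = {
    println(run(cache = false)) // squaring step runs 12 times
    println(run(cache = true))  // squaring step runs 6 times
  }
}
```

In Spark, `persist` is also lazy. The cache is filled by the first action (`sum` in the shell session), and later actions reuse it.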
scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel

scala> val data=sc.parallelize(List(1,2,3,4,5,6)).map(x=>x*x)
data: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:28

scala> data.persist(StorageLevel.MEMORY_ONLY)
res0: data.type = MapPartitionsRDD[1] at map at <console>:28

scala> println(data.sum)
91.0

scala> println(data.mean)
15.166666666666668

Data partitioning
package demo.partition

import org.apache.spark.Partitioner

class MyPartition(numParts: Int) extends Partitioner {
  // Number of partitions to create
  override def numPartitions: Int = numParts

  override def getPartition(key: Any): Int = {
    // Even keys go to partition 0, odd keys to partition 1
    if (key.toString.toInt % 2 == 0) 0 else 1
  }

  // Lets Spark check whether two RDDs use the same partitioning (standard boilerplate)
  override def equals(other: Any): Boolean = other match {
    case mypartition: MyPartition => mypartition.numPartitions == numPartitions
    case _ => false
  }

  // Keep hashCode consistent with equals
  override def hashCode: Int = numPartitions
}
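package demo.partition

import org.apache.spark.{SparkConf, SparkContext}

object ToDistribute {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test partition")
    val sc = new SparkContext(conf)
    val input = args(0)
    val output = args(1)
    // Each input line has the form "key,value"; turn it into a (key, value) pair
    val data = sc.textFile(input).map { x => val y = x.split(","); (y(0), y(1)) }
    // Redistribute the pairs with the custom partitioner: 2 partitions, even/odd keys
    val data2 = data.partitionBy(new MyPartition(2))
    // Writes one file per partition: part-00000 and part-00001
    data2.saveAsTextFile(output)
  }
}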
Rebuild the project.
Submit it to Spark:
./spark-submit --master yarn-cluster --class demo.partition.ToDistribute /opt/word.jar /user/root/user.txt /user/root/part
[root@master bin]# hdfs dfs -cat /user/root/part/part-00000
(4,David)
(6,Fran)
(2,Bob)
[root@master bin]# hdfs dfs -cat /user/root/part/part-00001
(1,Alice)
(3,Charlie)
(5,Ed)
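The split in this output follows directly from getPartition. The plain-Scala sketch below needs no Spark. It applies the same parsing and the same even/odd rule to the records seen in the output, assuming each line of user.txt looks like `1,Alice`, as the `split(",")` in ToDistribute implies. `PartitionCheck` and its methods are hypothetical names.

```scala
object PartitionCheck {
  // Same rule as MyPartition.getPartition: even keys -> 0, odd keys -> 1
  def partitionOf(key: Any): Int = if (key.toString.toInt % 2 == 0) 0 else 1

  // Mirrors the map step in ToDistribute: "1,Alice" -> ("1", "Alice")
  def parse(line: String): (String, String) = {
    val y = line.split(",")
    (y(0), y(1))
  }

  // Groups the values by the partition index their key is assigned to
  def byPartition(lines: Seq[String]): Map[Int, Seq[String]] =
    lines.map(parse)
      .groupBy { case (k, _) => partitionOf(k) }
      .map { case (p, kvs) => p -> kvs.map(_._2) }

  def main(args: Array[String]): Unit = {
    // Records as they appear in the job output; the real line order in user.txt is not shown
    val lines = Seq("1,Alice", "2,Bob", "3,Charlie", "4,David", "5,Ed", "6,Fran")
    byPartition(lines).toSeq.sortBy(_._1).foreach { case (p, names) =>
      println(s"part-0000$p: ${names.mkString(", ")}")
    }
  }
}
```

The order within each part file depends on the input order, so only the membership of each partition should be compared with the HDFS output.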
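Here the RDD starts with 1 partition by default. Calling coalesce to get 5 partitions has no effect, because without a shuffle coalesce can only merge existing partitions and never split them. With the shuffle parameter set to true, the partition count becomes 5. Finally, repartition (equivalent to coalesce with shuffle = true) resets the original RDD to 10 partitions, and the data is spread across all 10.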
scala> val rdd=sc.parallelize(List(1,2,3,4,5,6,7))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:28

scala> rdd.partitions.size
res3: Int = 1

scala> val rdd1=rdd.coalesce(5)
rdd1: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[6] at coalesce at <console>:30

scala> rdd1.partitions.size
res4: Int = 1

scala> val rdd2=rdd.coalesce(5,true)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[10] at coalesce at <console>:30

scala> rdd2.partitions.size
res5: Int = 5

scala> rdd.repartition(10).partitions.size
res6: Int = 10
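The partition counts in this session can be summarized by a small rule of thumb. The sketch below is a simplified model written as an assumption for illustration; it is not Spark's CoalescedRDD implementation. Without a shuffle, the result is at most the current partition count. With a shuffle, which is what repartition uses, the requested count is honored. `PartitionCountModel` and its methods are hypothetical.

```scala
object PartitionCountModel {
  // Simplified model of the partition count produced by coalesce
  def coalescedCount(current: Int, requested: Int, shuffle: Boolean = false): Int =
    if (shuffle) requested            // a shuffle can redistribute data into any number of partitions
    else math.min(current, requested) // no shuffle: partitions can only be merged, never split

  // Model of repartition(n) as coalesce(n, shuffle = true)
  def repartitionCount(current: Int, requested: Int): Int =
    coalescedCount(current, requested, shuffle = true)

  def main(args: Array[String]): Unit = {
    println(coalescedCount(1, 5))                 // mirrors rdd1 in the session
    println(coalescedCount(1, 5, shuffle = true)) // mirrors rdd2
    println(repartitionCount(1, 10))              // mirrors rdd.repartition(10)
  }
}
```

When reducing the partition count, for example going from many partitions down to a few, coalesce without a shuffle is usually the cheaper choice because it avoids moving all the data.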



