栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark编程进阶学习

Spark编程进阶学习

环境准备:
配置的时候确实很麻烦…

win10和Linux下环境都要配置好
java 1.8.0_152
Hadoop 2.7.3
Scala 2.10.6
Spark 1.6.0
IntelliJ IDEA 2016.3.4

文章目录

运行Spark程序

在开发环境下运行Spark在集群环境中运行Spark

在IDEA中打包工程(输出JAR)编译生成Artifact 持久化数据分区

运行Spark程序 在开发环境下运行Spark
import org.apache.spark.{SparkConf,SparkContext}
object WorldCount{
  def main(args: Array[String]): Unit = {
    //以本地方式执行,可以指定线程数
    val conf=new SparkConf().setAppName("WorldCount").setMaster("local")
    val sc=new SparkContext(conf)
    sc.setLogLevel("ERROR")//清除无用INFO信息
    //Hadoop位置,添加环境变量后可以不写
    System.setProperty("hadoop.home.dir","D:\ProgramFiles\hadoop")
    //输入文件可以是本地win10文件,也可以是HDFS
    val input="words.txt"
    //计算各个单词出现个数
    val count=sc.textFile(input).flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y)
    count.foreach(x=>println(x._1+","+x._2))
  }
}

在集群环境中运行Spark

import org.apache.spark.{SparkConf,SparkContext}
object WorldCount{
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("WorldCount")
    val sc=new SparkContext(conf)
    val input=args(0)
    val output=args(1)
    val count=sc.textFile(input).flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y)
    count.repartition(1).saveAsTextFile(output)
  }
}
在IDEA中打包工程(输出JAR)

File->Project Structure->Artifacts->"+"->JAR->Empty
自定义JAR包名为"word",设置好后点OK

编译生成Artifact

Build->Build Artifacts->Bulid

生成Artifact后,工程目录会有/out目录,可以看到生成的JAR包,JAR包处单击右键,Show in Explorer,可以直达JAR包所在文件夹。

将JAR包和win10本地的words.txt文件上传到Linux的/opt目录下,再将/opt/words.txt上传到HDFS的/user/root下
可能需要sudo chmod a+w /opt/给写入的权限,不然文件传不到虚拟机

hadoop@master:/opt$ ls
word.jar  words.txt

启动Hadoop和Spark

hdfs dfs -put words.txt /user/root/

下面spark-submit提交这个命令一定要注意!我被这坑了,运行报错我还以为我环境配错了,删了虚拟机重装了一遍。如果你程序放在包下面,程序开头会有package xxx.xxx.xxx…,我是直接在src下建的WorldCount,所以没有包,如果有包的话,例如你的包名是demo.spark,那么下面这个提交命令你得改一下,将WorldCount改为demo.spark.WorldCount(好吧,我发现我把Word写成了World,不改了,我都上传到Linux了,重装了一遍环境我现在人都麻了…)

cd spark/bin/

./spark-submit --master yarn-cluster --class WorldCount /opt/word.jar /user/root/words.txt /user/root/word_count
[root@master bin]# hdfs dfs -cat /user/root/word_count/part-00000
(Hello,3)
(Real,1)
(MapReduce,1)
(World,2)
(Our,1)
(BigData,2)
(Great,1)
(Hadoop,3)
持久化

设置存储级别为MEMORY_ONLY,如不持久化RDD,求和求均值都计算data。持久化RDD后,求和计算data,求均值时直接从内存中读取数据。

scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel

scala> val data=sc.parallelize(List(1,2,3,4,5,6)).map(x=>x*x)
data: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at :28

scala> data.persist(StorageLevel.MEMORY_ONLY)
res0: data.type = MapPartitionsRDD[1] at map at :28

scala> println(data.sum)
91.0

scala> println(data.mean)
15.166666666666668

数据分区
package demo.partition

import org.apache.spark.Partitioner
class MyPartition(numParts: Int) extends Partitioner {
  override def numPartitions = numParts //返回想要创建分区个数
  override def getPartition(key: Any) = {
    if (key.toString().toInt % 2 == 0) {
      //如果是偶数存储在0分区,否则存储在1分区
      0
    } else {
      1
    }
  }
  override def equals(other: Any): Boolean = other match {
    //比较自定义的分区器是否与其他RDD分区一样,固定的一种格式
    case mypartition: MyPartition => mypartition.numPartitions == numPartitions
    case _ => false
  }
}
package demo.partition
import org.apache.spark.{SparkConf, SparkContext}
object ToDistribute{
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("test partition")
    val sc=new SparkContext(conf)
    val input=args(0)
    val output=args(1)
    val data=sc.textFile(input).map{x=>val y=x.split(",");(y(0),y(1))}
    val data2=data.partitionBy(new MyPartition(2))
    data2.saveAsTextFile(output)
  }
}

将工程重新编译
提交到Spark

./spark-submit --master yarn-cluster --class demo.partition.ToDistribute /opt/word.jar /user/root/user.txt /user/root/part
[root@master bin]# hdfs dfs -cat /user/root/part/part-00000
(4,David)
(6,Fran)
(2,Bob)
[root@master bin]# hdfs dfs -cat /user/root/part/part-00001
(1,Alice)
(3,Charlie)
(5,Ed)
[root@master bin]# 

RDD默认分区1,通过coalesce重分区个数为5,未生效;将shuffle参数设置为true后,分区个数被重设为5;接着通过repartition将原RDD重设分区为10,RDD数据被分到10个分区中。

scala> val rdd=sc.parallelize(List(1,2,3,4,5,6,7))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at :28

scala> rdd.partitions.size
res3: Int = 1

scala> val rdd1=rdd.coalesce(5)
rdd1: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[6] at coalesce at :30

scala> rdd1.partitions.size
res4: Int = 1

scala> val rdd2=rdd.coalesce(5,true)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[10] at coalesce at :30

scala> rdd2.partitions.size
res5: Int = 5

scala> rdd.repartition(10).partitions.size
res6: Int = 10

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/774371.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号