栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

用spark写WordCount(本地运行,提交到yarn运行)

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

用spark写WordCount(本地运行,提交到yarn运行)

文章目录
      • idea中本地运行(local)
      • 提交到虚拟机集群去运行(yarn)

idea中本地运行(local)

本地idea中运行要导入spark,scala依赖

  		
            org.scala-lang
            scala-library
            2.11.12
  		

        
            org.scala-lang
            scala-compiler
            2.11.12
        

        
            org.scala-lang
            scala-reflect
            2.11.12
        
		
            org.apache.spark
            spark-core_2.11
            2.4.5
        

还要设置一个hadoop相关的东西,要不然会报和Hadoop相关的错
随便你在哪个盘创建一个hadoop目录,hadoop目录下在创建一个bin目录
然后下载这个winutils.exe东西
百度网盘链接
提取码:2777
下载完把winutils.exe放到/hadoop/bin/目录下
然后去配置环境变量

然后重启电脑,不然会不成功

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo1WordCount {
  def main(args: Array[String]): Unit = {

    // Spark配置文件对象
    val conf: SparkConf = new SparkConf()
    // 设置Spark程序的名字
    conf.setAppName("Demo1WordCount")
    // 设置运行模式为local模式 即在idea本地运行
    // local : 一个并行度
    // local[2] : 两个并行度
    // local[*] : 有多少用多少
    conf.setMaster("local[2]")

    // Spark的上下文环境,相当于Spark的入口
    val sc: SparkContext = new SparkContext(conf)

    // 词频统计
    // 1、读取文件
    
    val linesRDD: RDD[String] = sc.textFile("spark/data/words",4)

    // 2、将每一行的单词切分出来
    // flatMap: 在Spark中称为 算子
    // 算子一般情况下都会返回另外一个新的RDD
    val wordsRDD: RDD[String] = linesRDD.flatMap(line => line.split(","))

    // 3、按照单词分组
    val groupRDD: RDD[(String, Iterable[String])] = wordsRDD.groupBy(word => word)

    // 4、统计每个单词的数量
    val countRDD: RDD[String] = groupRDD.map(kv => {
      val word: String = kv._1
      val words: Iterable[String] = kv._2
      // words.size直接获取迭代器的大小
      // 因为相同分组的所有的单词都会到迭代器中
      // 所以迭代器的大小就是单词的数量
      word + "," + words.size
    })

    // 5、将结果进行保存
    countRDD.saveAsTextFile("spark/data/wordCount")


//  2,3,4步也可以用reduceBykey写成下面格式,就不拆开写了直接链式调用
//val wordsRDD = linesRDD
//      .flatMap(line=>line.split(","))
//      .map(word=>(word,1))
//      .reduceByKey(_+_)
//  }

}
提交到虚拟机集群去运行(yarn)

提交到yarn运行有两种:
1.yarn-client 只适用于学习测试时使用,开发不用,因为会打印一堆日志
2.yarn-cluster 一般用于开发使用

object Demo2WordCountSubmit {
  
  def main(args: Array[String]): Unit = {

    // Spark配置文件对象
    val conf: SparkConf = new SparkConf()
    // 设置Spark程序的名字
    conf.setAppName("Demo2WordCountSubmit")


    // Spark的上下文环境,相当于Spark的入口
    val sc: SparkContext = new SparkContext(conf)

    // 词频统计
    // 1、读取文件
    
    val linesRDD: RDD[String] = sc.textFile("/spark/data/words")

    // 2、将每一行的单词切分出来
    // flatMap: 在Spark中称为 算子
    // 算子一般情况下都会返回另外一个新的RDD
    val wordsRDD: RDD[String] = linesRDD.flatMap(line => line.split(","))


    // 3、按照单词分组
    val groupRDD: RDD[(String, Iterable[String])] = wordsRDD.groupBy(word => word)

    // 4、统计每个单词的数量
    val countRDD: RDD[String] = groupRDD.map
    (kv => {
      val word: String = kv._1
      val words: Iterable[String] = kv._2
      // words.size直接获取迭代器的大小
      // 因为相同分组的所有的单词都会到迭代器中
      // 所以迭代器的大小就是单词的数量
      word + "," + words.size
    })
    //下面的注释内容可以代替groupRDD.map后内容即
//  val countRDD: RDD[String] = groupRDD.map
//    {
//      case(word:String, words)=>{
//        word+":"+words.size
//      }
//    }


    // 使用HDFS的JAVA API判断输出路径是否已经存在,存在即删除
    val hdfsConf: Configuration = new Configuration()
    hdfsConf.set("fs.defaultFS", "hdfs://master:9000")
    val fs: FileSystem = FileSystem.get(hdfsConf)
    // 判断输出路径是否存在
    if (fs.exists(new Path("/spark/data/wordCount"))) {
      fs.delete(new Path("/spark/data/wordCount"), true)
    }

    // 5、将结果进行保存
    countRDD.saveAsTextFile("/spark/data/wordCount")

  }

}

集群的standalone模式去公司一般用不到,公司有Hadoop集群了就会用yarn模式,没必要去部署两套集群

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/460771.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号