
Big Data with Spark: Quick Start Overview, Complete Usage (Chapter 2)

I. Hands-on Practice

Contents: 1. Add the Scala plugin; 2. Add dependencies; 3. WordCount; 4. Exception handling

1. Add the Scala plugin

Install the Scala plugin in IDEA (File > Settings > Plugins, search for "Scala", install, and restart the IDE) so that the project can compile Scala sources.

2. Add dependencies

Add the Spark core dependency and the build plugins to the project's pom.xml:

 
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- Compiles the Scala sources to class files -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- Packages the application together with its dependencies -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.1.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
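With this in place, mvn package compiles the Scala sources and, through the assembly plugin's binding to the package phase, also produces a self-contained jar-with-dependencies that can be handed to spark-submit.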
3. WordCount

To get a direct feel for the Spark framework, let's implement WordCount, the most common teaching example in big data courses. Three versions follow, each refining the previous one; all of them read the text files under a datas directory in the project root.

package com.spack.bigdata.core.wc

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object Spark02_WordCount {
  def main(args: Array[String]): Unit = {

    // Establish the connection between the application and the Spark framework
    // (much like obtaining a JDBC Connection)
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("WordCount")
    val sc = new SparkContext(conf)

    // TODO Business logic

    // 1. Read the files under the datas directory, line by line
    val lines: RDD[String] = sc.textFile("datas")
    println(lines) // prints the RDD's description, not its contents

    // 2. Split each line into individual words (tokenization)
    // "hello world" => hello, world
    val words: RDD[String] = lines.flatMap(_.split(" "))

    // 3. Group the words by the word itself so they can be counted
    // (hello, hello, hello), (world, world)
    val wordGroup: RDD[(String, Iterable[String])] = words.groupBy(word => word)

    // 4. Transform the grouped data
    // (hello, (hello, hello, hello)), (world, (world, world))
    // => (hello, 3), (world, 2)
    val wordToCount = wordGroup.map {
      // word is the key; list is the Iterable of that word's occurrences
      case (word, list) => {
        (word, list.size)
      }
    }

    val array: Array[(String, Int)] = wordToCount.collect()
    array.foreach(println)

    // TODO Close the connection
    sc.stop()
  }
}
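A side note, not part of the original listing: step 4's pattern match can be written more compactly with mapValues, which transforms only the grouped values and leaves the keys untouched:

// Equivalent to step 4 above: map each group's Iterable to its size.
val wordToCountAlt: RDD[(String, Int)] = wordGroup.mapValues(_.size)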

package com.spack.bigdata.core.wc

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object Spark03_WordCount {
  def main(args: Array[String]): Unit = {

    // Establish the connection between the application and the Spark framework
    // (much like obtaining a JDBC Connection)
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("WordCount")
    val sc = new SparkContext(conf)

    // TODO Business logic

    // 1. Read the files under the datas directory, line by line
    val lines: RDD[String] = sc.textFile("datas")
    println(lines) // prints the RDD's description, not its contents

    // 2. Split each line into individual words (tokenization)
    // "hello world" => hello, world
    val words: RDD[String] = lines.flatMap(_.split(" "))

    // Pair each word with an initial count of 1
    val wordToOne = words.map(
      word => (word, 1)
    )

    // 3. Group the (word, 1) tuples by the word so they can be counted
    // (hello, [(hello,1), (hello,1), (hello,1)]), (world, [(world,1), (world,1)])
    val wordGroup: RDD[(String, Iterable[(String, Int)])] = wordToOne.groupBy(
      t => t._1
    )

    // 4. Transform the grouped data: reduce each group's tuples,
    // keeping the word and summing the counts
    val wordToCount = wordGroup.map {
      case (word, list) => {
        list.reduce(
          (t1, t2) => {
            (t1._1, t1._2 + t2._2)
          }
        )
      }
    }

    val array: Array[(String, Int)] = wordToCount.collect()
    array.foreach(println)

    // TODO Close the connection
    sc.stop()
  }
}
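To see what the reduce in step 4 does for a single key, here is a plain-Scala trace (an illustrative sketch, not from the original article):

// The occurrences of one word after grouping: three ("hello", 1) tuples.
val occurrences = List(("hello", 1), ("hello", 1), ("hello", 1))
// Pairwise reduction keeps the word and sums the counts.
val total = occurrences.reduce((t1, t2) => (t1._1, t1._2 + t2._2))
println(total) // prints (hello,3)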

package com.spack.bigdata.core.wc

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object Spark04_WordCount {
  def main(args: Array[String]): Unit = {

    // Establish the connection between the application and the Spark framework
    // (much like obtaining a JDBC Connection)
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("WordCount")
    val sc = new SparkContext(conf)

    // TODO Business logic

    // 1. Read the files under the datas directory, line by line
    val lines: RDD[String] = sc.textFile("datas")
    println(lines) // prints the RDD's description, not its contents

    // 2. Split each line into individual words (tokenization)
    // "hello world" => hello, world
    val words: RDD[String] = lines.flatMap(_.split(" "))

    // Pair each word with an initial count of 1
    val wordToOne = words.map(
      word => (word, 1)
    )

    // Spark implements grouping and aggregation in a single method:
    // reduceByKey reduces the values of all tuples that share the same key.
    // wordToOne.reduceByKey((x, y) => { x + y })
    // wordToOne.reduceByKey((x, y) => x + y)
    val wordToCount = wordToOne.reduceByKey(_ + _)

    val array: Array[(String, Int)] = wordToCount.collect()
    array.foreach(println)

    // TODO Close the connection
    sc.stop()
  }
}
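As a compact variant (a sketch assuming the same local SparkContext sc and datas directory as above, not part of the original listings), the whole Spark04 job collapses into a single chain:

// Read -> tokenize -> pair with 1 -> sum per word -> print.
sc.textFile("datas")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .collect()
  .foreach(println)

Unlike the groupBy versions, reduceByKey can pre-aggregate values within each partition before the shuffle, so less data crosses the network.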

During execution a large amount of log output is produced. To make the program's results easier to read, create a log4j.properties file in the project's resources directory and add the following logging configuration:

log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to ERROR. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=ERROR
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=ERROR
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
4. Exception handling

If the local operating system is Windows and the program touches anything Hadoop-related, such as writing files to HDFS, you will typically hit the well-known winutils exception (e.g. "Could not locate executable null\bin\winutils.exe in the Hadoop binaries").

This is not a bug in the program: on Windows, Spark's Hadoop layer depends on native Windows utilities. The fix is simply to wire up the Windows-side dependency, i.e., place winutils.exe in a local directory and point the HADOOP_HOME environment variable at it.
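If setting the environment variable is inconvenient, one common in-code workaround (an assumption based on typical setups, not from the original text) is to point Hadoop at the winutils directory before creating the SparkContext:

// Hypothetical local path; winutils.exe must sit under C:\hadoop\bin.
System.setProperty("hadoop.home.dir", "C:\\hadoop")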

