栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark-examples 之 读写 txt 文件

spark-examples 之 读写 txt 文件

前言

本文隶属于专栏《大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见大数据技术体系


开源

spark-examples 代码已开源,本项目致力于提供最具实践性的 Apache Spark 代码开发学习指南。

目录 读写

读写 txt 文件

读写 csv/tsv 文件

读写 json 文件

读写 parquet 文件

读写 orc 文件

读写 sequence 文件

读写 object 文件

读写 mysql

读写 redis

正文 读 数据 people.txt
Michael, 29
Andy, 30
Justin, 19
Spark Core
package com.shockang.study.spark.core.read

import com.shockang.study.spark.READ_DATA_DIR
import com.shockang.study.spark.internal.Logging
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}


object UsingCoreReadTxtExample extends Logging {
  def main(args: Array[String]): Unit = {
    // 关闭 Spark 内部的日志打印,只关注结果日志
    Logger.getLogger("org").setLevel(Level.OFF)
    // 初始化 SparkConf 对象,设置基本任务参数
    val conf = new SparkConf()
      // 设置提交任务的目标 Master 机器地址,local 为本地运行,[*]为自动分配任务线程数
      .setMaster("local[*]")
      // 设置任务名称
      .setAppName("UsingCoreReadTxtExample")
    // 实例化 SparkContext
    val sc = new SparkContext(conf)

    // 读取文本文件
    val inputTextFile = sc.textFile(READ_DATA_DIR + "people.txt")
    // 结果日志输出
    logInfo(inputTextFile.collect.mkString(","))

    // 读取目录下所有 txt 文本文件
    val allTextFile = sc.textFile(READ_DATA_DIR + "*.txt")
    // 结果日志输出
    logInfo(allTextFile.collect.mkString(","))

    // 停止sc,结束该任务
    sc.stop()
  }
}
22/03/13 23:50:06 INFO core: Michael, 29,Andy, 30,Justin, 19
Spark SQL
package com.shockang.study.spark.sql.read

import com.shockang.study.spark.READ_DATA_DIR
import com.shockang.study.spark.internal.Logging
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession


object UsingSqlReadTxtExample extends Logging {
  def main(args: Array[String]): Unit = {
    // 关闭 Spark 内部的日志打印,只关注结果日志
    Logger.getLogger("org").setLevel(Level.OFF)
    // 使用 Spark 2.0 提供的 SparkSession API 来访问应用程序
    val spark = SparkSession.builder().master("local[*]").appName("UsingSqlReadTxtExample").getOrCreate()

    // 读取 txt 文件
    val inputTextFile = spark.read.textFile(READ_DATA_DIR + "people.txt")

    // 结果日志输出
    logInfo(inputTextFile.collect.mkString(","))

    // 读取目录下所有 txt 文本文件
    val allTextFile = spark.read.textFile(READ_DATA_DIR + "*.txt")

    // 结果日志输出
    logInfo(allTextFile.collect.mkString(","))

    // 停止sc,结束该任务
    spark.stop()
  }
}
22/03/13 23:52:54 INFO sql: Michael, 29,Andy, 30,Justin, 19
写 Spark Core
package com.shockang.study.spark.core.write

import com.shockang.study.spark.WRITE_DATA_DIR
import com.shockang.study.spark.internal.Logging
import com.shockang.study.spark.util.Utils.writableLocalFsPath
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}


object UsingCoreWriteTxtExample extends Logging {
  def main(args: Array[String]): Unit = {
    // 关闭 Spark 内部的日志打印,只关注结果日志
    Logger.getLogger("org").setLevel(Level.OFF)
    // 初始化 SparkConf 对象,设置基本任务参数
    val conf = new SparkConf()
      // 设置提交任务的目标 Master 机器地址,local 为本地运行,[*]为自动分配任务线程数
      .setMaster("local[*]")
      // 设置任务名称
      .setAppName("UsingCoreWriteTxtExample")
    // 实例化 SparkContext
    val sc = new SparkContext(conf)

    val rddData = sc.parallelize(Array(("one", 1), ("two", 2), ("three", 3)), 10)
    val filePath = WRITE_DATA_DIR + "UsingCoreWriteTxtExample"
    rddData.saveAsTextFile(writableLocalFsPath(filePath))

    sc.stop
  }
}
Spark SQL
package com.shockang.study.spark.sql.write

import com.shockang.study.spark.WRITE_DATA_DIR
import com.shockang.study.spark.internal.Logging
import com.shockang.study.spark.util.Utils.writableLocalFsPath
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession


object UsingSqlWriteTxtExample extends Logging {

  case class Person(name: String, sex: String, age: Int)

  def main(args: Array[String]): Unit = {
    // 关闭 Spark 内部的日志打印,只关注结果日志
    Logger.getLogger("org").setLevel(Level.OFF)
    // 使用 Spark 2.0 提供的 SparkSession API 来访问应用程序
    val spark = SparkSession.builder().master("local[*]").appName("UsingSqlWriteTxtExample").getOrCreate()

    // 注意隐式导入
    import spark.implicits._

    // 由于 txt 只支持 String 类型的数据,故需要经过转换
    val df = spark.createDataframe(List(("one", 1), ("two", 2), ("three", 3))).map(_.mkString("(", ", ", ")"))

    val filePath = WRITE_DATA_DIR + "UsingSqlWriteTxtExample"

    df.write.text(writableLocalFsPath(filePath))

    spark.stop
  }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/762231.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号