本文隶属于专栏《大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!
本专栏目录结构和参考文献请见大数据技术体系
开源
spark-examples 代码已开源,本项目致力于提供最具实践性的 Apache Spark 代码开发学习指南。
目录 读写读写 txt 文件
读写 csv/tsv 文件
读写 json 文件
读写 parquet 文件
读写 orc 文件
读写 sequence 文件
读写 object 文件
读写 mysql
读写 redis
正文 读 数据 people.txtMichael, 29 Andy, 30 Justin, 19Spark Core
package com.shockang.study.spark.core.read
import com.shockang.study.spark.READ_DATA_DIR
import com.shockang.study.spark.internal.Logging
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
object UsingCoreReadTxtExample extends Logging {
def main(args: Array[String]): Unit = {
// 关闭 Spark 内部的日志打印,只关注结果日志
Logger.getLogger("org").setLevel(Level.OFF)
// 初始化 SparkConf 对象,设置基本任务参数
val conf = new SparkConf()
// 设置提交任务的目标 Master 机器地址,local 为本地运行,[*]为自动分配任务线程数
.setMaster("local[*]")
// 设置任务名称
.setAppName("UsingCoreReadTxtExample")
// 实例化 SparkContext
val sc = new SparkContext(conf)
// 读取文本文件
val inputTextFile = sc.textFile(READ_DATA_DIR + "people.txt")
// 结果日志输出
logInfo(inputTextFile.collect.mkString(","))
// 读取目录下所有 txt 文本文件
val allTextFile = sc.textFile(READ_DATA_DIR + "*.txt")
// 结果日志输出
logInfo(allTextFile.collect.mkString(","))
// 停止sc,结束该任务
sc.stop()
}
}
22/03/13 23:50:06 INFO core: Michael, 29,Andy, 30,Justin, 19Spark SQL
package com.shockang.study.spark.sql.read
import com.shockang.study.spark.READ_DATA_DIR
import com.shockang.study.spark.internal.Logging
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
object UsingSqlReadTxtExample extends Logging {
def main(args: Array[String]): Unit = {
// 关闭 Spark 内部的日志打印,只关注结果日志
Logger.getLogger("org").setLevel(Level.OFF)
// 使用 Spark 2.0 提供的 SparkSession API 来访问应用程序
val spark = SparkSession.builder().master("local[*]").appName("UsingSqlReadTxtExample").getOrCreate()
// 读取 txt 文件
val inputTextFile = spark.read.textFile(READ_DATA_DIR + "people.txt")
// 结果日志输出
logInfo(inputTextFile.collect.mkString(","))
// 读取目录下所有 txt 文本文件
val allTextFile = spark.read.textFile(READ_DATA_DIR + "*.txt")
// 结果日志输出
logInfo(allTextFile.collect.mkString(","))
// 停止sc,结束该任务
spark.stop()
}
}
22/03/13 23:52:54 INFO sql: Michael, 29,Andy, 30,Justin, 19写 Spark Core
package com.shockang.study.spark.core.write
import com.shockang.study.spark.WRITE_DATA_DIR
import com.shockang.study.spark.internal.Logging
import com.shockang.study.spark.util.Utils.writableLocalFsPath
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
object UsingCoreWriteTxtExample extends Logging {
def main(args: Array[String]): Unit = {
// 关闭 Spark 内部的日志打印,只关注结果日志
Logger.getLogger("org").setLevel(Level.OFF)
// 初始化 SparkConf 对象,设置基本任务参数
val conf = new SparkConf()
// 设置提交任务的目标 Master 机器地址,local 为本地运行,[*]为自动分配任务线程数
.setMaster("local[*]")
// 设置任务名称
.setAppName("UsingCoreWriteTxtExample")
// 实例化 SparkContext
val sc = new SparkContext(conf)
val rddData = sc.parallelize(Array(("one", 1), ("two", 2), ("three", 3)), 10)
val filePath = WRITE_DATA_DIR + "UsingCoreWriteTxtExample"
rddData.saveAsTextFile(writableLocalFsPath(filePath))
sc.stop
}
}
Spark SQL
package com.shockang.study.spark.sql.write
import com.shockang.study.spark.WRITE_DATA_DIR
import com.shockang.study.spark.internal.Logging
import com.shockang.study.spark.util.Utils.writableLocalFsPath
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
object UsingSqlWriteTxtExample extends Logging {
case class Person(name: String, sex: String, age: Int)
def main(args: Array[String]): Unit = {
// 关闭 Spark 内部的日志打印,只关注结果日志
Logger.getLogger("org").setLevel(Level.OFF)
// 使用 Spark 2.0 提供的 SparkSession API 来访问应用程序
val spark = SparkSession.builder().master("local[*]").appName("UsingSqlWriteTxtExample").getOrCreate()
// 注意隐式导入
import spark.implicits._
// 由于 txt 只支持 String 类型的数据,故需要经过转换
val df = spark.createDataframe(List(("one", 1), ("two", 2), ("three", 3))).map(_.mkString("(", ", ", ")"))
val filePath = WRITE_DATA_DIR + "UsingSqlWriteTxtExample"
df.write.text(writableLocalFsPath(filePath))
spark.stop
}
}



