栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【spark2.x】如何通过SparkSQL读取csv文件

【spark2.x】如何通过SparkSQL读取csv文件

package cn.itcast.spark.source

import java.util.Properties

import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StructType}
import org.apache.spark.sql.{Dataframe, SparkSession}

object _03SparkSQLSourceTest {
	
	def main(args: Array[String]): Unit = {
		// 构建SparkSession实例对象
		val spark: SparkSession = SparkSession.builder()
			.master("local[4]")
			.appName(this.getClass.getSimpleName.stripSuffix("$"))
			.config("spark.sql.shuffle.partitions", "4")
			.getOrCreate()
		import spark.implicits._
		
		// TODO: 1. CSV 格式数据文本文件数据 -> 依据 CSV文件首行是否是列名称,决定读取数据方式不一样的
		
		// 方式一:首行是列名称,数据文件u.dat
		val dataframe: Dataframe = spark.read
			.format("csv")
			.option("sep", "\t")
			.option("header", "true")
			.option("inferSchema", "true")
			.load("datas/ml-100k/u.dat")
		dataframe.printSchema()
		dataframe.show(10, truncate = false)

		// 方式二:首行不是列名,需要自定义Schema信息,数据文件u.data
		// 自定义schema信息
		val schema: StructType = new StructType()
			.add("user_id", IntegerType, nullable = true)
			.add("iter_id", IntegerType, nullable = true)
			.add("rating", DoubleType, nullable = true)
			.add("timestamp", LongType, nullable = true)
		val df: Dataframe = spark.read
			.format("csv")
			.schema(schema)
			.option("sep", "\t")
			.load("datas/ml-100k/u.data")
		df.printSchema()
		df.show(10, truncate = false)
		
		
		// TODO: 2. 读取MySQL表中数据
		// 第一、简洁版格式
		
		val props =  new Properties()
		props.put("user", "root")
		props.put("password", "123456")
		props.put("driver", "com.mysql.cj.jdbc.Driver")
		val empDF: Dataframe = spark.read.jdbc(
			"jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true", //
			"db_test.emp", //
			props //
		)
		println(s"Partition Number = ${empDF.rdd.getNumPartitions}")
		empDF.printSchema()
		empDF.show(10, truncate = false)
		
		
		// 第二、标准格式写
		
		val table: String = "(select ename,deptname,sal from db_test.emp e join db_test.dept d on e.deptno = d.deptno) AS tmp"
		val joinDF: Dataframe = spark.read
			.format("jdbc")
			.option("url", "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true")
			.option("driver", "com.mysql.cj.jdbc.Driver")
			.option("user", "root")
			.option("password", "123456")
			.option("dbtable", table)
			.load()
		joinDF.printSchema()
		joinDF.show(10, truncate = false)
		
		
		// 应用结束,关闭资源
		spark.stop()
	}
	
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/600463.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号