// Bring Spark's implicit conversions into scope; without this import an RDD has no toDF method.
import spark.implicits._
// The Spark SQL type is `DataFrame` (capital F); `Dataframe` does not exist and does not compile.
import org.apache.spark.sql.DataFrame
// Turn each comma-separated line into a Hello row.
// split(",", -1) keeps trailing empty fields (plain split(",") drops them, so "1,a,b,c," would
// yield only 4 fields); lines with fewer than 5 fields (e.g. blank lines) are skipped instead
// of failing with ArrayIndexOutOfBoundsException on fields(4).
val df1: DataFrame = rdd
  .map(line => line.split(",", -1))
  .filter(fields => fields.length >= 5)
  .map(fields => Hello(fields(0), fields(1), fields(2), fields(3), fields(4)))
  .toDF()
二、以csv格式保存
// Collapse to one partition so Spark writes a single part file; note "school.csv" is created as a directory.
val singlePartition = df2.coalesce(1)
singlePartition.write.csv("school.csv")
三、连接 MySQL 数据库并将数据保存到 MySQL 中
1、MySQL 8.0 及以上版本(驱动类为 com.mysql.cj.jdbc.Driver):
2、MySQL 8.0 以下版本:在上面代码的基础上做以下两处修改:
// Write df1 into MySQL 8.0+ over JDBC (driver class com.mysql.cj.jdbc.Driver).
// Append adds rows to table "college" in database "cqgs" on every run.
val saveMode = SaveMode.Append
val prop = new java.util.Properties
prop.setProperty("driver", "com.mysql.cj.jdbc.Driver")
// Example credentials; replace with your own.
prop.setProperty("user", "root")
prop.setProperty("password", "123456")
// serverTimezone=GMT%2B8 is the URL-encoded form of GMT+8.
df1.write.mode(saveMode)
  .jdbc("jdbc:mysql://localhost:3306/cqgs?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false", "college", prop)
(1)将驱动类 com.mysql.cj.jdbc.Driver 修改为 com.mysql.jdbc.Driver;
(2)删除连接 URL 中的 ?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false(连同开头的问号)。
四、 完整代码:
// Write df1 into MySQL below 8.0 over JDBC (driver class com.mysql.jdbc.Driver, plain URL).
// Append adds rows to table "college" in database "cqgs" on every run.
val saveMode = SaveMode.Append
val prop = new java.util.Properties
prop.setProperty("driver", "com.mysql.jdbc.Driver")
// Example credentials; replace with your own.
prop.setProperty("user", "root")
prop.setProperty("password", "123456")
df1.write.mode(saveMode)
  .jdbc("jdbc:mysql://localhost:3306/cqgs", "college", prop)
def main(args: Array[String]): Unit = {
  // The Spark SQL type is `DataFrame` (capital F); `Dataframe` does not exist and does not compile.
  import org.apache.spark.sql.DataFrame

  val spark: SparkSession =
    SparkSession.builder().master("local[2]").getOrCreate()
  // try/finally so the session is stopped even if a read, query, or write below throws.
  try {
    // Only log messages at WARN or above.
    spark.sparkContext.setLogLevel("WARN")

    // Resilient distributed dataset with one element per line of the input file.
    val rdd: RDD[String] = spark.sparkContext.textFile("data/college.txt")
    // Print every line. foreach runs where the partitions live: with master local[2] that is
    // this JVM, so the lines reach the console; on a cluster they would go to executor stdout.
    rdd.foreach(println)

    // Implicit conversions: without this import an RDD has no toDF method.
    import spark.implicits._
    // Turn each line into a Hello row. split(",", -1) keeps trailing empty fields
    // (plain split(",") drops them, so "1,a,b,c," would yield only 4 fields), and lines with
    // fewer than 5 fields (e.g. blank lines) are skipped instead of failing on fields(4).
    val df1: DataFrame = rdd
      .map(line => line.split(",", -1))
      .filter(fields => fields.length >= 5)
      .map(fields => Hello(fields(0), fields(1), fields(2), fields(3), fields(4)))
      .toDF()
    // Print the DataFrame schema.
    df1.printSchema()
    // Print rows (20 by default); false = do not truncate long values.
    df1.show(false)

    // Register df1 as temp view "school"; its lifetime is tied to this SparkSession.
    df1.createOrReplaceTempView("school")
    // Select all columns of the rows whose id is 1.
    val df2: DataFrame = spark.sql("select * from school where id = 1")
    // Print the query result as a table.
    df2.show()
    // Save the query result as CSV. coalesce(1) yields a single part file, but "school.csv" is
    // created as a directory, and the default save mode fails if that path already exists.
    df2.coalesce(1).write.csv("school.csv")

    // Append df1 to MySQL table "college" in database "cqgs" over JDBC.
    val saveMode = SaveMode.Append
    val prop = new java.util.Properties
    prop.setProperty("driver", "com.mysql.cj.jdbc.Driver")
    prop.setProperty("user", "root")
    prop.setProperty("password", "123456")
    df1.write.mode(saveMode)
      .jdbc("jdbc:mysql://localhost:3306/cqgs?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false", "college", prop)
  } finally {
    spark.stop()
  }
}
/**
 * One parsed line of data/college.txt: the five comma-separated fields, in file order,
 * all kept as strings. Defined at top level (outside main) so `toDF` can derive its schema.
 */
case class Hello(
  id: String,
  name: String,
  addr: String,
  email: String,
  year: String
)
五、减少日志输出的设置
1、设置前:
2、设置后:
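3、设置方法:
在项目的 resources 目录(如 src/main/resources)下新建名为 log4j.properties 的文件(Log4j 1.x 默认按此文件名从 classpath 加载),写入以下内容(注:Spark 3.3 起改用 Log4j 2,需使用 log4j2.properties 及其语法,以下配置不再生效):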
# Root logger: only ERROR and above are emitted, through the "console" appender defined below.
log4j.rootCategory=ERROR, console
# "console" appender: writes to standard error using a pattern layout.
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
# Line format: yy/MM/dd HH:mm:ss timestamp, level (%p), last component of the logger name (%c{1}),
# then the message (%m) and a line break (%n).
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to ERROR. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=ERROR
# Settings to quiet third party logs that are too verbose
# Because the root logger above is already at ERROR, the ERROR entries below only make a
# difference if rootCategory is later raised (e.g. back to WARN or INFO).
# NOTE(review): org.spark_project.jetty looks like the Spark 2.x shaded Jetty package; newer
# Spark versions may use a different package name (e.g. org.sparkproject.jetty) — confirm.
log4j.logger.org.spark_project.jetty=ERROR
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
# FATAL is stricter than the root level, so this logger is quieter than everything else.
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR



