栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark(1)将RDD转换成DataFrame并以CSV和数据库保存数据

Spark(1)将RDD转换成DataFrame并以CSV和数据库保存数据

一、使用隐式转换将RDD转换成Dataframe
//导入隐饰操作,否则RDD无法调用toDF方法 
import spark.implicits._
val df1:Dataframe=rdd.map(x=>{
  x.split(",")
}).map(x=>{
  (x(0),x(1),x(2),x(3),x(4))
}).map(x=>
  Hello(x._1,x._2,x._3,x._4,x._5)
).toDF()
二、以csv格式保存
df2.coalesce(1).write.csv("school.csv")
三、与MySql数据库创建连接并保存在MySql数据库中   1、MySql数据库版本在8.0以上: 

 

val saveMode = SaveMode.Append
val prop=new java.util.Properties
prop.setProperty("driver","com.mysql.cj.jdbc.Driver")
prop.setProperty("user","root")
prop.setProperty("password","123456")
df1.write.mode(saveMode)
    .jdbc("jdbc:mysql://localhost:3306/cqgs?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false","college",prop)
2、MySql数据库版本在8.0以下:

修改红色方框内容:

com.mysql.cj.jdbc.Driver修改为:com.mysql.jdbc.Driver

删除useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false

 

val saveMode = SaveMode.Append
val prop=new java.util.Properties
prop.setProperty("driver","com.mysql.jdbc.Driver")
prop.setProperty("user","root")
prop.setProperty("password","123456")
df1.write.mode(saveMode)
    .jdbc("jdbc:mysql://localhost:3306/cqgs","college",prop)
四、 完整代码:
def main(args: Array[String]): Unit = {
      val spark:SparkSession=
        SparkSession.builder().master("local[2]").getOrCreate()
//      设置日志级别为WARN
      spark.sparkContext.setLogLevel("WARN")
//      弹性分布式数据集
      val rdd: RDD[String] = spark.sparkContext.textFile("data/college.txt")
//      用foreach读取每条数据并输出
      rdd.foreach(println)
//      隐式转换:将RDD转换成Dataframe 必须导入隐饰操作(import spark.implicits._),否则RDD无法调用toDF方法 
//      Transformations 算子 map:对每个元素操作
//      将原来 RDD 的每个数据项通过 map 中的用户自定义函数 f 映射转变为一个新的元素
      import spark.implicits._
      val df1:Dataframe=rdd.map(x=>{
        x.split(",")
      }).map(x=>{
        (x(0),x(1),x(2),x(3),x(4))
      }).map(x=>
        Hello(x._1,x._2,x._3,x._4,x._5)
      ).toDF()
//      打印Dataframe元数据信息
      df1.printSchema()
//      打印数据 默认20行 false长数据不缩写
      df1.show(false)
//      createOrReplaceTempView:创建临时视图,此视图的生命周期与用于创建此数据集的[SparkSession]相关联
      df1.createOrReplaceTempView("school")
//        sql查询语句:查询school表中id为1的所有字段
      val df2: Dataframe = spark.sql("select * from school where id = 1")
//      打印sql后的数据 以csv格式
      df2.show()
      df2.coalesce(1).write.csv("school.csv")

//        与MySql数据库建立连接
      val saveMode = SaveMode.Append
      val prop=new java.util.Properties
      prop.setProperty("driver","com.mysql.cj.jdbc.Driver")
      prop.setProperty("user","root")
      prop.setProperty("password","123456")
      df1.write.mode(saveMode)
          .jdbc("jdbc:mysql://localhost:3306/cqgs?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false","college",prop)
      spark.stop()
    }

    case class Hello(id: String, name:String,addr:String,email:String,year:String)
五、 设置减少输出日志 1、设置前:

 2、设置后:

 3、设置操作:

 在resources下添加一个.properties文件 添加以下内容:

log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell log level to ERROR. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=ERROR

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=ERROR
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/775466.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号