栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark SQL编程

Spark SQL编程

1、Spark SQL基本操作 (一)创建Dataframe

(二)查询所有数据   (三)查询所有数据并去除重复的数据   (四)查询所有数据打印时去除id字段   (五)筛选出age>30的记录   (六)将数据按age分组   (七)将数据按name升序排序 

(八)取出前3行数据   (九)查询所有记录的name列,并为其取别名为username   (十)查询年龄age的平均值   (十一)查询年龄age的最小值 

2、编程实现将RDD转换为Dataframe (一) 代码编写
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val peopleRDD = spark.sparkContext.textFile("file:///root/wyqWork/Demo6/employee.txt")

val schemaString = "id name age"

val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))

val schema = StructType(fields)

val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim, attributes(2).trim))

val peopleDF = spark.createDataframe(rowRDD, schema)

peopleDF.createOrReplaceTempView("people")

val results = spark.sql("SELECT id,name,age FROM people")

results.map(attributes => "id: " + attributes(0)+","+"name:"+attributes(1)+","+"age:"+attributes(2)).show()

 (二)结果输出

3、利用Dataframe读写MySQL的数据 (一)新建数据库sparktest,再创建表employee,并输入数据  

 

(二)查询当前表中所有数据   4、配置Sp a r k通过JDBC连接数据库MySQL,编程实现利用Da t aFr a m e插入如表6 -2所示的两行数据到MySQL中,最后打印出a g e的最大值和a g e的总和。 (一)代码编写
import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
val spark=SparkSession.builder().appName("TestMySQL").master("local").getOrCreate()
import  spark.implicits._
val employeeRDD=spark.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M 23")).map(_.split(" "))
val schema=StructType(List(StructField("id",IntegerType,
      true),StructField("name",StringType,true),StructField("gender",StringType,true),
      StructField("age",IntegerType,true)))
val rowRDD=employeeRDD.map(p=>Row(p(0).toInt,p(1).trim,p(2).trim,p(3).toInt))
val employeeDF=spark.createDataframe(rowRDD,schema)
val prop=new Properties()
prop.put("user","root")
prop.put("password","10086CYc#")
prop.put("driver","com.mysql.cj.jdbc.Driver")
employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest","sparktest.employee",prop)
val jdbcDF = spark.read.format("jdbc").
option("url","jdbc:mysql://localhost:3306/sparktest").
option("driver","com.mysql.cj.jdbc.Driver").
option("dbtable", "employee").
option("user", "root").
option("password", "10086CYc#").
load()
jdbcDF.agg("age" -> "max", "age" -> "sum").show()
   (二)结果输出

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/585037.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号