
Spark DataFrame Operations

First, create the test data (person.txt contains space-separated id, name, and age fields):

Start the ZooKeeper, Hadoop, and Spark clusters.

Create a /spark directory in HDFS, then upload the local file to it:

[root@hadoop01 data]# hdfs dfs -mkdir /spark
[root@hadoop01 data]# hdfs dfs -put /export/data/person.txt /spark

Two errors may appear when reading the data:
Failed to get database global_temp, returning NoSuchObjectException.
Error creating transactional connection factory.

The fixes are as follows:

1. Copy the hive-site.xml configuration file from Hive's conf directory into Spark's conf directory.
2. Since MySQL serves as the metastore database, add the MySQL JDBC driver jar via --jars when starting spark-shell.


Launch spark-shell with the MySQL driver jar attached via --jars (the exact command depends on your installation paths).

Creating a DataFrame by having Spark read a data source:

scala> val personDF = spark.read.text("/spark/person.txt")
personDF: org.apache.spark.sql.DataFrame = [value: string]

scala> personDF.printSchema()
root
 |-- value: string (nullable = true)

The DataFrame show() method displays the DataFrame's current data:

scala> personDF.show()
+-------------+                                                                 
|        value|
+-------------+
|1 zhangsan 20|
|    2 lisi 29|
|  3 wangwu 25|
| 4 zhaoliu 30|
|  5 tianqi 35|
|   6 jerry 40|
+-------------+

The RDD toDF() method converts an RDD into a DataFrame object:

scala> val lineRDD = sc.textFile("/spark/person.txt").map(_.split(" "))
lineRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[6] at map at <console>:24

scala> case class Person(id:Int,name:String,age:Int)
defined class Person


scala> val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
personRDD: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[7] at map at <console>:27

scala> val personDF = personRDD.toDF()
personDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> personDF.show
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan| 20|
|  2|    lisi| 29|
|  3|  wangwu| 25|
|  4| zhaoliu| 30|
|  5|  tianqi| 35|
|  6|   jerry| 40|
+---+--------+---+


scala> personDF.printSchema
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
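
The split-and-parse step above does not need a cluster to understand; a minimal sketch of the same logic on a plain Scala collection (the sample lines below mirror the format of person.txt):

```scala
// Mirrors the case class defined in the spark-shell session
case class Person(id: Int, name: String, age: Int)

// Sample lines in the same space-separated "id name age" format as person.txt
val lines = List("1 zhangsan 20", "2 lisi 29", "3 wangwu 25")

// Same parsing as lineRDD.map: split on spaces, then convert each field
val people = lines.map(_.split(" ")).map(x => Person(x(0).toInt, x(1), x(2).toInt))

println(people)
```

The toDF() call in the transcript then just attaches this Person schema to such parsed records.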

View the name column of the personDF object:

scala> personDF.select(personDF.col("name")).show()
+--------+
|    name|
+--------+
|zhangsan|
|    lisi|
|  wangwu|
| zhaoliu|
|  tianqi|
|   jerry|
+--------+

The select() operation can also rename columns:

scala> personDF.select(personDF("name").as("username"),personDF("age")).show()
+--------+---+
|username|age|
+--------+---+
|zhangsan| 20|
|    lisi| 29|
|  wangwu| 25|
| zhaoliu| 30|
|  tianqi| 35|
|   jerry| 40|
+--------+---+
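
The rename-and-project above is conceptually a projection into a new shape; a local sketch on a plain collection (data assumed from the transcript):

```scala
case class Person(id: Int, name: String, age: Int)

val people = List(Person(1, "zhangsan", 20), Person(2, "lisi", 29))

// Project (name, age) pairs, analogous to select(personDF("name").as("username"), personDF("age"))
val renamed = people.map(p => (p.name, p.age))

println(renamed)
```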

Filter for rows where age is at least 25:

scala> personDF.filter(personDF("age") >= 25).show()
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  2|   lisi| 29|
|  3| wangwu| 25|
|  4|zhaoliu| 30|
|  5| tianqi| 35|
|  6|  jerry| 40|
+---+-------+---+
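
DataFrame filter() behaves like Scala's own collection filter; the same predicate applied locally (data copied from the transcript):

```scala
case class Person(id: Int, name: String, age: Int)

val people = List(
  Person(1, "zhangsan", 20), Person(2, "lisi", 29), Person(3, "wangwu", 25),
  Person(4, "zhaoliu", 30), Person(5, "tianqi", 35), Person(6, "jerry", 40))

// Keep only rows with age >= 25, like personDF.filter(personDF("age") >= 25)
val adults = people.filter(_.age >= 25)

println(adults.map(_.name))
```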

Group by age and count how many people share each age:

scala> personDF.groupBy("age").count().show()
+---+-----+                                                                     
|age|count|
+---+-----+
| 20|    1|
| 40|    1|
| 35|    1|
| 25|    1|
| 29|    1|
| 30|    1|
+---+-----+
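
groupBy("age").count() is a group-then-size aggregation; the equivalent on a plain collection (data from the transcript):

```scala
case class Person(id: Int, name: String, age: Int)

val people = List(
  Person(1, "zhangsan", 20), Person(2, "lisi", 29), Person(3, "wangwu", 25),
  Person(4, "zhaoliu", 30), Person(5, "tianqi", 35), Person(6, "jerry", 40))

// Group records by age, then count each group's members
val counts = people.groupBy(_.age).map { case (age, ps) => (age, ps.size) }

println(counts)
```

Since every age in the sample is unique, every count is 1, matching the table above.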

Sort by age in descending order:

scala> personDF.sort(personDF("age").desc).show()
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  6|   jerry| 40|
|  5|  tianqi| 35|
|  4| zhaoliu| 30|
|  2|    lisi| 29|
|  3|  wangwu| 25|
|  1|zhangsan| 20|
+---+--------+---+

To use SQL-style queries, the DataFrame must first be registered as a temporary table. (registerTempTable is deprecated, hence the warning below; createOrReplaceTempView is its replacement in Spark 2.x.)

scala> personDF.registerTempTable("t_person")
warning: there was one deprecation warning; re-run with -deprecation for details

Query the two oldest people:

scala> spark.sql("select * from t_person order by age desc limit 2").show()
+---+------+---+
| id|  name|age|
+---+------+---+
|  6| jerry| 40|
|  5|tianqi| 35|
+---+------+---+
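
The SQL "order by age desc limit 2" is a descending sort followed by take(2); a local sketch with the transcript's data:

```scala
case class Person(id: Int, name: String, age: Int)

val people = List(
  Person(1, "zhangsan", 20), Person(2, "lisi", 29), Person(3, "wangwu", 25),
  Person(4, "zhaoliu", 30), Person(5, "tianqi", 35), Person(6, "jerry", 40))

// Sort by age descending (negate the key), then keep the first two rows
val top2 = people.sortBy(p => -p.age).take(2)

println(top2.map(_.name))
```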

Query people older than 25:

scala> spark.sql("select * from t_person where age > 25").show()
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  2|   lisi| 29|
|  4|zhaoliu| 30|
|  5| tianqi| 35|
|  6|  jerry| 40|
+---+-------+---+