I. The ways Spark SQL provides to work with data
SQL
DataFrame API
Dataset API
Spark SQL is a Spark component for processing structured data; the emphasis is on "structured data", not on "SQL".
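For example, the same aggregation can be written either as a SQL query or as DataFrame API calls. A minimal sketch (the input file people.json and its age column are hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('api_demo').getOrCreate()
df = spark.read.json('people.json')  # hypothetical input file
# SQL: register a temporary view, then query it
df.createOrReplaceTempView('people')
spark.sql('SELECT age, count(*) FROM people GROUP BY age').show()
# DataFrame API: the same aggregation as method calls
df.groupBy('age').count().show()
spark.stop()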
II. Spark SQL is more than just SQL
Hive: SQL only
Spark SQL: SQL, plus the DataFrame/Dataset APIs and integration with the rest of Spark
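A minimal sketch of the "more than SQL" point: a SparkSession built with enableHiveSupport() (also shown commented out in the example below) can query an existing Hive table with SQL and then keep processing the result with the DataFrame API. The database, table, and column names here are hypothetical:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('hive_demo') \
    .enableHiveSupport() \
    .getOrCreate()
# Query a Hive table with SQL, then continue with DataFrame operations
df = spark.sql('SELECT name, salary FROM default.employees')
df.filter(df['salary'] > 10000).show()
spark.stop()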
III. Spark SQL architecture and processing flow
SQL --> abstract syntax tree --> logical plan --> optimized logical plan --> physical plan --> optimized physical plan --> execution on the underlying engine
The Catalyst optimizer does most of the optimization work along this pipeline.
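You can watch Catalyst at work by calling explain(True) on any DataFrame: it prints the parsed logical plan, the analyzed logical plan, the optimized logical plan, and the final physical plan. A small sketch, assuming a SparkSession named spark as created in the examples of section V:

df = spark.read.json('file:///home/tarena/桌面/spark/spark_sql/1.json')
# extended=True prints all plan stages instead of only the physical plan
df.groupBy('host_name').count().explain(True)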
IV. DataFrame/Dataset in detail
The Dataset API is not available in Python; only DataFrame can be used. A DataFrame is itself a distributed collection of data, organized into named columns.
To build a DataFrame programmatically (implemented in programmatically_schema_example below):
Create an RDD of tuples or lists from the original RDD;
Create the schema, represented by a StructType, matching the structure of the tuples or lists in the RDD created in step 1;
Apply the schema to the RDD via the createDataFrame method provided by SparkSession.
See also: "Differences among RDD, DataFrame and Dataset in Spark" (Spark中RDD、DataFrame和DataSet的区别) - LestatZ - cnblogs (博客园)
V. Examples
1. Reading a file and querying it with SQL
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *

def basic(spark):
    df = spark.read.json('file:///home/tarena/桌面/spark/spark_sql/1.json')
    df.show()
    # Print the inferred schema (column names and data types)
    df.printSchema()
    # Select the columns to display
    df.select('staic_total_route').show()
    df.select(df['staic_total_route'], df['staic_total_route'] + 1).show()
    df.filter(df['total_total_route'] > 21).show()
    df.groupBy("host_name").count().show()
    # Register a temporary view so the DataFrame can be queried with SQL
    df.createOrReplaceTempView('origin')
    sqlDF = spark.sql('select * from origin')
    sqlDF.show()

def schema_inference_example(spark):
    # Inferring the schema using reflection
    sc = spark.sparkContext
    path = r'file:///home/tarena/桌面/spark/txt/age.txt'
    lines = sc.textFile(path)
    parts = lines.map(lambda l: l.split(" "))
    # print(parts.collect())
    people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
    # Infer the schema, and register the DataFrame as a table.
    schemaPeople = spark.createDataFrame(people)
    schemaPeople.createOrReplaceTempView("people")
    sqlDF = spark.sql('select * from people')
    sqlDF.show()
    # After SQL, drop back to the RDD API for row-level processing
    sqlRdd = sqlDF.rdd.map(lambda row: 'Name: ' + row.name)
    for name in sqlRdd.collect():
        print(name)

def programmatically_schema_example(spark):
    '''
    Create an RDD of tuples or lists from the original RDD;
    Create the schema represented by a StructType matching the structure of tuples or lists in the RDD created in step 1;
    Apply the schema to the RDD via the createDataFrame method provided by SparkSession.
    '''
    sc = spark.sparkContext
    # Load a text file and convert each line to a tuple.
    path = r'file:///home/tarena/桌面/spark/txt/age.txt'
    lines = sc.textFile(path)
    parts = lines.map(lambda l: l.split(" "))
    # Each line is converted to a tuple.
    people = parts.map(lambda p: (p[0], p[1].strip()))
    # The schema is encoded in a string.
    schemaString = "name age"
    fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(' ')]
    schema = StructType(fields)
    # Apply the schema to the RDD.
    schemaPeople = spark.createDataFrame(people, schema)
    # Create a temporary view using the DataFrame
    schemaPeople.createOrReplaceTempView("people")
    # SQL can be run over DataFrames that have been registered as a table.
    results = spark.sql("SELECT * FROM people")
    results.show()

if __name__ == '__main__':
    spark = SparkSession.builder.appName('spark_sql').getOrCreate()
    # spark = SparkSession.builder.appName('spark_sql').enableHiveSupport().getOrCreate()
    # basic(spark)
    # schema_inference_example(spark)
    programmatically_schema_example(spark)
    spark.stop()
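Both schema examples assume that age.txt holds one space-separated "name age" record per line, e.g. (hypothetical contents):

Michael 29
Andy 30
Justin 19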
2. Connecting Spark to MySQL over JDBC
from pyspark import SparkContext
from pyspark.sql import SQLContext

if __name__ == "__main__":
    sc = SparkContext(appName="mysqltest")
    sqlContext = SQLContext(sc)
    # Read a MySQL table through the JDBC data source
    df = sqlContext.read.format("jdbc").options(
        url="jdbc:mysql://192.168.137.1:3306/employees?user=root&password=123456",
        dbtable="ztp").load()
    df.show()
    sc.stop()
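Note: the JDBC data source needs the MySQL driver jar on the classpath, or the read fails with "No suitable driver". One common way is to pass it at submit time (the jar and script names below are placeholders; match the jar to your installed connector version):

spark-submit --jars mysql-connector-java-5.1.47.jar mysql_test.py

SQLContext is the legacy entry point, kept here as written; in newer code the same read is usually expressed on a SparkSession as spark.read.format("jdbc").options(...).load().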