栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark[1]:基本概念与python接口使用

Spark[1]:基本概念与python接口使用

一、RDD

RDD 是 Resilient Distributes Datasets 的缩写。

RDD 基于cluster中node个数进行partition

1. RDD Transformations

从当前RDD创建一个新的RDD懒加载:the results are only computed when evaluated by actions

比如map()就是一个transformation,从一个RDD根据对应函数生成另外一个RDD

2. RDD Actions

Actions return a value to driver program after running a computation.

比如reduce()就是一个action,用于aggregates all RDD elements

3. DAG

DAG的全称:Directed Acyclic Graph

Spark依赖DAGS确保fault tolerance,当一个节点坏掉,Spark复制DAG重新回复node

二、基础操作
    创建SparkContext与SparkSession创建RDDDataframes 和 SparkSQL的使用

预备工作

# Installing required packages
!pip install pyspark
!pip install findspark

import findspark
findspark.init()

# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the spark context. 
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

SparkContext是spark app的入口,包含一系列的function,比如创建RDD的parallelize()

SparkSession是SparkSQL和Dataframe操作的必须品

创建SparkContext和SparkSession的实例:

# Creating a spark context class
sc = SparkContext()

# Creating a spark session
spark = SparkSession 
    .builder 
    .appName("Python Spark Dataframes basic example") 
    .config("spark.some.config.option", "some-value") 
    .getOrCreate()

创建RDD然后使用transformations

# create an RDD which has integres from 1-30
data = range(1, 30)
xrangeRDD = sc.parallelize(data, 4)

# transformations
subRDD = xrangeRDD.map(lambda x: x-1)
filteredRDD = subRDD.filter(lambda x : x<10)

创建Dataframe并使用多种方法查询数据,最后关闭。

# Read the dataset into a spark dataframe using the `read.json()` function
df = spark.read.json("***.json").cache()
# Print the dataframe as well as the data schema
df.show()
df.printSchema()
# Register the Dataframe as a SQL temporary view
df.createTempView("people")

# Select and show basic data columns
df.select("name").show()
df.select(df["name"]).show()
spark.sql("SELECT name FROM people").show()


# Perform basic filtering
df.filter(df["age"] > 21).show()
spark.sql("SELECt age, name FROM people WHERe age > 21").show()

# 在单独col上的操作,新创造一列old,数值为age的3倍
df.withColumn('old', df['age']*3).show()

# Perfom basic aggregation of data
df.groupBy("age").count().show()
spark.sql("SELECt age, COUNT(age) as count FROM people GROUP BY age").show()


# close the SparkSession
spark.stop()
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/700279.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号