Delta Lake DeltaTable

Spark Scala Shell

Download a compatible version of Apache Spark by following the instructions in Downloading Spark: either install it with pip, or download and extract the archive and run spark-shell from the extracted directory. Then launch the shell with the Delta Lake package and the required configuration:

spark-shell --packages io.delta:delta-core_2.12:0.8.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

Create a table

To create a Delta table, write a DataFrame out in the delta format. You can use existing Spark SQL code and simply change the format from parquet, csv, json, and so on, to delta.

val data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
data.show()

// Overwrite the table with a new batch of data
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")

// Read the table back to see the overwritten contents
val df = spark.read.format("delta").load("/tmp/delta-table")
df.show()

Conditional update without overwrite

Delta Lake provides programmatic APIs to conditionally update, delete, and merge (upsert) data into tables. Here are a few examples.

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath("/tmp/delta-table")

// Delete every row with an even id
deltaTable.delete(condition = expr("id % 2 == 0"))

deltaTable.toDF.show()
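
Update and merge follow the same pattern as the delete above. A minimal sketch using the DeltaTable update and merge-builder APIs; newData here is an illustrative DataFrame of fresh ids:

// Update every row with an odd id by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 1"),
  set = Map("id" -> expr("id + 100")))

// Upsert (merge) new data: matching ids are updated, the rest are inserted
val newData = spark.range(0, 20).toDF
deltaTable.as("oldData")
  .merge(newData.as("newData"), "oldData.id = newData.id")
  .whenMatched
  .update(Map("id" -> col("newData.id")))
  .whenNotMatched
  .insert(Map("id" -> col("newData.id")))
  .execute()

deltaTable.toDF.show()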

Read older versions of data using time travel

You can query previous snapshots of your Delta table by using time travel. For example, to access the data that you overwrote above, query the snapshot of the table from before the overwrite using the versionAsOf option.

val df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df.show()
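
Delta also supports time travel by timestamp, and you can inspect which versions exist through the table history. A minimal sketch, assuming the table at /tmp/delta-table from above; the timestamp value is only illustrative:

// List the commit history: version, timestamp, operation, and so on
import io.delta.tables._
DeltaTable.forPath(spark, "/tmp/delta-table").history().show()

// Load the snapshot as of a given time (illustrative timestamp)
val dfAtTime = spark.read.format("delta")
  .option("timestampAsOf", "2021-01-01 00:00:00")
  .load("/tmp/delta-table")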

Write a stream of data to a table

You can also write to a Delta table using Structured Streaming. The Delta Lake transaction log guarantees exactly-once processing, even when there are other streams or batch queries running concurrently against the table. By default, streams run in append mode, which adds new records to the table:

val streamingDf = spark.readStream.format("rate").load()

val stream = streamingDf
  .select($"value" as "id")
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start("/tmp/delta-table")
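
While that query runs, you can read the same table as a streaming source; new rows appear as they are committed. A minimal sketch that echoes the changes to the console and then stops both streams:

// Read the Delta table as a stream of changes and print them
val stream2 = spark.readStream.format("delta")
  .load("/tmp/delta-table")
  .writeStream
  .format("console")
  .start()

// Stop the streaming queries when finished
stream.stop()
stream2.stop()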
