PySpark’s Delta Storage Format

Recently Databricks have open-sourced a very useful new storage format for use with Spark called Delta. Delta builds on the Parquet format, storing its data as Parquet files plus a transaction log, and as such basic creation and reading of Delta files follows a very similar syntax.

# to write parquet
df.write\
  .format('parquet')\
  .mode("overwrite")\
  .partitionBy("date")\
  .save(path)

# to write to delta we can do exactly the same
# note this OVERWRITES the Delta rather than upserting
df.write\
  .format('delta')\
  .mode("overwrite")\
  .partitionBy("date")\
  .save(path)

# to read parquet
df = spark.read\
  .format("parquet")\
  .load(path)

# to read delta we can do exactly the same
df = spark.read\
  .format('delta')\
  .load(path)

However, Delta offers three additional benefits over Parquet which make it a much more attractive and easier-to-use format.

Firstly, Delta allows an unusual method of writing to an existing Delta file. After loading the Delta file into a dataframe, you can register it as a view and write directly to the Delta file using SQL commands, as sketched below.
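As a minimal sketch of this idea (assuming a runtime where Delta Lake's SQL DML support is enabled, reusing the date column from the earlier examples, and with a made-up cut-off date), a DELETE issued through spark.sql against the delta.`path` syntax modifies the underlying files, not just the view:

# load the Delta file and expose it to SQL as a view
df = spark.read.format('delta').load(path)
df.createOrReplaceTempView("events")
# read through the view as usual
spark.sql("SELECT COUNT(*) FROM events").show()
# a DELETE addressed at the Delta path rewrites the
# underlying files, so the change persists on disk
# (the cut-off date here is just an example)
spark.sql("DELETE FROM delta.`{p}` WHERE date < '2019-01-01'".format(p=path))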

Secondly, Delta allows upserting of records into existing data. That is, new records will be inserted, while old records can be updated. This is an improvement over Parquet, where it was not possible to update an existing partition. Instead the partition in question had to be read, merged with the new data, deleted and then rewritten, which required some rather careful handling and was not especially efficient. In Delta this process has been replaced by a simple SQL MERGE command which is quicker and easier.

These two features combine to allow exceptionally easy updating of Delta files:

import pyspark
from pyspark.sql import SparkSession

def upsert_to_delta(df_update, path, key_column):
  """
  Upserts the rows in df_update to the existing Delta file
  found at path using key_column as the upsert key
  IMPORTANT : This upsert will be applied directly to the
  Delta file in place, not just to df_baseline
  Arguments:
  ----------
  df_update (dataframe): the dataframe to be upserted
  path (string) : path to Delta file resource
  key_column (string) : name of the key column (common
            between Delta file and upserted dataframe)
  Returns:
  --------
  None : (the upsert is applied to the Delta in place)
  """
  # get the active Spark session
  spark = SparkSession.builder.getOrCreate()
  # connect to our Delta
  df_baseline = spark.read.format("delta").load(path)
  # give the dataframes view names so SQL can see them
  df_update.createOrReplaceTempView("updates")
  df_baseline.createOrReplaceTempView("baseline")
  # set up our upsert MERGE statement
  sql = """
    MERGE INTO baseline b
    USING updates u
    ON b.{id} = u.{id}
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
    """.format(id=key_column)
  # execute the upsert
  spark.sql(sql)
  # finally optimise the Delta file for future reads
  # (OPTIMIZE requires a runtime that supports it,
  # such as Databricks)
  spark.sql("OPTIMIZE delta.`{path}`".format(path=path))

# to use the function
upsert_to_delta(df, path, "UniqueID")

Thirdly, Delta allows you to view data as it was at some earlier state. This can be extremely useful if an incorrect update has been pushed to the Delta file. Using the time travel feature is extremely simple at the basic level, and we can also roll the existing Delta file back to an earlier state using SQL, as shown below.

# to load the dataframe from an earlier state
df = spark.read\
  .format("delta")\
  .option("timestampAsOf", "2019-10-10")\
  .load(path)
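Snapshots can also be addressed by version number rather than by timestamp. As a short sketch (assuming a runtime that supports DESCRIBE HISTORY; the version number used is just an example), the commit log can be inspected to find the version to load:

# list the commit history (version, timestamp, operation)
spark.sql("DESCRIBE HISTORY delta.`{p}`".format(p=path)).show()
# load a specific version rather than a timestamp
df = spark.read\
  .format("delta")\
  .option("versionAsOf", 3)\
  .load(path)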

# to rollback a Delta file
# first choose the number of days
DAYS_TO_ROLLBACK = 1
# define our rollback: overwrite the current contents
# with the snapshot from DAYS_TO_ROLLBACK days ago,
# selected using time travel syntax
sql = """
  INSERT OVERWRITE TABLE delta.`{path}`
  SELECT * FROM delta.`{path}`
  TIMESTAMP AS OF DATE_SUB(CURRENT_DATE(), {days})
  """.format(path=path, days=DAYS_TO_ROLLBACK)
# and rollback
spark.sql(sql)
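Newer releases of Delta Lake also ship a dedicated RESTORE command which performs the same rollback in a single statement; a sketch, assuming a runtime recent enough to support RESTORE:

# roll the Delta file back DAYS_TO_ROLLBACK days in one statement
spark.sql("""
  RESTORE TABLE delta.`{path}`
  TO TIMESTAMP AS OF DATE_SUB(CURRENT_DATE(), {days})
  """.format(path=path, days=DAYS_TO_ROLLBACK))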

The Delta format is new and its documentation is still evolving. Documentation on upserting can be found here, and documentation on time travel, including information on using rollback with SQL commands, can be found here.
