Checking DataFrame equality in PySpark

Recently I needed to check for equality between PySpark dataframes as part of a test suite. To my surprise I discovered that there is no built-in function to test for dataframe equality.

It turns out that checking dataframe equality in PySpark is not a trivial problem. The subtract transformation (SQL's EXCEPT), which removes the rows of one dataframe from another, looks like a promising approach, since it deals with structured data columns. Naively you might think you could simply write a function to subtract one dataframe from the other and check that the result is empty:

def are_dataframes_equal(df_actual, df_expected):
  # an empty subtraction means every row of df_actual appears in df_expected
  return df_actual.subtract(df_expected).rdd.isEmpty()
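To see the problem, here is a minimal sketch (a local SparkSession with made-up data; the column names are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

df_actual = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
df_expected = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["id", "value"])

# every row of df_actual appears in df_expected, so the subtraction is
# empty and the naive check wrongly reports the dataframes as equal
print(are_dataframes_equal(df_actual, df_expected))  # True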

As the sketch shows, the naive check wrongly reports equality whenever df_expected contains rows that are missing from df_actual: the subtraction only runs one way. We can avoid that pitfall by checking in both directions:

def are_dataframes_equal(df_actual, df_expected): 
  if df_actual.subtract(df_expected).rdd.isEmpty():
    return df_expected.subtract(df_actual).rdd.isEmpty()
  return False
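Unfortunately even the two-way check has a blind spot. In this sketch (again with made-up data) the dataframes differ only in a duplicated row, yet the function reports them as equal:

df_actual = spark.createDataFrame([(1, "a"), (1, "a")], ["id", "value"])
df_expected = spark.createDataFrame([(1, "a")], ["id", "value"])

# the duplicate row (1, "a") collapses away and both subtractions
# come back empty
print(are_dataframes_equal(df_actual, df_expected))  # True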

The culprit is that subtract performs a set difference (SQL's EXCEPT DISTINCT semantics), so duplicate rows collapse before the comparison is made. A good way to avoid this is to do a group-by and count on all columns of each dataframe: the resulting count column will differ if the two dataframes do not have the same row duplication. This gives us a function like:

from pyspark.sql import functions as fn

def are_dataframes_equal(df_actual, df_expected):
  # sort the column names in case the columns arrive in a different order
  a_cols = sorted(df_actual.columns)
  e_cols = sorted(df_expected.columns)
  # we don't know the column names, so count on the first column we find
  df_a = df_actual.groupby(a_cols).agg(fn.count(a_cols[0]))
  df_e = df_expected.groupby(e_cols).agg(fn.count(e_cols[0]))
  # then perform the two-way equality check on the dataframes with counts
  if df_a.subtract(df_e).rdd.isEmpty():
    return df_e.subtract(df_a).rdd.isEmpty()
  return False
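With duplication accounted for, a quick smoke test (reusing the made-up data from earlier) behaves as expected:

df_a = spark.createDataFrame([(1, "a"), (1, "a")], ["id", "value"])
df_b = spark.createDataFrame([(1, "a")], ["id", "value"])
df_c = spark.createDataFrame([(1, "a"), (1, "a")], ["id", "value"])

print(are_dataframes_equal(df_a, df_b))  # False: different duplication
print(are_dataframes_equal(df_a, df_c))  # True: identical contents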

Finally, we need to be able to handle schema mismatches. Fortunately for us, it turns out that the above code correctly handles both differing column names and differing column types, and it also deals with struct and array columns nicely. However, caution should still be observed around map columns, which may run into issues if the elements of the map are stored in a different order. Further work may be required to sort the map columns prior to comparison, for example along the lines of the sketch below.
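One possible workaround (a sketch only, assuming a map column named props, which is a hypothetical name) is to replace each map column with a sorted array of key/value structs before running the comparison; map_entries and array_sort are available from Spark 2.4 onwards:

# convert the map column to a deterministic representation: an array of
# (key, value) structs sorted by key; "props" is an illustrative name
df_actual = df_actual.withColumn(
    "props", fn.array_sort(fn.map_entries(fn.col("props"))))
df_expected = df_expected.withColumn(
    "props", fn.array_sort(fn.map_entries(fn.col("props"))))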

This function can be slow to run on large dataframes, but it is very useful for testing purposes. You can find more on PySpark testing here. Some further notes, including another approach to dataframe equality, can be found here.

A workbook with the equality-checking functions discussed above, and some example pass and failure cases, can be found on GitHub.

UPDATE: If you are looking for a good library for checking PySpark dataframe equality, you could try chispa by MrPowers, which is available on GitHub: https://github.com/MrPowers/chispa
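For reference, basic chispa usage looks something like this (based on the project's README at the time of writing, so treat the details as indicative):

from chispa import assert_df_equality

# raises a descriptive AssertionError when the dataframes differ
assert_df_equality(df_actual, df_expected)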
