Recently I needed to check for equality between PySpark dataframes as part of a test suite. To my surprise I discovered that there is no built-in function to test for dataframe equality.
It turns out that checking dataframe equality in PySpark is not a trivial issue. Looking at the problem, the subtract command (the dataframe equivalent of SQL's EXCEPT), which removes the rows of one dataframe from another, looks like a promising approach since it will deal with structured data columns. Naively you might think you could simply write a function to subtract one dataframe from the other and check that the result is empty:
def are_dataframes_equal(df_actual, df_expected):
    return df_actual.subtract(df_expected).rdd.isEmpty()
However this will fail if df_expected contains rows that do not appear in df_actual: the one-way subtraction only detects extra rows in df_actual. We can avoid that pitfall by checking both ways round:
def are_dataframes_equal(df_actual, df_expected):
    if df_actual.subtract(df_expected).rdd.isEmpty():
        return df_expected.subtract(df_actual).rdd.isEmpty()
    return False
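To see why duplicate rows are still a problem, consider a quick sanity check (a minimal sketch, assuming a running SparkSession named spark; the column name value is just for illustration):

# Hypothetical example: subtract works on distinct rows, so these two
# dataframes pass the two-way check even though their row counts differ.
df_a = spark.createDataFrame([(1,), (1,), (2,)], ["value"])
df_e = spark.createDataFrame([(1,), (2,)], ["value"])

print(are_dataframes_equal(df_a, df_e))  # True, despite the duplicate row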
Even this solution can still run into problems due to duplicate rows, because subtract operates on distinct rows. A good way to avoid this is to do a group by and count across all columns of each dataframe. The resulting count column will differ if the two dataframes do not have the same row duplication. This gives us a function like:
import pyspark.sql.functions as fn

def are_dataframes_equal(df_actual, df_expected):
    # sorts are needed in case of disordered columns
    a_cols = sorted(df_actual.columns)
    e_cols = sorted(df_expected.columns)
    # we don't know the column names, so count on the first column we find
    df_a = df_actual.groupby(a_cols).agg(fn.count(a_cols[0]))
    df_e = df_expected.groupby(e_cols).agg(fn.count(e_cols[0]))
    # then perform our equality checks on the dataframes with the row counts
    if df_a.subtract(df_e).rdd.isEmpty():
        return df_e.subtract(df_a).rdd.isEmpty()
    return False
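For example (a minimal sketch; the dataframes and column names are just for illustration), this version now catches the duplicate-row case that fooled the naive check above:

# Hypothetical pass and fail cases for the group-by-and-count version.
df_a = spark.createDataFrame([(1, "x"), (1, "x"), (2, "y")], ["id", "label"])
df_e = spark.createDataFrame([(1, "x"), (2, "y")], ["id", "label"])

print(are_dataframes_equal(df_a, df_a))  # True: identical dataframes
print(are_dataframes_equal(df_a, df_e))  # False: row duplication differs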
Finally we need to be able to handle schema mismatches. Fortunately for us, it turns out that the above code smoothly handles both differing column names and differing column types. This solution also handles struct and array columns nicely. However caution should still be observed around map columns, which may run into issues if the elements of the map are disordered. Further work may be required to sort the map columns prior to comparison, as sketched below.
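One possible approach (a minimal sketch, not part of the original workbook, assuming Spark 3.0+ where map_entries and array_sort are available) is to replace each map column with a sorted array of its entries before comparing:

import pyspark.sql.functions as fn
from pyspark.sql.types import MapType

def normalise_map_columns(df):
    # Convert each map column to an array of (key, value) structs sorted
    # by key, so that entry order no longer affects the comparison.
    for field in df.schema.fields:
        if isinstance(field.dataType, MapType):
            df = df.withColumn(
                field.name, fn.array_sort(fn.map_entries(fn.col(field.name)))
            )
    return df

Both dataframes would then be passed through normalise_map_columns before calling are_dataframes_equal.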
This function can be slow to run on large dataframes, but it is very useful for testing purposes. You can find more on PySpark testing here. Some further notes, including another approach to dataframe equality, can be found here.
A workbook with the equality checking functions discussed above, and some example pass and failure cases, can be found on GitHub.
UPDATE: If you are looking for a good library for checking PySpark dataframe equality, you could try chispa by MrPowers, which is available on GitHub: https://github.com/MrPowers/chispa