Combining Original Values with Global Aggregations in PySpark

Sometimes it is useful not only to compute aggregate functions, but also to compare them row by row with the original data. For instance, you might want to identify all rows where a value is above the group average. There are two neat ways to approach this problem.

Adding Averages to a DataFrame with groupBy

Let us consider that we have a very simple DataFrame of the form:

group values
A 1
A 2
B 3
B 4

Firstly, you can use groupBy to compute the aggregate functions and then join back to your original data on the grouping columns. This looks something like this:

df_grouped = df.groupBy(
    'group'
).agg(
    fn.mean('values').alias('average'),
    fn.sum('values').alias('total'),
)

# note that if you do not alias df, you will be
# unable to resolve the two resulting 'group' columns
df_joined_groupings = df.alias('df').join(
    df_grouped,
    on = df.group == df_grouped.group,
    how = 'inner'
).select(
    fn.col('df.group'),
    'values',
    'average',
    'total'
)

Adding Averages to a DataFrame with a Window

Secondly, and perhaps more neatly, you can use a window function. This looks something like this:

window = Window().partitionBy('group')

df_with_groupings = df.withColumn(
    'average', fn.mean('values').over(window)
).withColumn(
    'total', fn.sum('values').over(window)
)

Generally this method also appears to be faster, though the difference is not drastic. This is likely because, when we .explain() the two methods, we find that the window method uses only one sort and one exchange of hash partitioning, whereas the group-and-join method uses two of each.

Adding Global Averages to a DataFrame

However, an issue arises if you want to compute global averages. The groupBy() method works as expected with no columns specified in the grouping clause. There is then the puzzle of how to join the two DataFrames, since we have a single row of global averages that we wish to join to every row in the original DataFrame. It is not permissible simply to set the on parameter to True, as this raises an error. The solution is to realise that any condition which evaluates to True is permissible, so we can simply equate a column in the original DataFrame to itself; this is always true and joins the data as desired. In this case we do not even need to alias the DataFrame, since the group totals have no conflicting columns.

# here we will ignore the group column
df_grouped = df.groupBy(
).agg(
    fn.mean('values').alias('average'),
    fn.sum('values').alias('total'),
)
df_joined_groupings = df.join(
    df_grouped,
    # note our on clause always evaluates to True
    on = df.values == df.values,
    how = 'inner'
).drop('group')

On the other hand, if you try to use a plain Window() it fails with an error. The solution is to use an empty window, Window().partitionBy(), which is permissible. This then produces the results you need. Thus, where you need global averages, the method works as follows.

# empty partition aggregates across all rows
window = Window().partitionBy()

df_with_groupings = df.withColumn(
    'average', fn.mean('values').over(window)
).withColumn(
    'total', fn.sum('values').over(window)
).drop('group')

These neat tricks of always-true joins and empty windows have applications beyond averages, since they can be used with almost any aggregate function.
