PySpark Window Functions

PySpark window functions are useful when you want to examine relationships within groups of data rather than between groups of data, as groupBy does. To use them, you start by defining a window, then select a function or set of functions to operate over that window.

If you prefer to work through the example, you can download the workbook from GitHub. The workbook is designed to work on Databricks Community Edition. Alternatively, for the next six months this link should give you direct access to a copy of the workbook on Databricks, since happily their public publishing system now works.

Let's take a look at the sorts of things that can be achieved with window functions of varying complexity. The imports and DataFrame I am going to be using for this explanation are:

import pandas as pd
import pyspark.sql.functions as fn
from pyspark.sql import SparkSession
from pyspark.sql import Window

# Create a spark session
spark_session = SparkSession.builder.getOrCreate()

# let's define a demonstration DataFrame to work on
df_data = {
  'partition': ['a','a', 'a', 'a', 'b', 'b', 'b', 'c', 'c',],
  'col_1': [1,1,1,1,2,2,2,3,3,], 
  'aggregation': [1,2,3,4,5,6,7,8,9,],
  'ranking': [4,3,2,1,1,1,3,1,5,],
  'lagging': [9,8,7,6,5,4,3,2,1,],
  'cumulative': [1,2,4,6,1,1,1,20,30,],
}
df_pandas = pd.DataFrame.from_dict(df_data)
# create spark dataframe
df = spark_session.createDataFrame(df_pandas)

Simple aggregation functions

We can use the standard groupBy aggregations with window functions. These functions use the simplest form of window, which just defines grouping. However, instead of producing one value for each group, the aggregate value is added to every row of each partition in the window.

# aggregation functions use the simplest form of window 
# which just defines grouping
aggregation_window = Window.partitionBy('partition')
# then we can use this window function for our aggregations
df_aggregations = df.select(
  'partition', 'aggregation'
).withColumn(
  'aggregation_sum', fn.sum('aggregation').over(aggregation_window),
).withColumn(
  'aggregation_avg', fn.avg('aggregation').over(aggregation_window),
).withColumn(
  'aggregation_min', fn.min('aggregation').over(aggregation_window),
).withColumn(
  'aggregation_max', fn.max('aggregation').over(aggregation_window),
)
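To make the "value added to every row" behaviour concrete, here is a plain-Python sketch (no Spark needed) of what I would expect the aggregation_sum column to contain for the demonstration data. The helper window_sum is a hypothetical name used only for illustration; it emulates the idea of a sum over a partition-only window and is not part of the PySpark API.

```python
from collections import defaultdict

# the same values as the 'partition' and 'aggregation' columns above
partition = ['a', 'a', 'a', 'a', 'b', 'b', 'b', 'c', 'c']
aggregation = [1, 2, 3, 4, 5, 6, 7, 8, 9]


def window_sum(keys, values):
    """Emulate a sum over a window partitioned by keys (hypothetical helper)."""
    totals = defaultdict(int)
    for key, value in zip(keys, values):
        totals[key] += value
    # unlike groupBy, every input row is kept and receives its partition's total
    return [totals[key] for key in keys]


aggregation_sum = window_sum(partition, aggregation)
for row in zip(partition, aggregation, aggregation_sum):
    print(row)
# the first row printed is ('a', 1, 10) and the last is ('c', 9, 17)
```

A groupBy on the same data would return three rows (one per partition); the windowed version keeps all nine.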

Row-wise ordering and ranking functions

We can also use window functions to order and rank data. These functions add an element to the definition of the window, which now defines both grouping and ordering.

# let's define a ranking window
ranking_window = Window.partitionBy('partition').orderBy('ranking')

df_ranks = df.select(
  'partition', 'ranking'
).withColumn(
  # note that fn.row_number() does not take any arguments
  'ranking_row_number', fn.row_number().over(ranking_window)
).withColumn(
  # rank leaves gaps in the ranking after rows that receive equal ranks
  'ranking_rank', fn.rank().over(ranking_window)
).withColumn(
  # dense_rank leaves no gaps after rows that receive equal ranks
  'ranking_dense_rank', fn.dense_rank().over(ranking_window)
).withColumn(
  # percent rank ranges between 0-1 not 0-100
  'ranking_percent_rank', fn.percent_rank().over(ranking_window)
).withColumn(
  # fn.ntile takes a parameter for how many 'buckets' to divide
  # rows into when ranking
  'ranking_ntile_rank', fn.ntile(2).over(ranking_window)
)
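The differences between the ranking functions are easiest to see on tied values. The following plain-Python sketch emulates, as I understand the definitions, what each function returns for partition 'b' of the demonstration data, whose ranking values are 1, 1 and 3. The function rank_functions is a hypothetical helper for illustration, not Spark code.

```python
def rank_functions(sorted_values, buckets=2):
    """Emulate the ranking functions for one already-sorted partition."""
    n = len(sorted_values)
    # ntile: the first (n % buckets) buckets each hold one extra row
    base, extra = divmod(n, buckets)
    rows = []
    rank = dense_rank = 0
    previous = object()  # sentinel that never equals a real value
    for i, value in enumerate(sorted_values):
        row_number = i + 1
        if value != previous:
            rank = row_number  # rank jumps to the row position, leaving gaps
            dense_rank += 1    # dense_rank only ever increases by one
            previous = value
        percent_rank = (rank - 1) / (n - 1) if n > 1 else 0.0
        if i < extra * (base + 1):
            ntile = i // (base + 1) + 1
        else:
            ntile = extra + (i - extra * (base + 1)) // base + 1
        rows.append({
            'value': value,
            'row_number': row_number,
            'rank': rank,
            'dense_rank': dense_rank,
            'percent_rank': percent_rank,
            'ntile': ntile,
        })
    return rows


partition_b = rank_functions([1, 1, 3])
for row in partition_b:
    print(row)
# rank is 1, 1, 3 while dense_rank is 1, 1, 2
```

Note that row_number still assigns 1 and 2 to the two tied rows; in Spark, which of the tied rows comes first is not guaranteed.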

We can also reverse the order of ranking using .desc(). In this case the window would be defined as follows:

# let's define a ranking window in reverse order using .desc()
# note that .desc() is a Column method, so we wrap the name in fn.col
# (fn.desc('ranking') is an equivalent shorthand)
desc_ranking_window = Window.partitionBy(
  'partition'
).orderBy(
  fn.col('ranking').desc()
)

Cumulative Calculations (Running Totals and Averages)

There are often good reasons to want to create a running total or running average column. In some cases we might want running totals for subsets of data. Window functions can be useful for that sort of thing.

In order to calculate such things we need to add yet another element to the window. Now we account for partition, order, and which rows should be covered by the function. This can be done in two ways: we can use rangeBetween to define the frame by how close a row's value in the ordering column must be to the current row's value, or we can use rowsBetween to define it by a number of rows. With rowsBetween, the current row is considered row zero, the following rows are numbered positively and the preceding rows negatively. For cumulative calculations you can define “all previous rows” with Window.unboundedPreceding and “all following rows” with Window.unboundedFollowing.

Note that the window may vary in size as it progresses over the rows, since at the start and end of a partition part of the window may “extend past” the existing rows.

# suppose we want to average over the previous, current and next values
# running calculations need a more complicated window as shown here
cumulative_window_1 = Window.partitionBy(
  'partition'
).orderBy(
  'cumulative'
  # for a rolling average let's use rowsBetween
).rowsBetween(
  -1, 1
)

df_cumulative_1 = df.select(
  'partition', 'cumulative'
).withColumn(
  'cumulative_avg', fn.avg('cumulative').over(cumulative_window_1)
)
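The earlier note about the window varying in size can be checked by hand. This plain-Python sketch emulates a rowsBetween(-1, 1) average over partition 'a', whose cumulative values are 1, 2, 4 and 6 once sorted. The helper rows_between_avg is a hypothetical name for illustration, and it assumes start <= 0 <= end so the frame always contains the current row.

```python
def rows_between_avg(sorted_values, start, end):
    """Emulate an average over a rowsBetween(start, end) frame in one partition."""
    averages = []
    n = len(sorted_values)
    for i in range(n):
        # clip the frame to the partition: it shrinks at the first and last rows
        lo = max(0, i + start)
        hi = min(n, i + end + 1)
        frame = sorted_values[lo:hi]
        averages.append(sum(frame) / len(frame))
    return averages


rolling_avg_a = rows_between_avg([1, 2, 4, 6], -1, 1)
print(rolling_avg_a)
# the first and last averages use only two rows: 1.5 and 5.0
```

The first row averages over (1, 2) and the last over (4, 6), while the middle rows each see three values.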

# running totals also require a more complicated window, as here
cumulative_window_2 = Window.partitionBy(
  'partition'
).orderBy(
  'cumulative'
  # in this case we will use rangeBetween for the sum; note that rows
  # with equal 'cumulative' values fall in the same frame, so tied rows
  # all receive the same total
).rangeBetween(
  # here we use Window.unboundedPreceding to catch all earlier rows
  Window.unboundedPreceding, 0
)

df_cumulative_2 = df.select(
  'partition', 'cumulative'
).withColumn(
  'cumulative_sum', fn.sum('cumulative').over(cumulative_window_2)
)
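The choice between rowsBetween and rangeBetween matters when the ordering column contains ties, as it does in partition 'b' where every cumulative value is 1. The plain-Python sketch below emulates both kinds of frame for a running total, based on my understanding of the frame semantics. Both helper names are hypothetical and neither is Spark code.

```python
def running_sum_rows(sorted_values):
    """Emulate rowsBetween(unboundedPreceding, 0): the frame is set by row position."""
    total = 0
    sums = []
    for value in sorted_values:
        total += value
        sums.append(total)
    return sums


def running_sum_range(sorted_values):
    """Emulate rangeBetween(unboundedPreceding, 0): the frame is every row whose
    ordering value is less than or equal to the current row's value."""
    return [
        sum(value for value in sorted_values if value <= current)
        for current in sorted_values
    ]


partition_b = [1, 1, 1]
rows_total_b = running_sum_rows(partition_b)
range_total_b = running_sum_range(partition_b)
print(rows_total_b)   # [1, 2, 3]
print(range_total_b)  # [3, 3, 3]
```

When there are no ties, as in partition 'a', the two frames give the same running total, so the difference only shows up on data like partition 'b'.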

Combining Windows and Calling Different Columns

It is also possible to combine windows, and to apply a window function to a column other than the ordering column. These more advanced uses can require careful thought to ensure you achieve the intended results.

First, let's look at using multiple windows in a single expression:

# we can make a window function equivalent to a standard groupBy:

# first define two windows
aggregation_window = Window.partitionBy('partition')
grouping_window = Window.partitionBy('partition').orderBy('aggregation')

# then we can use these windows for our aggregations
df_aggregations = df.select(
  'partition', 'aggregation'
).withColumn(
  # note that we calculate row number over the grouping_window
  'group_rank', fn.row_number().over(grouping_window) 
).withColumn(
  # but we calculate other columns over the aggregation_window
  'aggregation_sum', fn.sum('aggregation').over(aggregation_window),
).withColumn(
  'aggregation_avg', fn.avg('aggregation').over(aggregation_window),
).withColumn(
  'aggregation_min', fn.min('aggregation').over(aggregation_window),
).withColumn(
  'aggregation_max', fn.max('aggregation').over(aggregation_window),
).where(
  fn.col('group_rank') == 1
).select(
  'partition', 
  'aggregation_sum', 
  'aggregation_avg', 
  'aggregation_min', 
  'aggregation_max'
)

# this is equivalent to the rather simpler expression below
df_groupby = df.select(
  'partition', 'aggregation'
).groupBy(
  'partition'
).agg(
  fn.sum('aggregation').alias('aggregation_sum'),
  fn.avg('aggregation').alias('aggregation_avg'),
  fn.min('aggregation').alias('aggregation_min'),
  fn.max('aggregation').alias('aggregation_max'),
)

Secondly, here is an example of ordering by one column but operating on a different column. This can only be done for functions which take a column as a parameter.

# create a window ordered by one column but apply a function to another column
lag_window = Window.partitionBy('partition').orderBy('lagging')

df_lag = df.select(
  'partition', 'lagging', 'cumulative',
).withColumn(
  'lag_the_lagging_col', fn.lag('lagging', 1).over(lag_window)
).withColumn(
  # It is possible to lag a column which was not the orderBy column
  'lag_the_cumulative_col', fn.lag('cumulative', 1).over(lag_window)
)
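To see what lagging a non-ordering column means, here is a plain-Python sketch for partition 'a', where lagging is 9, 8, 7, 6 and cumulative is 1, 2, 4, 6. The rows are first sorted by lagging, and then each column is shifted down by one row. The helper lag below is a hypothetical stand-in for illustration and assumes a positive offset.

```python
def lag(values, offset=1, default=None):
    """Emulate a lag of `offset` rows over an already-ordered partition."""
    return [
        values[i - offset] if i - offset >= 0 else default
        for i in range(len(values))
    ]


lagging = [9, 8, 7, 6]
cumulative = [1, 2, 4, 6]

# order the rows of the partition by the 'lagging' column, as the window does
ordered_rows = sorted(zip(lagging, cumulative))
lagging_sorted = [row[0] for row in ordered_rows]
cumulative_sorted = [row[1] for row in ordered_rows]

lag_the_lagging_col = lag(lagging_sorted)
lag_the_cumulative_col = lag(cumulative_sorted)
print(lag_the_lagging_col)     # [None, 6, 7, 8]
print(lag_the_cumulative_col)  # [None, 6, 4, 2]
```

The first row of each partition has no preceding row, so it gets a null (None here). The cumulative values are lagged in the order set by the lagging column, not in their own sorted order.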

The effect of window functions is best understood by experimenting with them, so I encourage you to make use of the Databricks workbook linked at the top of the page.

