Pandas User Defined Functions for PySpark

Today I want to take a look at a neat feature of PySpark called Pandas User Defined Functions. As the name suggests, a Pandas UDF works much like a normal User Defined Function. The main difference is that the function receives the data as a Pandas Series rather than as a Python scalar value.

This means that you can then apply any function that you would normally apply to a Pandas Series to the data. You can even convert the data to a Pandas DataFrame and use DataFrame functions if you wish.


Basic Usage

So how does this work in practice? Well, there are a couple of ways the syntax can be formed. Firstly, we can create the Pandas UDF by adding a decorator to a function. This is where things may seem slightly confusing. The decorator needs to be told the return type as a PySpark type. However, for a standard Pandas UDF the wrapped function's parameter will be a Pandas Series, and the wrapped function also returns a Pandas Series. This is because Pandas loads and processes the entire column it is handed at once, then hands the whole column back at once; the underlying mechanism uses PyArrow to transfer the data. The @pandas_udf decorator then reinserts this data into the column.

import pandas as pd
from pyspark.sql.functions import pandas_udf, col, lit, sum
from pyspark.sql.types import IntegerType, FloatType

# define the return type as a parameter within the decorator
@pandas_udf(IntegerType())
# pass and return a pd.Series in the decorated function
def string_length(strings: pd.Series) -> pd.Series:
    # now we can use pandas string functions on PySpark data
    return strings.str.len()

df = df.withColumn(
    'output_column', string_length('input_column')
)
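If you want to try this end to end, a minimal sketch (with made-up sample data and column names, and assuming PyArrow is installed) might look like this:

from pyspark.sql import SparkSession

# build a local session and a small sample DataFrame to test against
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [('apple',), ('banana',), ('fig',)],
    ['input_column']
)

# expect lengths of 5, 6 and 3 in output_column
df.withColumn('output_column', string_length('input_column')).show()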

The other way to use a Pandas UDF is to call pandas_udf like a regular function, wrapping the function we want to apply:

# pass and return a pd.Series in the function for the UDF
def string_length(strings: pd.Series) -> pd.Series:
    return strings.str.len()

# construct our pandas udf with pandas_udf
udf_string_length = pandas_udf(string_length, IntegerType())
df = df.withColumn(
    'output_column', udf_string_length('input_column')
)

This is great because it allows us to use built-in Pandas functions where there is no equivalent PySpark function.
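For instance, here is a quick sketch using pandas' str.normalize for unicode normalisation, which (as far as I am aware) has no direct built-in PySpark column function. The column names are just placeholders:

from pyspark.sql.types import StringType

# sketch: unicode normalisation via pandas, which has no obvious
# built-in PySpark column function equivalent
@pandas_udf(StringType())
def normalize_text(strings: pd.Series) -> pd.Series:
    return strings.str.normalize('NFKC')

df = df.withColumn('normalized_column', normalize_text('input_column'))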

Using @pandas_udf with Aggregations

The real power of these functions is that they can be used as User Defined Aggregation Functions, which work as aggregations when grouping or windowing data.

The syntax here is much the same, except that in these cases the return type of the function needs to be a Python scalar type. This is because the whole Series needs to be returned as a single value. An example of this might be:

# define the return type as a parameter within the decorator
@pandas_udf(FloatType())
# pass a pd.Series and return a scalar python type in the udaf
def pd_median(values: pd.Series) -> float:
    return values.median()

df_grouped = df.groupBy(
    'grouping_column'
).agg(
    pd_median('column').alias('median'),
)

The code above is certainly an easier and more readable way to extract a median from a group than some of the other methods.
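As mentioned above, these also work over windows, so we can attach the group median to every row rather than collapsing the group. A quick sketch with the same placeholder column names:

from pyspark.sql import Window

# use the same pd_median UDF as a window function over each group
w = Window.partitionBy('grouping_column')

df = df.withColumn('group_median', pd_median('column').over(w))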

This approach is also certainly a simpler way to create a user defined aggregation than other workarounds, such as using collect_list and then running a standard UDF over the resulting list, as explained here: https://danvatterott.com/blog/2018/09/06/python-aggregate-udfs-in-pyspark/
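For contrast, that kind of collect_list workaround looks something like this rough sketch:

from pyspark.sql import functions as F
from pyspark.sql.functions import udf

# collect each group into a list, then run an ordinary UDF over the list
@udf(returnType=FloatType())
def list_median(values):
    values = sorted(values)
    n = len(values)
    mid = n // 2
    if n % 2 == 1:
        return float(values[mid])
    return float((values[mid - 1] + values[mid]) / 2)

df_grouped = df.groupBy(
    'grouping_column'
).agg(
    list_median(F.collect_list('column')).alias('median')
)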

Handling Errors and Other Issues

There can be some issues, though. A big one is that you cannot mix Pandas UDFs and standard PySpark aggregation functions within the same aggregation. Therefore, in some cases you may have to conduct two aggregations and then join the results on the grouping columns.
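A sketch of that workaround, again with placeholder column names:

from pyspark.sql import functions as F

# run the Pandas UDF aggregation and the built-in aggregation separately
df_pandas_agg = df.groupBy('grouping_column').agg(
    pd_median('column').alias('median')
)
df_spark_agg = df.groupBy('grouping_column').agg(
    F.sum('column').alias('total')
)

# then join the two results back together on the grouping column
df_combined = df_pandas_agg.join(df_spark_agg, on='grouping_column')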

Some Pandas Series functions return data in a format which needs some conversion before you can return it from the function. For example, Series.mode() will not work directly as it does not return a standard Python type (it returns a Series, since there can be more than one mode). Another strange feature @pandas_udf exhibits is that sometimes the internal Pandas code wants to mutate values in ways which PySpark will not allow, resulting in errors like PythonException: 'ValueError: buffer source array is read-only'. This can easily be rectified by taking a copy of the Series rather than trying to operate on the original, and then using the Pandas functions on the copy.

@pandas_udf(IntegerType())
def pd_mode(values: pd.Series) -> int:
    # take a copy so we can mutate it
    v = values.copy()
    # return a scalar from the result
    return v.mode()[0]

df_grouped = df.groupBy(
    'grouping_column'
).agg(
    pd_mode('column').alias('mode')
)

But what if a suitable function does not exist in Pandas? No problem: you can just convert the Series to a Python list and then apply any functions you like from there. Taking the mode for a group by as an example once more, another approach is to convert to a list and then use mode from the statistics package.

from statistics import mode
# create pandas udf using statistics' mode
@pandas_udf(IntegerType())
def pd_mode(values: pd.Series) -> int:
    # convert series to list if you need to
    new_values = values.to_list()
    # then you can use other libraries on the list
    return mode(new_values)

df_grouped = df.groupBy(
    'grouping_column'
).agg(
    pd_mode('column').alias('mode')
)

Further Reading

To see examples of this code to play around with, take a look at this workbook on Databricks Community Cloud, or alternatively download the workbook from GitHub. You can also see some more advanced things that can be done with @pandas_udf here and here, and you can find the documentation for the function, including other ways of structuring its syntax, here.

