Using AI on PySpark workloads with Databricks

Databricks recently made an exciting announcement: a new library that lets you use large language models to perform operations on PySpark dataframes. The library is called pyspark-ai and is available at https://github.com/databrickslabs/pyspark-ai/. I have tested it out and, while it still has some rough edges and can be a little brittle, it shows considerable promise.

a robot looking at data

Setup

So how do you get this working on Databricks? The first requirement seems to be that your cluster is running Databricks Runtime 13.2. On runtime 12.2 LTS (which sadly seems to be the latest version available on Databricks Community Edition), attempting to import pyspark-ai into a notebook results in an error: `TypeError: dataclass_transform() got an unexpected keyword argument 'field_specifiers'`

You will also need an OpenAI API key from https://platform.openai.com/account/api-keys. This is not free, but my pyspark-ai experiments so far have cost me pennies, so it is economical. Once you have a key, use it to set an environment variable as follows:

OPENAI_API_KEY=your-key-goes-here

In theory this could simply be set under Cluster > Configuration > Advanced options > Spark > Environment variables. However, Databricks recommends that you manage secrets via the Databricks CLI and the Secrets API.
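As a minimal sketch of that route (the "openai" scope and "api_key" key names are my own, created beforehand with the Databricks CLI, not anything the library requires), the secret can be read into the environment from a notebook cell like this:

# read the stored secret and expose it as an environment variable for pyspark-ai
import os
os.environ["OPENAI_API_KEY"] = dbutils.secrets.get(scope="openai", key="api_key")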

So, having set up a nice shiny new cluster, first of all you need to pip install pyspark-ai with:

pip install pyspark-ai

Once this is done, you can import the library, set up a SparkAI object, and activate AI for your notebook as follows:

from pyspark_ai import SparkAI
# langchain is auto installed as a dependency of pyspark-ai
from langchain.chat_models import ChatOpenAI

# If 'gpt-4' is unavailable, use 'gpt-3.5-turbo'
llm = ChatOpenAI(model_name='gpt-3.5-turbo', temperature=0)
# Initialize SparkAI with the OpenAI model
spark_ai = SparkAI(llm=llm, verbose=True)

spark_ai.activate()

Now that things are set up, we can proceed to actually using AI to assist us in working with dataframes.
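The examples below operate on a df_wine dataframe. A minimal sketch of loading it, assuming the wine-quality sample that ships under /databricks-datasets (the exact path and delimiter may differ in your workspace):

# load the red wine quality sample dataset that ships with Databricks
df_wine = (
    spark.read
    .option("header", True)
    .option("sep", ";")
    .option("inferSchema", True)
    .csv("/databricks-datasets/wine-quality/winequality-red.csv")
)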

Displaying Data using AI

Displaying data can be achieved with a simple df.ai.plot() command. Optionally, you can add a string describing what you would like plotted in order to guide the AI's choices, or you can leave the prompt out and just see what it chooses to do. Under the hood, it appears these plots are produced by converting the dataframe to pandas and using pandas plotting libraries.

# this lets the AI choose which columns to use for plotting
df_wine.ai.plot()

# while adding a prompt gives you extra control
df_wine.ai.plot('plot total sulphur dioxide against pH')

This can give you nice-looking plots with very little effort. Here is the default plot the AI produces from the wine dataset that comes as standard in Databricks:

wine plot generated automatically by AI

Transforming Data using AI

Transforming a dataframe is likewise simple to achieve, as follows:

df_averages = df_wine.ai.transform(
    """group by pH and calculate the average
    chlorides and sulphate for each group,
    then order the results by ascending pH"""
)

While the AI generally seems to do a good job of this transformation, it would certainly be wise to check its work. The good news is that SparkAI shows you the SQL it used to transform your dataframe. This is useful for two reasons. Firstly, it makes checking the AI's workings easier. Secondly, it means you can extract that SQL to use directly, so you do not need to call the AI on subsequent runs of the notebook if you are building something like an ETL pipeline.
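As a sketch of that reuse pattern (the view name and the query below are stand-ins for whatever SparkAI actually printed in its verbose output), the extracted SQL can be pinned in the notebook and run without any further model calls:

# register the source dataframe so the extracted SQL has a table to run against
df_wine.createOrReplaceTempView("wine")

# paste in the SQL from SparkAI's verbose output; this query is only illustrative
df_averages = spark.sql("""
    SELECT pH,
           AVG(chlorides) AS avg_chlorides,
           AVG(sulphates) AS avg_sulphates
    FROM wine
    GROUP BY pH
    ORDER BY pH ASC
""")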

AI powered UDFs

SparkAI also gives you the ability to create custom User Defined Functions (UDFs), as follows. Here, rather than passing the requirements in as parameters, they are expressed as a docstring inside the function.


# create a spark UDF
@spark_ai.udf
def convert_quality(quality: int):
"""Convert the quality to a descriptive word where
1 would be awful and 10 would be amazing"""

Note that convert_quality actually appears to be created as a plain Python function rather than a UDF. In the above case the generated function is as follows (once again, SparkAI helpfully tells you what it has done):

# this code was what the AI generated
def convert_quality(quality) -> str:
    if quality is not None:
        if quality == 1:
            return "awful"
        elif quality == 2:
            return "bad"
        elif quality == 3:
            return "poor"
        elif quality == 4:
            return "fair"
        elif quality == 5:
            return "average"
        elif quality == 6:
            return "good"
        elif quality == 7:
            return "very good"
        elif quality == 8:
            return "excellent"
        elif quality == 9:
            return "outstanding"
        elif quality == 10:
            return "amazing"

The generated function can then be successfully used as a UDF as follows:

from pyspark.sql import functions as fn

# add a udf wrapper to the AI created function
df_wine_descriptions = df_wine.withColumn(
    'description', fn.udf(convert_quality)('quality')
)

Once again, the provided code can be extracted for analysis and for direct use to reduce OpenAI API calls (and thus cost).
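If you also want to call the extracted function from SQL, registering it as a named UDF is one option. A minimal sketch, reusing the wine temporary view registered earlier (the registered name is my own choice):

from pyspark.sql.types import StringType

# register the AI-generated function so it can be called from Spark SQL
spark.udf.register("convert_quality", convert_quality, StringType())

spark.sql(
    "SELECT quality, convert_quality(quality) AS description FROM wine"
).show()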

Loading data from the web with AI assistance

Data can also be loaded from the web. This can be done either by providing the URL of a page on which the table you wish to use is located, or (if you provide a Google API key) by providing a search string for the AI to use to locate the data you want.

# if you have the URL for the data you can submit that
df_auto = spark_ai.create_df(
    "https://www.carpro.com/blog/full-year-2022-national-auto-sales-by-brand"
)

# alternatively use google to search for data
df_auto = spark_ai.create_df(
    "2022 USA national auto sales by brand"
)

Failure Cases and Overall Impressions

SparkAI is quite new and it does have some weaknesses and failure cases. These include:

  • Some transforms seem to fail where column names have spaces in them, because the spaces lead to incorrect syntax in the SQL the AI generates (a workaround sketch follows this list).
  • Some data loading from the web fails, even when you hint at which columns you are looking for. Two factors seem to be at play here: too much data for the AI response to handle, and difficulty identifying where on the webpage the data sits so it can be extracted.
  • The UDF creator does not actually seem to create a UDF; rather, it creates a plain function which can be converted to a UDF by wrapping it in pyspark.sql.functions.udf().
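For the column-name issue, one workaround (a sketch only; df_raw is a placeholder for whichever dataframe has spaces in its column names, and the prompt is illustrative) is to rename the columns before calling ai.transform:

# replace spaces in column names so the SQL the AI generates parses cleanly
df_clean = df_raw
for col_name in df_raw.columns:
    df_clean = df_clean.withColumnRenamed(col_name, col_name.replace(" ", "_"))

df_result = df_clean.ai.transform("count the rows in each quality group")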

The first two issues are likely model-dependent (I was using GPT-3.5) and may well be fixed by improved models over time or better prompt engineering. The third issue is likely just a matter of how the UDF wrapper for the function is created, and there is a simple workaround as noted above.

Overall this is an intriguing addition to PySpark and if it continues to be developed then it could turn into a very powerful data analysis tool.

Sample Notebook and Further Reading

It would appear that Databricks no longer supports publishing community notebooks directly, and furthermore the notebook does not currently work there, so my example notebook can be found on GitHub. The notebook details a number of successful uses along with some failure cases, and provides usage examples. These may be handy for getting started, since some of the library's usage conventions are not immediately obvious.

The announcement about SparkAI can be found here.

The GitHub repo for SparkAI is here.
