Building a Custom Data Pipeline Using Curried Functions

If you work in data science you have probably come across the pipeline model for handling data transformations. It is used by many machine learning libraries.

Let's take a look at a toy example of how you could build a pipeline class that transforms a pandas dataframe according to a pipeline recipe provided as a text file in JSON format.

We want our pipeline to do two things. First of all it needs to create a list of the functions we want to apply to the dataframe. Secondly (and this is a little more tricky) we need each function in the list to store the parameters it needs in order to act correctly on the dataframe. We shall achieve this second requirement via currying.
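As a minimal sketch of what "storing the parameters" means, here is a plain Python example that is independent of pandas. The names make_suffixer and add_suffix are made up for illustration and are not part of the pipeline class built later.

```python
def make_suffixer(suffix):
    """Return a function that appends `suffix` to any word it is given."""
    def add_suffix(word):
        # `suffix` is remembered from the enclosing call to make_suffixer
        return word + suffix
    return add_suffix

# each call to make_suffixer produces a function with its own stored parameter
add_ing = make_suffixer("ing")
add_ed = make_suffixer("ed")

print(add_ing("walk"))  # walking
print(add_ed("walk"))   # walked
```

The two returned functions take a single argument, yet each one behaves differently because of the value it was built with. The pipeline functions below rely on the same idea.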

First let's define a pipeline definition for our transformations and a dataframe to act on:

import pandas as pd
import sys, os
import json

# define our pipeline as a JSON string
pipeline_json = """
  [
  {"function":"count_letter", "source":"word_1", "letter":"t", "target":"t_count_1"},
  {"function":"count_letter", "source":"word_2", "letter":"e", "target":"e_count_2"},
  {"function":"sum_columns", "sources":["t_count_1", "e_count_2"], "target":"t1_e2_total"},
  {"function":"rename_column", "source":"t1_e2_total", "target":"letter_count"},
  {"function":"drop_columns", "columns_to_drop":["t_count_1", "e_count_2"]}
  ]
  """

# parse the JSON string into a list of Python dictionaries
pipeline_definition = json.loads(pipeline_json)

# let's build a dataframe
data = {
  "word_1": ["terrible", "totter", "banana", "thick", "transparent"],
  "word_2": ["excellent", "strident", "oblong", "regular", "three"]
  }

df = pd.DataFrame(data)

The basic concept is that for each transformation we want available for use in our pipeline, we have a dictionary which gives the name of the function we want to use and the parameters we need to specify for that function to be able to do its job.

The operations the example pipeline defines are not especially useful. It counts letters in the words in a couple of columns, performs some column addition, then renames a column and drops two columns. However, this will suffice to demonstrate the basic principles behind pipeline building.

In practice we would not define our data or our pipeline definition within the script; we would read them in from external files. We can simulate this by writing them out and reading them back in again:

# export this definition to a text file to simulate
# the way we would actually want to import the data
with open('pipeline_file.txt', 'w') as outfile:
  json.dump(pipeline_definition, outfile)

# read from the text file to recover our pipeline
# in JSON format (note json.load NOT json.loads)
with open("pipeline_file.txt") as pipeline_file:
  pipeline_definition = json.load(pipeline_file)

# export the dataframe to a csv to allow us to simulate importing data
df.to_csv("df.csv", index = False)

# import the csv to get our dataframe
df = pd.read_csv("df.csv")

Now we need to build the class which will interpret and run this example pipeline. We will require one function for each dataframe operation we want to have available. We will also need an __init__ function to set up our class, a function to accept the operations to add to the pipeline, and a function to run the pipeline against a particular dataframe. For completeness, a function to allow the pipeline to be emptied is also included.

The function that builds the pipeline holds a list of the currying functions which are available. For each entry in the pipeline definition it calls each currying function in turn until it finds one which is able to return the right function to perform the required operation. Then it adds that returned function to the pipeline.
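The lookup described above can be sketched on its own, without pandas or a class. The builders build_upper and build_repeat and the helper build_steps are hypothetical names used only for this illustration. Raising an error when no builder recognises an entry is an assumption added here and is not something the class in this post does.

```python
def build_upper(spec):
    # return None unless this builder handles the requested operation
    if spec["function"] != "upper":
        return None
    def fn(text):
        return text.upper()
    return fn

def build_repeat(spec):
    if spec["function"] != "repeat":
        return None
    times = spec["times"]  # parameter baked into fn below
    def fn(text):
        return text * times
    return fn

def build_steps(definition, builders):
    """Try each builder on each spec and keep the first function returned."""
    steps = []
    for spec in definition:
        for builder in builders:
            step = builder(spec)
            if step is not None:
                steps.append(step)
                break
        else:
            # assumption: fail loudly if nothing matched
            raise ValueError(f"no builder recognises {spec['function']!r}")
    return steps

steps = build_steps(
    [{"function": "repeat", "times": 2}, {"function": "upper"}],
    [build_upper, build_repeat],
)
text = "ab"
for step in steps:
    text = step(text)
print(text)  # ABAB
```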

Here is the code for the skeleton of the class, minus the functions we need to actually enact the dataframe operations:


class DataFrameHandler():

  def __init__(self):
    # initialise an empty pipeline
    self.function_pipeline = []

### functions which enact the different pipeline operations go here ###

  def create_pipeline(self, pipeline_definition):
    '''create a pipeline from a definition'''
    # enumerate the available pipeline functions
    available_functions = [self.rename_column,
        self.sum_columns,
        self.count_letter,
        self.drop_columns]
    # iterate seeking the correct function for each operation
    for spec in pipeline_definition:
      for func in available_functions:
        # each builder returns None unless the spec names its operation
        curried = func(spec)
        if curried is not None:
          # add the desired function to the pipeline
          # with the required variables curried in
          self.function_pipeline.append(curried)
          # stop searching once a match is found
          break

  def clear_pipeline(self):
    '''removes current pipeline functions'''
    self.function_pipeline = []

  def run_pipeline(self, df):
    '''apply the pipeline functions in order to the dataframe'''
    for func in self.function_pipeline:
      df = func(df)
    return df

Each of the currying functions which defines a dataframe operation is structured in a similar way. When called, it checks whether the specification names its operation, returning None otherwise. If it is the correct function, it sets some variables using the specification it has been handed. Because of Python's scoping rules, these variables are curried into the function defined inside it, which is the one that will actually operate on the dataframe. It then returns this inner function, with its parameters baked in, to the pipeline builder to create the pipeline.
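For comparison, the same baking-in of a parameter can also be written with functools.partial from the Python standard library. This is an alternative shown only for illustration, not the approach used in this post. The names tally_letter and make_tally are hypothetical.

```python
from functools import partial

def tally_letter(phrase, letter):
    """Count how many times `letter` occurs in `phrase`."""
    return sum(1 for character in phrase if character == letter)

# closure version: `letter` is captured from the enclosing scope
def make_tally(letter):
    def tally(phrase):
        return tally_letter(phrase, letter)
    return tally

count_t_closure = make_tally("t")

# partial version: `letter` is fixed as a keyword argument
count_t_partial = partial(tally_letter, letter="t")

print(count_t_closure("totter"))  # 3
print(count_t_partial("totter"))  # 3
```

The closure form used in this post has the advantage that the outer function can also inspect the specification and return None when it does not apply.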

Here is the code for the dataframe operation building functions. Notice the similar structure of each function:

  def rename_column(self, spec):
    '''function to rename a column'''
    # check if this function curries the requested function
    if spec["function"] != "rename_column":
      # if it was not, then return None
      return None
    # otherwise extract the variables to be curried into this function
    source = spec["source"]
    target = spec["target"]
    # then curry the function which will carry out the requested action
    def fn(df):
      # curried renaming function
      try:
        df = df.rename(columns={source: target})
      except Exception:
        print("unable to complete rename_column")
      return df
    # return the internal function fn to be used in the pipeline
    return fn

  def sum_columns(self, spec):
    '''function to sum a list of columns'''
    if spec["function"] != "sum_columns":
      return None
    sources = spec["sources"]
    target = spec["target"]
    def fn(df):
      # curried sum function
      try:
        df[target] = df[sources].sum(axis=1)
      except Exception:
        print("unable to complete sum_columns")
      return df
    return fn

  def count_letter(self, spec):
    '''function to create letter frequency counts'''
    if spec["function"] != "count_letter":
      return None
    source = spec["source"]
    target = spec["target"]
    letter = spec["letter"]
    # curry in a helper function to be used on the dataframe using apply
    def _letter_tally(phrase):
      count = 0
      for character in phrase:
        if character == letter:
          count += 1
      return count
    def fn(df):
      # curried count letter function
      try:
        # apply the tally to each word in the source column only
        df[target] = df[source].apply(_letter_tally)
      except Exception:
        print("unable to complete count_letter")
      return df
    return fn

  def drop_columns(self, spec):
    '''function to delete a given list of columns from the dataframe'''
    if spec["function"] != "drop_columns":
      return None
    columns_to_drop = spec["columns_to_drop"]
    def fn(df):
      # curried drop columns function
      try:
        df = df.drop(columns_to_drop, axis = 1)
      except Exception:
        print("unable to complete drop_columns")
      return df
    return fn

Once the pipeline is built, all we need to do is call the run pipeline function with our desired dataframe. The pipeline functions will be sequentially applied and if all has gone well we will end up with our desired dataframe.


# create our pipeline object
pipeline_object = DataFrameHandler()
# add the correct functions as per the definition
pipeline_object.create_pipeline(pipeline_definition)
# run the pipeline on our data
output_df = pipeline_object.run_pipeline(df)
output_df

However, what if there is something wrong with our pipeline definition? That is why we have try/except blocks inside each pipeline function. If one of our operations raises an exception, a message is printed telling us which function failed, and the dataframe is handed on unchanged to the next step. Obviously this alerting could be made more sophisticated as desired.
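One possible direction for more sophisticated alerting is sketched below using the standard logging module. It assumes a hypothetical wrapper called with_alert that is applied to each step, and it uses plain lists in place of a dataframe so that it runs on its own. The steps halve and fail are invented for the demonstration.

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pipeline")

def with_alert(step_name, step):
    """Wrap a pipeline step so that failures are logged with the step's name."""
    def guarded(data):
        try:
            return step(data)
        except Exception:
            # logger.exception records the message plus the full traceback
            logger.exception("pipeline step %r failed", step_name)
            # hand the data on unchanged, as the steps in this post do
            return data
    return guarded

def halve(values):
    return [value / 2 for value in values]

def fail(values):
    raise KeyError("missing column")

steps = [with_alert("halve", halve), with_alert("fail", fail)]
data = [2, 4]
for step in steps:
    data = step(data)
print(data)  # [1.0, 2.0]
```

Because the wrapper is itself a function that returns a function with step_name baked in, it follows the same pattern as the rest of the pipeline.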

As usual the example code for this post is on my Github. Currying can also do more advanced things than simply baking in variables: it can be used to chain functions together, along with a variety of other handy tricks. More details can be found here and here.
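As a small sketch of the chaining idea, the example below combines several one-argument functions into a single function using functools.reduce. The helper names chain, add and times are hypothetical and are not taken from the linked material.

```python
from functools import reduce

def chain(*steps):
    """Combine several one-argument functions into one, applied left to right."""
    def run(value):
        return reduce(lambda current, step: step(current), steps, value)
    return run

def add(n):
    # returns a function with n baked in
    return lambda x: x + n

def times(n):
    return lambda x: x * n

add_then_double = chain(add(3), times(2))
print(add_then_double(4))  # 14
```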
