I recently came across a strange little problem with a satisfying solution, which people building path-based models might find interesting.
The Problem
Imagine that you have path step data which is well segmented by person. If each person only has one path and you have a time signal, then you are home and dry: you can simply order by the time data and use the person’s identity as their path identity as well.
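In PySpark terms that easy case is a one-liner. A minimal sketch, assuming a DataFrame df with the person and time_step columns used in the code further down:

import pyspark.sql.functions as fn

# with exactly one path per person, the person id can double as the path id
df_single_path = (
    df
    .orderBy('person', 'time_step')
    .withColumn('path_id', fn.col('person'))
)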
However, what if each person can have multiple paths? In that case things become more complicated. Fortunately, my data had a couple of nice properties which allowed me to reconstruct the paths simply:
- Firstly, a single person’s paths did not overlap: one path would not begin until the person’s previous path had ended.
- Secondly, each step carried some information about the previous step.
This meant that each step could be compared to the step before it to see whether the two should be joined into the same path.
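For concreteness, here is a toy version of the kind of input this implies. The column names match the code further down, but the rows themselves are invented purely for illustration:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# each row is one step; previous_reference records which step the person
# arrived from, so consecutive steps of the same path chain together
df_1 = spark.createDataFrame(
    [
        ('alice', 1, 'a1', None),  # start of alice's first path
        ('alice', 2, 'a2', 'a1'),  # continues from a1
        ('alice', 3, 'a3', None),  # no link back to a2, so a new path starts
        ('alice', 4, 'a4', 'a3'),  # continues from a3
        ('bob',   1, 'b1', None),  # bob only has one path
        ('bob',   2, 'b2', 'b1'),
    ],
    ['person', 'time_step', 'reference', 'previous_reference'],
)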
The Solution
At this point the key issue became performing this join efficiently. I had millions of rows of this data in tabular format, and I did not want to iterate over the rows or use functions that might disrupt PySpark’s parallelism. Fortunately, we can resolve this using some window functions and comparisons.
To do this we take three steps:
- we use a window function and the lag function to push information from the previous time step for each person into the next time step
- we compare this preceding information with the current information using a when function to check for a match, assigning 1 for a non-match and 0 for a match
- we then sum across all prior events using a window with rowsBetween from unboundedPreceding to the current row; done per person, this gives each step a running total that is shared by all steps of the same path and can then be turned into the path identities
The PySpark code to perform this looks like this:
from pyspark.sql import Window
import pyspark.sql.functions as fn

person_window = Window.partitionBy('person').orderBy('time_step')

cumulative_window = Window.partitionBy('person').orderBy('time_step').rowsBetween(
    Window.unboundedPreceding, Window.currentRow
)

path_group_window = Window.partitionBy('person', 'path_group').orderBy('time_step')

df_1_separate_paths = df_1.withColumn(
    # get information from the preceding step
    'preceding_reference', fn.lag('reference').over(person_window)
).withColumn(
    # and compare to the current step
    'not_same_as_preceding',
    fn.when(
        fn.col('previous_reference') == fn.col('preceding_reference'),
        # be careful to use 0 for matches
        0
    ).otherwise(
        # and 1 for non-matches
        1
    )
).withColumn(
    # we can then separate the steps into paths correctly
    'path_group', fn.sum('not_same_as_preceding').over(cumulative_window)
).withColumn(
    # assign uuids if we want unique rather than numbered path groups
    'uuid', fn.expr('uuid()')
).withColumn(
    # by taking the first uuid from each path group
    'path_id', fn.first('uuid').over(path_group_window)
)

display(df_1_separate_paths)
The key insight here is that when checking for matches we assign zero to matches and one to non-matches. This means that the later sum only increments when we do not find a match, that is, when the next step is part of a different path. If we assigned the values the other way round we would have a much tougher time joining the steps into the correct paths.
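To see the running total at work, we can trace it on the toy data sketched earlier (remembering that those rows are invented for illustration rather than drawn from the original data):

# alice: not_same_as_preceding = 1, 0, 1, 0 -> path_group = 1, 1, 2, 2  (two paths)
# bob:   not_same_as_preceding = 1, 0       -> path_group = 1, 1        (one path)
df_1_separate_paths.select(
    'person',
    'time_step',
    'reference',
    'not_same_as_preceding',
    'path_group',
    'path_id',
).orderBy('person', 'time_step').show(truncate=False)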
An example workbook can be seen on Databricks Community Edition here or on GitHub here.