What's worse than having a data pipeline fail? Having a data pipeline apparently succeed but actually fail! A failed pipeline is bad, but a pipeline that writes junk data without warning you of the error is even worse.
The Problem
While we all try to write pipelines that handle incoming data correctly, how can you defend against data that is corrupted in a way you have not seen before? Because this is a new, unknown failure mode, the pipeline has no specific logic to detect and correct it. It might be an edge case you have not thought of, data corrupted or missing in a new and interesting way, or data with upstream errors. If you are lucky, this will simply cause your pipeline to fail. What is worse is when the pipeline completes but outputs bad data without warning you.
However, with some thought you may be able to at least detect the error. You can then halt the pipeline before it writes incorrect data to a durable store and raise an alert. One approach you could consider is property based assertions.

Property Based Assertions
The trick with property based assertions is that you do not need a specific test case, and thus an assert, for each specific error. What you test instead is that the properties of the data before and after a pipeline stage are compatible and reasonable. Let's consider some possible properties in the context of a data pipeline using PySpark data frames, and the sorts of assertions we could use.
Do you expect that the output data should have the same number of rows as the input data?
assert df_in.count() == df_out.count()
When performing a grouping action should the overall total from a given column correspond to the total of some column from the original data frame?
assert (df_in.select('in').groupBy().sum().collect()[0][0] ==
        df_out.select('out').groupBy().sum().collect()[0][0])
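One caveat with comparing totals: if the column holds floating-point values, the grouped and ungrouped data are summed in a different order, so the two totals can differ by a rounding error even when the data is correct. A minimal sketch of a tolerance-based comparison follows. The helper name `totals_match` and its default tolerances are illustrative assumptions, not part of PySpark.

```python
import math


def totals_match(total_in, total_out, rel_tol=1e-9, abs_tol=0.0):
    """Return True when two column totals agree within a tolerance.

    Hypothetical helper: the name and default tolerances are assumptions
    chosen for illustration, so tune them to the scale of your data.
    """
    # A missing total (None, e.g. from summing an empty column) only
    # matches another missing total.
    if total_in is None or total_out is None:
        return total_in is None and total_out is None
    return math.isclose(total_in, total_out, rel_tol=rel_tol, abs_tol=abs_tol)


# Summing the same values in a different order changes the rounding.
ungrouped_total = 0.1 + 0.2 + 0.3
grouped_total = 0.1 + (0.2 + 0.3)
exact_equal = ungrouped_total == grouped_total               # False
close_enough = totals_match(ungrouped_total, grouped_total)  # True
```

In a pipeline, the two arguments would be the values collected from `df_in` and `df_out` in the totals assertion.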
When doing a join do you expect to end up with the same number of rows as one of the original dataframes?
assert df_left.count() == df_joined.count()
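When this row count property fails after a left join, one common cause is duplicate keys on the right-hand side, which make matching left rows fan out. The sketch below shows a check that could run before the join. `keys_are_unique` and the `customer_id` column are hypothetical names, and the function relies only on the `select`, `distinct` and `count` methods of a PySpark DataFrame.

```python
def keys_are_unique(df, key_columns):
    """Return True when no combination of key_columns appears more than once.

    Hypothetical helper: expects a PySpark DataFrame (or any object whose
    select(), distinct() and count() methods behave the same way).
    """
    keys = df.select(*key_columns)
    return keys.count() == keys.distinct().count()


# Usage sketch, assuming df_right is joined on a 'customer_id' column:
#   if not keys_are_unique(df_right, ['customer_id']):
#       raise ValueError("duplicate join keys in df_right")
```

Checking the keys before the join points at the cause directly, whereas the row count assertion only reports the symptom afterwards.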
If you are producing a set of values in a column, is there an acceptable range of values (say 0.0 to 1.0) which can appear in that column? If so, no rows should fall outside it.
from pyspark.sql import functions as fn
assert df_out.filter((fn.col('out') < 0.0) | (fn.col('out') > 1.0)).count() == 0
Should your output data be free of duplicate IDs?
assert df_out.count() == df_out.dropDuplicates(subset=['uniqueId']).count()
Assertion Strategy
By now you should be getting the idea. Rather than looking for a specific error, we are checking that certain properties of the data make sense. For instance, we may not know what tomorrow's transactions should total. However, the total should likely be the same in the initial data as it is at the end of the process. If not, something has likely gone wrong and needs investigation.
We don’t need to limit ourselves to one assertion either. We might check that totals match and fall within a given range, that the overall number of rows is as expected, and that there is no duplication.
The possibilities for property based assertions will depend on what your data pipeline is doing. By placing these kinds of asserts at the end of a pipeline stage, just before you write data out to a durable store such as a delta file, you can ensure that bad data is far less likely to get written out. Instead, faced with a failure your pipeline is not currently configured to handle, it will halt with an error allowing you to investigate.
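As a rough illustration of placing checks just before the write, the sketch below runs a set of named checks and raises if any fail, so the write step is never reached. `DataQualityError`, `run_checks` and `write_if_valid` are hypothetical names, and the output path in the usage comment is a placeholder. It raises an exception explicitly because Python skips `assert` statements when run with the `-O` flag.

```python
class DataQualityError(Exception):
    """Raised when one or more property checks fail (hypothetical name)."""


def run_checks(checks):
    """Evaluate every named check and return the names of those that failed.

    checks maps a description to a zero-argument callable returning a bool.
    """
    return [name for name, check in checks.items() if not check()]


def write_if_valid(checks, write):
    """Call write() only if every check passes; otherwise raise."""
    failed = run_checks(checks)
    if failed:
        raise DataQualityError("property checks failed: " + ", ".join(failed))
    write()


# Usage sketch, assuming df_in and df_out are PySpark DataFrames:
#   write_if_valid(
#       {
#           "row count preserved": lambda: df_in.count() == df_out.count(),
#           "ids unique": lambda: df_out.count()
#               == df_out.dropDuplicates(subset=["uniqueId"]).count(),
#       },
#       lambda: df_out.write.format("delta").save("/path/to/output"),
#   )
```

Collecting every failure before raising means a single run reports all the properties that were violated, which can make the investigation quicker.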
Of course, this can become even more complicated if you are dealing with streaming data. In this case you may find this paper by Adrian Riesco and Juan Rodriguez-Hortal useful: http://maude.sip.ucm.es/~adrian/files/sstr.pdf. It looks at Scala Spark but has useful information about the considerations required for property based testing of streaming data.
Tradeoffs
Of course, there is likely to be an overhead to performing these assertion checks. You will need to consider the trade-off between running a very comprehensive set of assertions and the computational cost of doing so. If this is a major concern, you could always make the property assertions an optional section of the pipeline. That way they can be switched off via a parameter once you are confident the pipeline is running smoothly, or enabled only for data from certain sources.
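One possible shape for making the checks optional is a level parameter that selects which checks run. `CheckLevel`, `select_checks` and the split into cheap and costly checks are assumptions made for illustration.

```python
from enum import IntEnum


class CheckLevel(IntEnum):
    """How much validation to run (hypothetical levels)."""
    OFF = 0    # skip all property checks
    CHEAP = 1  # only checks tagged as cheap, e.g. row counts
    FULL = 2   # every check, including costly ones


def select_checks(checks, level):
    """Return the checks whose required level is at or below `level`.

    checks maps a name to a (required_level, callable) pair.
    """
    return {
        name: check
        for name, (required, check) in checks.items()
        if required <= level
    }


# Usage sketch: the level could come from a pipeline parameter.
#   checks = {
#       "row count": (CheckLevel.CHEAP,
#                     lambda: df_in.count() == df_out.count()),
#       "ids unique": (CheckLevel.FULL,
#                      lambda: df_out.count()
#                      == df_out.dropDuplicates(subset=["uniqueId"]).count()),
#   }
#   active = select_checks(checks, CheckLevel.CHEAP)
```

Each `count()` is a Spark action that triggers its own job, so grading checks by cost in this way lets you keep the inexpensive ones on permanently.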