I have a pipeline set up in my Foundry instance that uses incremental computation, but for some reason it isn't doing what I expect. Namely, I want to read the previous output of my transform and get the maximum value of a date column, then read the input only for data immediately after this maximum date.
It isn't behaving the way I expect, and it's quite frustrating to debug through a repeated build / inspect / modify-code loop.
My code looks like the following:
from pyspark.sql import functions as F, types as T, DataFrame
from transforms.api import transform, Input, Output, incremental
from datetime import date, timedelta
JUMP_DAYS = 1
START_DATE = date(year=2021, month=10, day=1)
OUTPUT_SCHEMA = T.StructType([
    T.StructField("date", T.DateType()),
    T.StructField("value", T.IntegerType())
])
@incremental(semantic_version=1)
@transform(
    my_input=Input("/path/to/my/input"),
    my_output=Output("/path/to/my/output")
)
def only_write_one_day(my_input, my_output):
    """Filter the input to only rows that are a day after the last written output and process them"""
    # Get the previous output and the full current input
    previous_output_df = my_output.dataframe("previous", OUTPUT_SCHEMA)
    current_input_df = my_input.dataframe("current")
    # Get the next date of interest from the previous output
    previous_max_date_rows = previous_output_df.groupBy().agg(
        F.max(F.col("date")).alias("max_date")
    ).collect()  # noqa
    # PERFORMANCE NOTE: It is acceptable to collect the max value here; it avoids an
    # expensive cross-join-and-filter operation at the cost of triggering a new query plan.
    if len(previous_max_date_rows) == 0 or previous_max_date_rows[0][0] is None:
        # We are running for the first time or re-snapshotting, so there's no previous max
        # date. (Note: a global aggregation over an empty output yields a single null row,
        # not zero rows, hence the null check.) Use the fallback start date.
        previous_max_date = START_DATE
    else:
        # We have a previous max date, use it.
        previous_max_date = previous_max_date_rows[0][0]

    delta = timedelta(days=JUMP_DAYS)
    next_date = previous_max_date + delta
    # Filter the input to only the next date
    filtered_input = current_input_df.filter(F.col("date") == F.lit(next_date))

    # Do any other processing...
    output_df = filtered_input

    # Persist the new rows into the existing output
    my_output.set_mode("modify")
    my_output.write_dataframe(output_df)
In incremental transforms, it can be difficult to isolate which conditions are breaking your code. As such, it's typically best to keep the decorated compute function as thin as possible and move the actual logic into small, individually testable methods.
In your code example, breaking the execution up into a handful of testable methods will make it substantially easier to test and see what's wrong.
The refactored module, with the logic broken out into named methods, should look something like this:
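(A minimal sketch of the refactoring pattern; the helper names get_previous_max_date, get_next_date, filter_input_to_date, and process_filtered_input are illustrative choices, not part of the transforms API.)

from pyspark.sql import functions as F, types as T, DataFrame
from transforms.api import transform, Input, Output, incremental
from datetime import date, timedelta

JUMP_DAYS = 1
START_DATE = date(year=2021, month=10, day=1)
OUTPUT_SCHEMA = T.StructType([
    T.StructField("date", T.DateType()),
    T.StructField("value", T.IntegerType())
])


def get_previous_max_date(previous_output_df: DataFrame) -> date:
    """Fetch the highest date already written, falling back to START_DATE on a first run."""
    previous_max_date_rows = previous_output_df.groupBy().agg(
        F.max(F.col("date")).alias("max_date")
    ).collect()
    # PERFORMANCE NOTE: collecting the single max value here avoids an expensive
    # cross-join-and-filter at the cost of triggering a new query plan.
    if len(previous_max_date_rows) == 0 or previous_max_date_rows[0][0] is None:
        return START_DATE
    return previous_max_date_rows[0][0]


def get_next_date(previous_max_date: date) -> date:
    """Compute the next date of interest."""
    return previous_max_date + timedelta(days=JUMP_DAYS)


def filter_input_to_date(current_input_df: DataFrame, date_filter: date) -> DataFrame:
    """Restrict the input to exactly one day of data."""
    return current_input_df.filter(F.col("date") == F.lit(date_filter))


def process_filtered_input(filtered_input_df: DataFrame) -> DataFrame:
    """Do any other processing..."""
    return filtered_input_df


@incremental(semantic_version=1)
@transform(
    my_input=Input("/path/to/my/input"),
    my_output=Output("/path/to/my/output")
)
def only_write_one_day(my_input, my_output):
    """Filter the input to only rows that are a day after the last written output and process them"""
    previous_output_df = my_output.dataframe("previous", OUTPUT_SCHEMA)
    current_input_df = my_input.dataframe("current")

    next_date = get_next_date(get_previous_max_date(previous_output_df))
    output_df = process_filtered_input(filter_input_to_date(current_input_df, next_date))

    my_output.set_mode("modify")
    my_output.write_dataframe(output_df)

The compute function now does nothing except fetch the dataframes and hand them off, so every piece of logic can be exercised without running a build.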
You can now set up individual unit tests, assuming your code lives at transforms-python/src/myproject/datasets/output.py, following the documented testing methodology to set everything up correctly. My testing file then looks like the following:
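(A sketch under these assumptions: the tests live under transforms-python/src/test/, a spark_session pytest fixture is available as in the standard transforms-python testing setup, and the helper names match the illustrative refactor above. The make_output_df helper is hypothetical.)

from datetime import date

from myproject.datasets import output


def make_output_df(spark_session, rows):
    # Hypothetical helper: build a (date, value) dataframe with the transform's output schema.
    return spark_session.createDataFrame(rows, output.OUTPUT_SCHEMA)


def test_get_previous_max_date_falls_back_when_output_is_empty(spark_session):
    empty_df = make_output_df(spark_session, [])
    assert output.get_previous_max_date(empty_df) == output.START_DATE


def test_get_previous_max_date_returns_latest_written_date(spark_session):
    df = make_output_df(spark_session, [(date(2021, 10, 1), 1), (date(2021, 10, 2), 2)])
    assert output.get_previous_max_date(df) == date(2021, 10, 2)


def test_get_next_date_jumps_forward_by_one_day():
    assert output.get_next_date(date(2021, 10, 1)) == date(2021, 10, 2)


def test_filter_input_to_date_keeps_only_the_target_day(spark_session):
    df = make_output_df(spark_session, [(date(2021, 10, 1), 1), (date(2021, 10, 2), 2)])
    rows = output.filter_input_to_date(df, date(2021, 10, 2)).collect()
    assert len(rows) == 1
    assert rows[0]["date"] == date(2021, 10, 2)

Each test exercises one method in isolation, so a failure points you directly at the broken piece of logic instead of at an opaque incremental build.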
It's worth noting that this is why you can enable features like McCabe complexity checking and unit test coverage inside Foundry: they encourage you to break your code into smaller, more durable pieces like this.
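In a standard Python setup, McCabe complexity checking is a flake8 plugin configured via max-complexity; a sketch, assuming your repository reads flake8 settings from setup.cfg (the exact mechanism in your Foundry repository may differ):

[flake8]
# Fail the lint check when any single function's cyclomatic complexity exceeds 10,
# which nudges long transform bodies toward smaller, testable methods.
max-complexity = 10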
Following a design pattern like this will give you much more durable, trustworthy code in your incremental transforms.
If you adopt this style of transform, you will also be able to iterate much faster on your logic by running individual tests through the Code Repository's "Test" feature: open the test file and click the green "Test" button next to the specific case you are interested in. This will get your logic written far faster than clicking build every time and trying to line up your input conditions the way you want.