I have a flow in which I use `.map()`; as such, I "loop" over multiple inputs. However, some of the inputs only need to be generated once, yet I notice that my flow keeps re-generating them.
Is it possible to cache/checkpoint the result of a task (which is used in other tasks) for the duration of the run?
My understanding is that it's possible to cache for a specific amount of time like so:
```python
import datetime

from prefect import task

@task(cache_for=datetime.timedelta(hours=1))
def some_task():
    ...
```
However, if the flow run takes less than the `cache_for` duration, will the cache still hold for the next run? (If not, I guess caching with a very long duration will work.)
Yes, there are a few different ways to achieve this type of caching:
Use a different cache validator
In addition to configuring your cache expiration (as you've done above), you can also choose to configure a cache validator. In your case, you might use either an input or parameter validator.
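To make the idea concrete, here is a minimal plain-Python sketch of the check an input-based validator performs; this illustrates the concept only, and the `is_cache_valid` helper and its entry layout are hypothetical rather than Prefect's actual internals:

```python
import datetime

# Hypothetical sketch of an input-based cache validation check:
# a cached result is reused only if it has not expired AND the
# current inputs match the inputs that produced it.
def is_cache_valid(entry, inputs, now):
    not_expired = now < entry["expires_at"]
    same_inputs = entry["inputs"] == inputs
    return not_expired and same_inputs

entry = {
    "result": 42,
    "inputs": {"x": 1},
    "expires_at": datetime.datetime(2030, 1, 1),
}
now = datetime.datetime(2024, 6, 1)

fresh_same = is_cache_valid(entry, {"x": 1}, now)  # unexpired, same inputs -> reuse
changed = is_cache_valid(entry, {"x": 2}, now)     # inputs differ -> recompute
```

With a duration-only expiration, only the first condition is checked; an input or parameter validator adds the second, which is what prevents mapped children with different inputs from sharing each other's results.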
Use a cache key
You can "share" a cache amongst tasks (both within a single Flow and across Flows) by specifying a `cache_key` on your tasks. This will then look up your candidate `Cached` states by key instead of by task ID.
Use a file-based target
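For illustration, the key-based lookup just described can be sketched in plain Python; the `run_task` helper and in-memory `cache` dict here are hypothetical stand-ins, not Prefect's actual machinery:

```python
# In-memory cache keyed by an explicit cache_key string, so two
# different tasks that declare the same key share a single entry.
cache = {}
calls = []

def run_task(task_name, cache_key, compute):
    # Look the candidate result up by key, not by task identity.
    if cache_key in cache:
        return cache[cache_key]
    result = compute()
    cache[cache_key] = result
    return result

def expensive():
    calls.append(1)  # track how many times we actually compute
    return "generated-data"

a = run_task("task_a", "shared-key", expensive)
b = run_task("task_b", "shared-key", expensive)  # cache hit: reuses task_a's result
```

Because the lookup ignores which task produced the entry, `task_b` never recomputes.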
Lastly, and increasingly the most popular setup, is to use a file-based `target` for your task. You can then template this target string with things like `flow_run_id` and the inputs provided to your task. Whenever the task runs, it first checks for the existence of data at the specified target location and, if found, does not rerun. Templating on both `flow_run_id` and the task's inputs has the effect of re-using the data at the target only when both the flow run and the inputs are the same.
You can then share this template across multiple tasks (or in your case, across all the mapped children).
Note that you can also provide inputs and parameters to your `target` template if you desire.
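The target mechanism can be sketched in plain Python as follows; the `run_with_target` helper and the template string are hypothetical illustrations (Prefect performs the templating and existence check for you when you set `target` on a task):

```python
import os
import tempfile

def run_with_target(template, flow_run_id, task_name, compute):
    # Fill in the target template, then check whether data already
    # exists at that location; if so, reuse it instead of rerunning.
    path = template.format(flow_run_id=flow_run_id, task_name=task_name)
    if os.path.exists(path):
        with open(path) as f:
            return f.read()
    result = compute()
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w") as f:
        f.write(result)
    return result

tmp = tempfile.mkdtemp()
template = os.path.join(tmp, "{flow_run_id}", "{task_name}.out")

first = run_with_target(template, "run-1", "gen", lambda: "expensive result")
# Same flow run and template values -> target exists, so the second
# compute function is never called and the stored data is returned.
second = run_with_target(template, "run-1", "gen", lambda: "recomputed")
```

A different `flow_run_id` would format to a different path, so the next flow run would recompute and write its own copy, which is exactly the per-run checkpointing behaviour asked about above.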