I'm brand new to the Python Celery framework. I've read various docs and articles, and a common theme is that long-running tasks, while sometimes necessary, are problematic for various reasons.
I need to create a workflow, or chain of tasks, that submits a request to an external system, waits for human approval of the request (which can take anywhere from a few seconds to a few days), and then performs some work once the request is approved. Within the task I'll be requesting the status of the approval from a REST API. I'll be using a Celery process as the message producer, along with a few worker processes spread across separate containers.
Specifically, how should I design the approval task so that it reduces operational friction with Celery and its workers?
So far my thoughts are either...
- Use a long-running task that loops, checking the approval status and sleeping for some period of time whenever the status is not yet approved. When the status changes to approved, break out of the loop and end the task.
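    import time

    import requests
    from celery import shared_task

    @shared_task(
        acks_late=True,
        reject_on_worker_lost=True,  # task-level option; the app setting is task_reject_on_worker_lost
    )
    def long_wait_for_approval(request_id: str) -> bool:
        while True:
            # f-string so that request_id is actually interpolated into the URL
            r = requests.get(f'https://some.domain/approval_status/{request_id}')
            if 'approved' in r.text:
                break
            # Not approved yet: block this worker slot for 30 seconds, then poll again
            time.sleep(30)
        return True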
- Use a task with retries enabled for a custom exception type, check for approval status and if the status is not approved then raise the custom exception to instruct celery to retry the task.
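    import requests
    from celery import shared_task

    class PendingApprovalError(Exception):
        """Raised while the request is still awaiting approval."""

    @shared_task(
        bind=True,  # needed because the task function takes `self`
        acks_late=True,
        reject_on_worker_lost=True,  # task-level option; the app setting is task_reject_on_worker_lost
        autoretry_for=(PendingApprovalError,),
        retry_backoff=True,
        max_retries=100_000,
    )
    def wait_for_approval_with_retry(self, request_id: str) -> bool:
        # f-string so that request_id is actually interpolated into the URL
        r = requests.get(f'https://some.domain/approval_status/{request_id}')
        if 'approved' not in r.text:
            # Not approved yet: raising triggers an automatic retry with backoff
            raise PendingApprovalError()
        return True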
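To get a feel for what `retry_backoff=True` and `max_retries=100_000` would mean over a multi-day wait, here is a rough back-of-the-envelope sketch of the retry schedule. It rests on my understanding of Celery's defaults, which I have not verified against a running worker: the delay doubles on each retry starting from 1 second and is capped at 600 seconds (`retry_backoff_max`). It also ignores jitter (`retry_jitter` is on by default, I believe, which randomizes each delay between 0 and that value, so the real retry count would be higher). The helper names `backoff_delay` and `retries_to_cover` are my own, not part of Celery.

```python
def backoff_delay(retries: int, factor: int = 1, maximum: int = 600) -> int:
    """Delay in seconds before the next retry, without jitter.

    Assumes exponential backoff capped at `maximum`, where `retries`
    is the number of retries already performed (0 for the first retry).
    """
    return min(maximum, factor * 2 ** retries)


def retries_to_cover(total_seconds: int) -> int:
    """Count the retries needed before the summed delays reach total_seconds."""
    elapsed = 0
    retries = 0
    while elapsed < total_seconds:
        elapsed += backoff_delay(retries)
        retries += 1
    return retries


three_days = 3 * 24 * 60 * 60  # 259,200 seconds
print(backoff_delay(0), backoff_delay(9), backoff_delay(10))
print(retries_to_cover(three_days))
```

If those assumptions hold, a three-day wait needs only a few hundred retries per task once the delay reaches the cap, so `max_retries=100_000` is far more headroom than the wait itself requires.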
In the context of this question, is either of these options better than the other, and why or why not?
Is there a better pattern for handling this kind of workflow with Celery?
Thanks in advance.
NB: I have tried both sample tasks in a test environment and both get the job done (with no load). However, I have concerns about how they would perform in a production environment under load, where hundreds of tasks are waiting for approval and the need arises to restart workers or deploy changes to the Celery app.