I have a Celery task defined like this:
@async_worker.task(ignore_result=True, queue="data_path")
def publish_msg_from_lock(self, mac: str, data: bytes, gateway_euid: str):
Previously it wasn't a Celery task, and it was called like this:
n.publish_msg_from_lock(addr, unhexlify(payload), gateway_euid)
After making it a Celery task, I changed the call to:
n.publish_msg_from_lock.apply_async(args=(addr, unhexlify(payload), gateway_euid),)
I've also tried:
n.publish_msg_from_lock.apply_async(args=(addr, unhexlify(payload), gateway_euid), kwargs={})
and
n.publish_msg_from_lock.apply_async(kwargs={"mac": addr, "data": unhexlify(payload), "gateway_euid": gateway_euid})
But I'm getting this error:
File "/usr/local/lib/python3.8/dist-packages/celery/app/task.py", line 531, in apply_async
check_arguments(*(args or ()), **(kwargs or {}))
TypeError: publish_msg_from_lock() missing 1 required positional argument: 'gateway_euid'
Can you please help me fix it?
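Your task fails because it is not bound. The function signature starts with self, but without bind=True Celery does not pass the task instance, so the three values you send are assigned to self, mac and data, and gateway_euid is left missing. To use the self parameter in the function signature, add bind=True to the Celery decorator.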
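The argument shift can be reproduced without Celery. The sketch below is only an illustration: it calls a plain function with the same signature as in the question, with a placeholder body and made-up argument values, and assumes Celery's argument check behaves like an ordinary call against that signature.

```python
def publish_msg_from_lock(self, mac: str, data: bytes, gateway_euid: str):
    """Same signature as in the question; the body is a placeholder."""
    return mac, data, gateway_euid


def try_call(*args, **kwargs):
    """Return the TypeError message, or None if the arguments fit the signature."""
    try:
        publish_msg_from_lock(*args, **kwargs)
    except TypeError as exc:
        return str(exc)
    return None


# Three positional values fill self, mac and data, so gateway_euid is missing.
positional_error = try_call("aa:bb", b"\x01", "gw-1")

# Passing everything by keyword leaves self unfilled instead.
keyword_error = try_call(mac="aa:bb", data=b"\x01", gateway_euid="gw-1")

print(positional_error)
print(keyword_error)
```

In both cases the unbound self parameter takes up a slot that nothing fills.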
Example:
Binding gives the task access to Celery functionality through self, such as the current request and retries, as described here: https://docs.celeryq.dev/en/stable/userguide/tasks.html#bound-tasks
Alternatively, remove the self parameter if you don't need it.