I send a task with the .send_task method and using the same object to get the result.
To read the result I am using result.get(). This is not returning any value and also not giving any error.
result = celery_obj.send_task("task_name", args=[parameters])
print(f"result1 from celery -------: {result}") //result1 from celery -------: d6e7e95a-5097-4601-ab9c-24d126461933
print(f"result from celery -------: {result.get()}") // result.get() not returning anything also not giving error
Below is the DAG example:
RBROKER = "db+sqlite:///db.sqlite3"
CELERY_BROKER_URL = "pyamqp://guest:guest@hostname//"
celery_obj = Celery( 'worker',backend=RBROKER, broker=CELERY_BROKER_URL)
# Create the DAG
with DAG(
'mydag',
default_args=default_args,
schedule_interval='@once',
) as dag:
def run_celery(**kwargs):
context=get_current_context()
print(f"context value: {context}")
custom_param = kwargs['dag_run'].conf.get('custom_parameter')
result = celery_obj.send_task("celery_task_name", args=[custom_param])
print(f"result1 from celery -------: {result}")
#print(f"result from celery -------: {result.get()}")
#result2 = celery_obj.AsyncResult(id=result.id, app=celery_obj)
#print(result2.get())
#result.get()
#result_value = result.get() # Retrieve the result from the AsyncResult object
#return result
start_task = BashOperator(
task_id='start_task',
bash_command="echo Start Task!!"
)
run_task = PythonOperator(
task_id='run_task',
provide_context=True,
python_callable=run_celery
)
end_task = BashOperator(
task_id='end_task',
bash_command="echo end Task!"
)
# Define the task dependencies
start_task >> run_task >> end_task