How to retrieve the result of a celery task in Apache Airflow?

79 views

I submit a task with the `.send_task` method and use the returned `AsyncResult` object to retrieve the result.

To read the result I call `result.get()`. The call neither returns a value nor raises an error.

# send_task publishes the task by name and returns an AsyncResult handle
# immediately; it does not wait for the worker to finish.
result = celery_obj.send_task("task_name", args=[parameters])
# Fixed: `//` is not a comment in Python (it is floor division and makes
# these lines a syntax error) — use `#` for comments instead.
print(f"result1 from celery -------: {result}")  # e.g. d6e7e95a-5097-4601-ab9c-24d126461933
# .get() blocks until the worker stores a result in the configured result
# backend; it returned "nothing" here because no result ever arrived.
print(f"result from celery -------: {result.get()}")

Below is the DAG example:

# Result backend (SQLAlchemy over a local SQLite file) — where task
# return values are stored — and the AMQP broker tasks are published to.
RBROKER = "db+sqlite:///db.sqlite3"
CELERY_BROKER_URL = "pyamqp://guest:guest@hostname//"

# Client-side Celery app, used here only to publish tasks and read results.
celery_obj = Celery('worker', backend=RBROKER, broker=CELERY_BROKER_URL)

# Create the DAG: start_task -> run_task -> end_task, scheduled to run once.
with DAG(
   'mydag',
    default_args=default_args,
    schedule_interval='@once',
) as dag:
    
    def run_celery(**kwargs):
        """Publish a Celery task and log its AsyncResult.

        NOTE(review): only the task id is printed; every attempt to await
        the actual return value (the .get() calls) is commented out, so
        the task's result is discarded.
        """
        # Airflow runtime context (dag_run, execution date, ...).
        context=get_current_context()
        print(f"context value: {context}")
        # Value passed at trigger time, e.g. `airflow dags trigger --conf`.
        custom_param = kwargs['dag_run'].conf.get('custom_parameter')
        # send_task returns an AsyncResult immediately; it does NOT wait.
        result = celery_obj.send_task("celery_task_name", args=[custom_param])
        print(f"result1 from celery -------: {result}")  # prints only the task UUID
        #print(f"result from celery -------: {result.get()}")

        # NOTE(review): result.get() "returning nothing" usually means the
        # worker stores its result in a different result backend than the
        # one configured on celery_obj — confirm both sides use the same
        # backend, and prefer result.get(timeout=...) so a mismatch fails
        # loudly instead of blocking forever.
        #result2 = celery_obj.AsyncResult(id=result.id, app=celery_obj)
        #print(result2.get())
        #result.get()
        #result_value = result.get()  # Retrieve the result from the AsyncResult object
        #return result


    start_task = BashOperator(
        task_id='start_task',
        bash_command="echo Start Task!!"
    )

    run_task = PythonOperator(
        task_id='run_task',
        # NOTE(review): provide_context is deprecated in Airflow 2.x (the
        # context is always passed) — presumably harmless here, but confirm
        # the Airflow version in use.
        provide_context=True,
        python_callable=run_celery
    )

    end_task = BashOperator(
        task_id='end_task',
        bash_command="echo end Task!"
    )

    # Define the task dependencies
    start_task >> run_task >> end_task
There are 0 answers.