I deploy two services with a Dockerfile + docker-compose (a rough compose sketch is below):
- Airflow 2.8.1 (with apache-airflow-providers-daskexecutor==1.1.1)
- A Dask cluster for the DaskExecutor (pip install "dask[complete]==2023.4.1")
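For context, this is roughly how my docker-compose is laid out (service names, build paths, and mounts here are simplified placeholders, not my exact file):

version: "3.8"
services:
  airflow:
    build: ./airflow               # Airflow 2.8.1 + apache-airflow-providers-daskexecutor
    environment:
      AIRFLOW__CORE__EXECUTOR: DaskExecutor
      AIRFLOW__DASK__CLUSTER_ADDRESS: tcp://dask-scheduler:8786
    volumes:
      - ./dags:/opt/airflow/dags
  dask-scheduler:
    build: ./dask                  # dask[complete]==2023.4.1
    command: dask scheduler
  dask-worker:
    build: ./dask
    command: dask worker tcp://dask-scheduler:8786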
My DAG on Airflow:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from dask.distributed import Client
import dask
# Set up a Dask cluster
dask_client = Client()
# Define a simple Dask task
def dask_task():
    with dask.config.set(scheduler=dask_client.get_executor()):
        # Your Dask computation here
        result = dask.delayed(sum)(range(10)).compute()
        print("Dask Task Result:", result)
# Default_args specify the default parameters for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
# Define the DAG
dag = DAG(
    'dask_example',
    default_args=default_args,
    description='An example DAG using Dask with Airflow',
    schedule_interval=timedelta(days=1),  # Run the DAG daily
)
# Define the Dask task using PythonOperator
dask_operator = PythonOperator(
    task_id='dask_task',
    python_callable=dask_task,
    dag=dag,
)
dask_operator
When I run the DAG, I get errors on both the Airflow scheduler and the Dask worker.
Error on the Airflow scheduler:
airflow-docker-compose-fec-airflow-scheduler-1 | [2024-01-26T04:10:15.543+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'dask_example', 'dask_task', 'manual__2024-01-26T04:10:15.316699+00:00', '--local', '--subdir', 'DAGS_FOLDER/test2.py']
airflow-docker-compose-fec-airflow-scheduler-1 | [2024-01-26T04:10:15.594+0000] {dask_executor.py:116} ERROR - Failed to execute task: FileNotFoundError(2, 'No such file or directory')
Error on the Dask worker:
dask-executor-docker-compose-fec-worker-1 | 2024-01-26 04:10:15,557 - distributed.worker - WARNING - Compute Failed
dask-executor-docker-compose-fec-worker-1 | Key: check_call-dc9350e6-0d6d-4aa5-a469-4c49c3d841aa
dask-executor-docker-compose-fec-worker-1 | Function: check_call
dask-executor-docker-compose-fec-worker-1 | args: (['airflow', 'tasks', 'run', 'dask_example', 'dask_task', 'manual__2024-01-26T04:10:15.316699+00:00', '--local', '--subdir', 'DAGS_FOLDER/test2.py'])
dask-executor-docker-compose-fec-worker-1 | kwargs: {}
dask-executor-docker-compose-fec-worker-1 | Exception: "FileNotFoundError(2, 'No such file or directory')"
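If I read the worker traceback correctly, the DaskExecutor just submits a subprocess call of the Airflow CLI to the worker, so the FileNotFoundError presumably means the airflow command is not available inside the Dask worker container, rather than something failing inside my task code. A minimal sketch of what I think the worker is effectively doing (my assumption, based only on the log above):

from subprocess import check_call

# Roughly the call the DaskExecutor ships to the worker (taken from the log).
# If the 'airflow' binary is not installed / not on PATH inside the worker
# container, check_call raises FileNotFoundError(2, 'No such file or directory')
# before the DAG code ever runs.
check_call([
    'airflow', 'tasks', 'run',
    'dask_example', 'dask_task',
    'manual__2024-01-26T04:10:15.316699+00:00',
    '--local', '--subdir', 'DAGS_FOLDER/test2.py',
])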
I expect the DAG task to run on the Dask worker and the result to show up in the Airflow UI.
How can I fix this issue?