Executing Airflow tasks that are themselves in Docker containers


I have a task that I want to execute on a schedule using Airflow. I have Airflow running in Docker using the docker-compose.yaml provided in the Airflow Docker tutorial.

I build the Docker image for the task with docker build -f Dockerfile -t twm_step01 . (the trailing dot being the build context).

My task consists of a bash script which sets up some directories to read from before calling docker run.

So the script below is called ex-my-script.sh, and it reads from another script called config.sh which provides paths to directories that should be read from/written to.

A further script (my-script.sh) is executed inside the Docker container, as shown below. That script executes another bash script, which in turn executes a final bash script that calls a software program installed in the container to write the task's output data.

#!/bin/bash

source scripts/config.sh

in_dir=$event_image_dir
in_ext=$zip_ext
processing_graph_xml=$graph_0
out_dir=$out_step01
out_ext=$dim_ext
lvl_parallelism=$parallel_lvl
data_dir=$data_directory

docker run -it \
        -v $(pwd)/write_storage:$data_directory \
        twm_step01 \
                bash /scripts/my-script.sh \
                        $in_dir \
                        $in_ext \
                        $processing_graph_xml \
                        $out_dir \
                        $out_ext \
                        $lvl_parallelism \
                        $data_dir

Here is config.sh, to make things easier to follow:

parallel_lvl=4

local_directory=/opt/airflow/tasks
data_directory=/opt/airflow/tasks/write_storage

zip_ext=.zip
dim_ext=.dim
txt_ext=.txt
tif_ext=.tif
shp_ext=.shp

# step01
event_image_dir=Events_Images/2015
graph_0=/snap_graphs/snap_graph_0.xml
out_step01=step01

And here is the volumes section of the docker-compose.yaml, where I have added local directories. I added the last line with docker.sock because I read this answer: "How to run a docker operator task from airflow which itself runs in a docker container?"

I felt that it aligned with what I was trying to do.

volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
    - ${AIRFLOW_PROJ_DIR:-.}/scripts:/opt/airflow/scripts
    - ${AIRFLOW_PROJ_DIR:-.}/src:/opt/airflow/src
    - ${AIRFLOW_PROJ_DIR:-.}/write_storage:/opt/airflow/tasks/write_storage
    - ${AIRFLOW_PROJ_DIR:-.}/snap_graphs:/opt/airflow/snap_graphs
    - /var/run/docker.sock:/var/run/docker.sock

My DAG looks as follows:

import os
from datetime import timedelta, datetime
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
from airflow.providers.docker.operators.docker import DockerOperator

@dag(
    dag_id="SAR_flooding_demo_docker",
    start_date=datetime(2024, 1, 15),
    schedule="@continuous",
    max_active_runs=1,
    catchup=False,
    default_args={
        "retries":0,
        "retry_delay": timedelta(minutes=1)
    },
    description="Testing containerized demo",
    tags=["Test"]
)
def demo_runner():

    @task
    def task_01():
        #t1=BashOperator(
        #    task_id="Task01",
        #    bash_command='/opt/airflow/scripts/ex-my-script.sh ')
        t1 = DockerOperator(
            task_id="Task01",
            image="twm_step01",
            api_version='auto',
            auto_remove=True,
            command='echo "this is a test message shown from within the container"',
            docker_url='unix://var/run/docker.sock',
            network_mode='bridge'
        )
        return

    task_01()

demo_runner()

I have tried both BashOperator and DockerOperator. The DAG is scheduled fine and does not fail, but I suspect something is wrong since it completes in less than a second. I am also looking for a way to check that the data output by the task is what I expect it to be.

I am very new to Airflow and Docker, so I am just trying anything I can think of.

In the volumes: section of the docker-compose.yaml I have put the paths where my bash script and input/output data are located (or should be located) on my local machine.

If the task cannot execute the bash script, why doesn't it fail? If the task can execute the bash script, why does it succeed in less than a second?

Here is the log from one of the 'successful' task runs:

bdb1f78ac8d2
*** Found local files:
***   * /opt/airflow/logs/dag_id=SAR_flooding_demo_docker/run_id=scheduled__2024-03-04T10:00:06.056780+00:00/task_id=task_01/attempt=1.log
[2024-03-04, 10:00:08 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: SAR_flooding_demo_docker.task_01 scheduled__2024-03-04T10:00:06.056780+00:00 [queued]>
[2024-03-04, 10:00:08 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: SAR_flooding_demo_docker.task_01 scheduled__2024-03-04T10:00:06.056780+00:00 [queued]>
[2024-03-04, 10:00:08 UTC] {taskinstance.py:2193} INFO - Starting attempt 1 of 1
[2024-03-04, 10:00:08 UTC] {taskinstance.py:2214} INFO - Executing <Task(_PythonDecoratedOperator): task_01> on 2024-03-04 10:00:06.056780+00:00
[2024-03-04, 10:00:08 UTC] {standard_task_runner.py:60} INFO - Started process 699 to run task
[2024-03-04, 10:00:08 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'SAR_flooding_demo_docker', 'task_01', 'scheduled__2024-03-04T10:00:06.056780+00:00', '--job-id', '296', '--raw', '--subdir', 'DAGS_FOLDER/SAR_flooding_demonstator_dag.py', '--cfg-path', '/tmp/tmpd36mdp9m']
[2024-03-04, 10:00:08 UTC] {standard_task_runner.py:88} INFO - Job 296: Subtask task_01
[2024-03-04, 10:00:09 UTC] {task_command.py:423} INFO - Running <TaskInstance: SAR_flooding_demo_docker.task_01 scheduled__2024-03-04T10:00:06.056780+00:00 [running]> on host bdb1f78ac8d2
[2024-03-04, 10:00:09 UTC] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='SAR_flooding_demo_docker' AIRFLOW_CTX_TASK_ID='task_01' AIRFLOW_CTX_EXECUTION_DATE='2024-03-04T10:00:06.056780+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-03-04T10:00:06.056780+00:00'
[2024-03-04, 10:00:09 UTC] {python.py:202} INFO - Done. Returned value was: None
[2024-03-04, 10:00:09 UTC] {taskinstance.py:1149} INFO - Marking task as SUCCESS. dag_id=SAR_flooding_demo_docker, task_id=task_01, execution_date=20240304T100006, start_date=20240304T100008, end_date=20240304T100009
[2024-03-04, 10:00:09 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 0
[2024-03-04, 10:00:09 UTC] {taskinstance.py:3309} INFO - 0 downstream tasks scheduled from follow-on schedule check
1 Answer

Answer by Andrey Anshin:

You can't run one task inside another; the inner one is not executed because the Airflow scheduler/worker does not know about it.
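
To make that concrete, here is essentially what the DAG in the question does (a sketch of the problem, not a fix): the operator object is built inside the Python callable, nothing ever runs it, and the task body returns None, which matches the "Returned value was: None" line in the log.

from airflow.decorators import task
from airflow.providers.docker.operators.docker import DockerOperator

@task
def task_01():
    # Only a Python object is constructed here; the scheduler never sees it
    # and nothing calls its execute() method, so the container never starts.
    DockerOperator(task_id="Task01", image="twm_step01", command="echo unused")
    return  # returning None -> the task is marked SUCCESS almost instantly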

In your case you could use the TaskFlow decorators directly, e.g. the Task Docker decorator:

@task.docker(...)
def task_01():
    ...
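
A minimal sketch of what that could look like for the image in the question, reusing the docker_url and network_mode from the DAG above. Note that @task.docker runs the decorated function's body inside the container, so the twm_step01 image needs a working Python interpreter; the call to /scripts/my-script.sh is shown without its arguments, which would need to be filled in.

from datetime import datetime
from airflow.decorators import dag, task

@dag(
    dag_id="SAR_flooding_demo_docker",
    start_date=datetime(2024, 1, 15),
    schedule="@continuous",
    max_active_runs=1,
    catchup=False,
)
def demo_runner():

    @task.docker(
        image="twm_step01",
        api_version="auto",
        docker_url="unix://var/run/docker.sock",
        network_mode="bridge",
    )
    def task_01():
        # This body is serialized and executed inside the twm_step01 container.
        import subprocess
        subprocess.run(["bash", "/scripts/my-script.sh"], check=True)

    task_01()

demo_runner()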

In Airflow 2.9.0 (not released yet) you will be able to use the Task Bash decorator:

@task.bash(...)
def task_01():
    ...
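
For example, a sketch assuming Airflow 2.9+: the decorated function returns the bash command string to execute. Pointing it at the wrapper script from the question still means calling docker run from inside the Airflow worker container, which requires the docker CLI there in addition to the mounted docker.sock.

from airflow.decorators import task

@task.bash
def task_01() -> str:
    # The returned string is run as the task's bash command.
    return "bash /opt/airflow/scripts/ex-my-script.sh"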

Or use the classic operators:

BashOperator(
    task_id="task_01",
    ...
)
DockerOperator(
    task_id="task_01",
    ...
)
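
A sketch of the classic-operator route, with the DockerOperator created directly inside the @dag function rather than inside a @task callable, so the scheduler actually executes it. The docker_url and network_mode are reused from the question; the mount source is a placeholder and must be a path on the Docker host (not inside the Airflow container), because the bind mount is resolved by the host's Docker daemon. The arguments to my-script.sh are again omitted.

from datetime import datetime
from airflow.decorators import dag
from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import Mount

@dag(
    dag_id="SAR_flooding_demo_docker",
    start_date=datetime(2024, 1, 15),
    schedule="@continuous",
    max_active_runs=1,
    catchup=False,
)
def demo_runner():
    DockerOperator(
        task_id="task_01",
        image="twm_step01",
        api_version="auto",
        docker_url="unix://var/run/docker.sock",
        network_mode="bridge",
        # Placeholder: the write_storage directory as seen by the host daemon.
        mounts=[
            Mount(
                source="/absolute/path/on/host/write_storage",
                target="/opt/airflow/tasks/write_storage",
                type="bind",
            )
        ],
        command="bash /scripts/my-script.sh",
    )

demo_runner()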