Apache Airflow: DAG executed twice before start_date


Hi everyone,

From the Airflow UI, we are trying to understand how to start a DAG at a specific time in the future, but we always get 2 additional runs in catch-up mode, even though catch-up is disabled.

Example

Create a DAG with the parameters below:

  • start_date: 10:30
  • execution_date: not defined
  • interval = 3 minutes (from the .py file)
  • catchup_by_default = False

Turn the switch ON at the current time, 10:28. Airflow then triggers 2 DAG runs with execution_date at:

  • 10:24
  • 10:27

These 2 DAG runs are executed in catch-up mode one after the other, and that's not what we want :-(

What are we doing wrong? We can perhaps understand the 10:27 run (ETL concept), but we do not understand the 10:24 one :-(

Thank you for the help :-)

DETAILS:

OS: RedHat 7

Python: 2.7

Airflow: v1.8.0

DAG python file:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
     'owner': 'aa',
     'depends_on_past': False,
     'start_date': datetime(2017, 9, 7, 10, 30),
     'run_as_user': 'aa'
}

dag = DAG(
    'dag3', default_args=default_args, schedule_interval=timedelta(minutes=3))
dag.catchup = False

create_command = "/script.sh "

t1 = BashOperator(
    task_id='task',
    bash_command='date',
    dag=dag)

There are 3 answers

Chris269

I tested with Airflow v1.8.0, Python 3.5, and a SQLite database. The following DAG, unpaused at 10:28, is quite similar to yours and works as it should (only one run, at 10:33, for 10:30).

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

def print_hello_3min():
    return ('Hello world! %s' % datetime.now())

dag = DAG('hello_world_3min', description='Simple tutorial DAG 3min',
          schedule_interval='*/3 * * * *',
          start_date=datetime(2017, 9, 18, 10, 30),
          catchup=False)

dummy_operator = DummyOperator(task_id='dummy_task_3min', retries=3, dag=dag)

hello_operator = PythonOperator(task_id='hello_task_3min',
                                python_callable=print_hello_3min, dag=dag)

dummy_operator >> hello_operator
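
The "one run, at 10:33, for 10:30" result follows from the fact that Airflow triggers a scheduled run at the end of the interval it covers. A minimal sketch of that arithmetic in plain Python, with no Airflow import; the helper name first_trigger_time is made up for illustration, and scheduler latency is ignored:

```python
from datetime import datetime, timedelta


def first_trigger_time(start_date, interval):
    """Wall-clock time at which the run with execution_date == start_date fires.

    Assumes the run is triggered once its interval has fully elapsed.
    """
    return start_date + interval


# Values taken from the DAG above: start_date 10:30, one run every 3 minutes.
start_date = datetime(2017, 9, 18, 10, 30)
interval = timedelta(minutes=3)

first_run = first_trigger_time(start_date, interval)
print(first_run)  # 2017-09-18 10:33:00
```

With catch-up disabled and the DAG unpaused at 10:28, no run is expected before that time.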
Phariyawit Chaiparitte


I'm not sure whether my solution is good enough, but I'd like to share my understanding. There are 2 things to consider together:

  1. schedule_interval mode, such as 'hourly', 'daily', 'weekly', 'annually'.

    • hourly = (0 * * * *) = “At minute 0 of every hour.”
    • daily = (0 1 * * *) = “At 01:00.”
    • monthly = (0 1 1 * *) = “At 01:00 on day-of-month 1.”
  2. start_date

    • hourly = datetime(2019, 4, 5, 1, 30)
    • daily = datetime(2019, 4, 5)
    • monthly = datetime(2019, 4, 1)

My strategy is to set start_date to the expected date and time of the first run minus 1 unit of the interval mode.

Example:

  1. To start the first job at 2019-4-5 01:00 with an hourly interval:

    • schedule_interval mode = hourly
    • expected first run = 2019-4-5 01:00
    • so, start_date = 2019-4-5 00:00 (1 hour earlier)
    • CRON = ( 0 * * * * ), which means “At minute 0 of every hour.”
    default_args = {
        'owner': 'aa',
        'depends_on_past': False,
        'start_date': datetime(2019, 4, 5, 0, 0),
        'run_as_user': 'aa'
    }

    dag = DAG(
        'dag3', default_args=default_args, catchup=False, schedule_interval='0 * * * *')
  2. To start the first job at 2019-4-5 01:00 with a daily interval:

    • schedule_interval mode = daily
    • expected first run = 2019-4-5 01:00
    • so, start_date = 2019-4-4 (1 day earlier)
    • CRON = ( 0 1 * * * ), which means “At 01:00.”
    default_args = {
        'owner': 'aa',
        'depends_on_past': False,
        'start_date': datetime(2019, 4, 4),
        'run_as_user': 'aa'
    }

    dag = DAG(
        'dag3', default_args=default_args, catchup=False, schedule_interval='0 1 * * *')
  3. To start the first job at 2019-5-1 01:00 with a monthly interval (this cron expression only fires on day-of-month 1, so the first job cannot land on 2019-4-5):

    • schedule_interval mode = monthly
    • expected first run = 2019-5-1 01:00
    • so, start_date = 2019-4-1 (1 month earlier)
    • CRON = ( 0 1 1 * * ), which means “At 01:00 on day-of-month 1.”
    default_args = {
        'owner': 'aa',
        'depends_on_past': False,
        'start_date': datetime(2019, 4, 1),
        'run_as_user': 'aa'
    }

    dag = DAG(
        'dag3', default_args=default_args, catchup=False, schedule_interval='0 1 1 * *')
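
The "subtract one unit of the interval" rule can be sketched in plain Python for fixed-length intervals. The helper name start_date_for_first_run is hypothetical and is not part of Airflow; a monthly interval is left out because a calendar month is not a fixed timedelta.

```python
from datetime import datetime, timedelta


def start_date_for_first_run(desired_first_run, interval):
    """Return the start_date that makes the first run fire at desired_first_run.

    Assumes a run is triggered one full interval after its execution_date.
    """
    return desired_first_run - interval


desired = datetime(2019, 4, 5, 1, 0)

hourly = start_date_for_first_run(desired, timedelta(hours=1))
daily = start_date_for_first_run(desired, timedelta(days=1))

print(hourly)  # 2019-04-05 00:00:00
print(daily)   # 2019-04-04 01:00:00
```

The daily result is 2019-04-04 01:00, while the daily example uses midnight of 2019-04-04. Both should lead to the same first run, since the first 01:00 cron tick on or after either start_date is 2019-04-04 01:00.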

So far this strategy has worked for me, but if anyone has a better one, please share.

PS. I use https://crontab.guru to build and check my cron schedules.

Mike Precup

This appears to happen only when the schedule is given as a timedelta. Switch your schedule_interval to a cron expression (for example '*/3 * * * *' instead of timedelta(minutes=3)) and it won't run twice anymore.
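
As a rough aid for making that switch, here is a sketch that converts a whole-minute timedelta into an equivalent cron expression. The function name minutes_timedelta_to_cron is made up for illustration and is not an Airflow API. It only handles intervals that evenly divide an hour, because */N in the minute field restarts at minute 0 of every hour.

```python
from datetime import timedelta


def minutes_timedelta_to_cron(delta):
    """Return a '*/N * * * *' cron expression equivalent to a whole-minute delta."""
    total_seconds = delta.total_seconds()
    if total_seconds <= 0 or total_seconds % 60 != 0:
        raise ValueError("delta must be a positive whole number of minutes")
    minutes = int(total_seconds // 60)
    if minutes >= 60 or 60 % minutes != 0:
        raise ValueError("only intervals that evenly divide an hour map to */N")
    return "*/{} * * * *".format(minutes)


cron = minutes_timedelta_to_cron(timedelta(minutes=3))
print(cron)  # */3 * * * *
```

Note that a cron schedule is aligned to the clock (minutes 0, 3, 6, ...), whereas a timedelta schedule is counted from start_date; with a start_date of 10:30 the two line up.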