create apscheduler job trigger from cron expression

4.1k views Asked by At

I'm trying to run some scheduled jobs using cron expressions in python. I'm new to python and I've already worked with quartz scheduler in java to achieve almost the same thing. Right now, I am trying to work with apscheduler in python. I know that it is possible to do this using

crontrig = CronTrigger(minute='*', second = '*');

But, I was working with cron expressions (like "0/5 * * * * *") and I would like to know if there is anything which could directly parse the expression and generate a CronTrigger.

4

There are 4 answers

1
Alex Grönholm On BEST ANSWER

Given that APScheduler supports a slightly different set of fields, it's not immediately obvious how those expressions would map to CronTrigger's arguments.

I should also point out that the preferred method of scheduling jobs does not involve directly instantiating triggers, but instead giving the arguments to add_job() instead.

If you want to do that yourself, you could simply split the expression and map the elements to whichever trigger arguments you want.

0
ofirule On

Made small changes given @raghu-venmarathoor code

def get_trigger(expression):
    # type: (str) -> CronTrigger
    """
    Evaluates a CronTrigger obj from cron expression
    :param expression: String representing the crons five first fields, e.g : '* * * * *'
    :return: A CronTrigger
    """
    vals = expression.split()
    vals = [(None if w == '?' else w) for w in vals]
    return CronTrigger(minute=vals[0], hour=vals[1], day=vals[2], month=vals[3], day_of_week=vals[4])

UPDATE

In version 3.5.0 the following method was added:

CronTrigger.from_crontab()
0
Raghu Venmarathoor On

As @Alex mentioned. I've created a function to map values to trigger.

The below function will return a tuple in the order year, month, day, week, day_of_week, hour, minute, second. Expecting some suggestions.

def evaluate(self, expression):
    '''
    order of values
    year, month, day, week, day_of_week, hour, minute, second, start_date, end_date, timezone
    '''
    splitValues = expression.split();
    for i in range(0,8):
        if (i == 0):
            if (splitValues[0] == '?'):
                year = None;
            else:
              year = splitValues[0];
        if (i == 1):
            if (splitValues[1] == '?'):
                month = None;
            else:
                month = splitValues[1];
        if (i == 2):
            if (splitValues[2] == '?'):
                day = None;
            else:
                day = splitValues[2];
        if (i == 3):
            if (splitValues[3] == '?'):
                week = None;
            else:
              week = splitValues[3];
        if (i == 4):
            if (splitValues[4] == '?'):
                day_of_week = None;
            else:
                day_of_week = splitValues[4];
        if (i == 5):
            if (splitValues[5] == '?'):
                hour = None;
            else:
                hour = splitValues[5];
        if (i == 6):
            if (splitValues[6] == '?'):
                minute = None;
            else:
                minute = splitValues[6];
        if (i == 7):
            if (splitValues[7] == '?'):
                second = None;
            else:
                second = splitValues[7];
    return year, month, day, week, day_of_week, hour, minute, second;

I've created a createTrigger function by using this expression

def getTrigger(self,cronExpression): 
    year, month, day, week,  day_of_week, hour, minute, second = evaluate(cronExpression);
    trigger = CronTrigger(year, month, day, week, day_of_week, hour, minute, second)
    return trigger;
0
Andrey On

You can use

from apscheduler.triggers.cron import CronTrigger
trigger = CronTrigger.from_crontab(cron_string)