Quartz scheduler job schedules more than 1 cron job

65 views Asked by At

I am writing a code inside a Spring Boot App to run a cron job everyday once (for test purpose I am running it each second).
The job should fetch a list of users and send them an email.
I wrote as well a re-try mechanism inside it, so once the cron job fails or something goes wrong between, the job should retry the same functionality 3 times, if it doesn't work so it should be skipped.
Here the code to schedule the job inside the main method:

JobBuilder jobBuilder = JobBuilder.newJob(UserEmailJob.class)
            .usingJobData(UserEmailJob.MAX_RETRY_ATTEMPTS_KEY, "0")
            .requestRecovery(true);
scheduler.scheduleCronJob(jobBuilder, UserEmailJob.JOB_NAME, UserEmailJob.GROUP_NAME, UserEmailJob.CRON_EXP, new Date());

Here the job definition:

@Service
@DisallowConcurrentExecution
@RequiredArgsConstructor
@Slf4j
public class UserEmailJob implements Job {

    @Autowired
    private EmailService emailService;

    public static final String JOB_NAME = "JOB_USER_EMAIL"
    public static final String CRON_EXP = "* * * * * ?";
    public static final String GROUP_NAME = "GROUP1";
    public static final String MAX_RETRY_ATTEMPTS_KEY = "retryCount";
    private static final int DEFAULT_MAX_RETRIES = 3;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
        int retryCount = jobDataMap.containsKey(MAX_RETRY_ATTEMPTS_KEY) ? jobDataMap.getInt(MAX_RETRY_ATTEMPTS_KEY) : 0;

        try {
            sendEmails();
        } catch (Exception e) {
            handleJobExecutionException(context, e, retryCount);
        }
    }

    private void handleJobExecutionException(JobExecutionContext context, Exception e, int retryCount) throws JobExecutionException {
        if (retryCount >= getMaxRetriesForJob()) {
            log.error("Job has exceeded allowed retries. Notifying.");
            // Terminate the job after exceeding the maximum retry attempts
            return;
        } else {
            log.error("Job is being retried for {} time", retryCount);
            retryCount++;
            JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
            jobDataMap.put(MAX_RETRY_ATTEMPTS_KEY, "" + retryCount);

            JobExecutionException ex = new JobExecutionException(e);
            ex.setRefireImmediately(true);
            throw ex;
        }
    }

    protected void sendEmails() throws JobExecutionException {
        log.info("Send emails job started: {}.", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        emailService.sendEmails();
    }

    protected int getMaxRetriesForJob() {
        return DEFAULT_MAX_RETRIES;
    }
}

In my case, we have a common scheduler class which we use to schedule jobs/tasks:

@Service
public class MicroSchedulerImpl implements MicroScheduler {
    private static final Logger Log = LoggerFactory.getLogger(MicroScheduler.class);

    private Scheduler _scheduler;

    @Override
    public void schedule (JobBuilder jobBuilder, String jobNamePrefix, Date startAt) {
        schedule(jobBuilder, jobNamePrefix + "-" + UUID.randomUUID(), null, startAt);
    }

    @Override
    public void schedule(JobBuilder jobBuilder, String jobName, String grpName, Date startAt) {
        //Create the job detail
        JobDetail jobDetail = jobBuilder
            .storeDurably(true)
            .withIdentity(jobName, grpName).build();

        schedule(jobDetail, startAt);
    }

    public void schedule(JobDetail jobDetail, Date startAt) {
        try {
            Log.info("Scheduling job with name {} at {}", jobDetail.getKey().getName(), startAt);
            // Stores job and trigger. Will fail if job already exists.
            getScheduler().scheduleJob(jobDetail, buildTriggerForJob(jobDetail, startAt));
        } catch (SchedulerException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void reschedule(JobDetail jobDetail, Date startAt) {
        try {
            Log.info("Rescheduling job with name {} at {}", jobDetail.getKey().getName(), startAt);
            // Stores only trigger. Job must exist. Used for rescheduling.
        getScheduler().scheduleJob(buildTriggerForJob(jobDetail, startAt));
        } catch (SchedulerException e) {
            throw new RuntimeException(e);
        }

    }

    private Trigger buildTriggerForJob(JobDetail jobDetail, Date startAt) {
        String jobName = jobDetail.getKey().getName();
        String grpName = jobDetail.getKey().getGroup();

        String triggerName = jobName + "-trigger";

        //Create the trigger
        return newTrigger()
            .withIdentity(triggerName, grpName)
            .forJob(jobDetail)
            .startAt(startAt).build();

    }

    @Override
    public Scheduler getScheduler ()
    {
        if (_scheduler == null) {
            _scheduler = Environment.getBean(Scheduler.class);
        }
        return _scheduler;
    }

    @Override
    public void scheduleCronJob (JobBuilder jobBuilder, String jobName, String grpName, String cronExp, Date startAt)
    {
        scheduleCronJob(jobBuilder, jobName, grpName, cronExp, startAt, false);
    }

    @Override
    public void scheduleCronJob (JobBuilder jobBuilder, String jobName, String grpName, String cronExp, Date startAt,
                             boolean force)
    {
        try {
            JobKey jobKey = findJobKey(jobName, grpName);

            if(jobKey == null || force) {
                Log.info("Scheduling job with name {} with cron expression: {} ", jobName, cronExp);

                if(jobKey != null) {
                    getScheduler().deleteJob(jobKey);
                }

                //Create the job detail
                JobDetail jobDetail = jobBuilder
                    .storeDurably(true)
                    .withIdentity(jobName, grpName).build();

                String triggerName = jobName + "-trigger";

                //Create the trigger
                Trigger trigger = newTrigger()
                    .withIdentity(triggerName, grpName)
                    .withSchedule(cronSchedule(cronExp))
                    .startAt(startAt)
                    .build();
                getScheduler().scheduleJob(jobDetail, trigger);
            }
        } catch (SchedulerException e) {
            throw new RuntimeException(e);
        }

    }

    public boolean checkExistsCronJobWithSchedule(String jobName, String grpName, String cronExpression) {
        JobKey jobKey = findJobKey(jobName, grpName);

        if (jobKey == null) {
            Log.debug("Did not find Quartz job for name {}, group {}", jobName, grpName);
            return false;
        }

        try {
            List<Trigger> triggers = (List<Trigger>) getScheduler().getTriggersOfJob(jobKey);
            Log.debug("Found {} Quartz triggers for the job {}, group {}", triggers.size(), jobName, grpName);
            for (Trigger trigger: triggers) {
                if (trigger instanceof CronTrigger) {
                    CronTrigger cronTrigger = (CronTrigger) trigger;
                    String triggerCronExpression = cronTrigger.getCronExpression();
                    Log.debug("Checking trigger with expression {}", triggerCronExpression);
                    if (triggerCronExpression.equals(cronExpression)) {
                        Log.debug("Matching trigger found.");
                        return true;
                    }
                }
            }

        } catch (SchedulerException ex) {
            Log.warn(String.format("Cannot get job triggers for job key: %s", jobKey), ex);
            return false;
        }

        Log.info("Trigger for job {} group {} with schedule '{}' was not found.", jobName, grpName, cronExpression);
        return false;
    }

    private JobKey findJobKey(String jobName, String grpName) {
        // Check running jobs first
        try {
            for (JobExecutionContext runningJob : getScheduler().getCurrentlyExecutingJobs()) {
                if (Objects.equals(jobName, runningJob.getJobDetail().getKey().getName())) {
                    return runningJob.getJobDetail().getKey();
                }
            }
            // Check all jobs if not found
            Set<JobKey> jobKeys = getScheduler().getJobKeys(GroupMatcher.jobGroupEquals(grpName));
            for (JobKey jobKey : jobKeys) {
                if (Objects.equals(jobName, jobKey.getName())) {
                    return jobKey;
                }
            }
        }
        catch(SchedulerException e) {
            Log.error("Unable to get job");
        }

        return null;
    }
}

Everything starts fine, and to test that everything works really fine, I changed something in my service to throw an exception.
The retry mechanism went fine but after 3 retries (as I did in my code), the cron job continues to throw the exception without stop and I see the whole time the message Job has exceeded allowed retries. Notifying.
I couldn't figure out how to continue working without throwing the exception and just skipping the recording causing the issue (one of the email could not be sent).
I also saw that once I stop the application and I starts it again, and the last time the max attempts was exceeded, the counter won't start from 0 again.

0

There are 0 answers