APScheduler task not firing due to eventlet monkey_patch

691 views Asked by At

I have some python code in which an APScheduler job is not firing. As context, I also have a handler that is looking a directory for file modifications in addition using eventlet/GreenPool to do multi-threading. Based on some troubleshooting, it seems like there's some sort of conflict between APScheduler and eventlet.

My output looks as follows:

2016-12-26 02:30:30 UTC (+0000): Finished Download Pass
2016-12-26 02:46:07 UTC (+0000): EXITING due to control-C or other exit signal
Jobstore default:
Time-Activated Download (trigger: interval[0:05:00], next run at: 2016-12-25 18:35:00 PST) 2016-12-26 02:46:07 UTC (+0000): 1

(18:35 PST = 02:35 UTC)...so it should have fired 11 minutes before I pressed control-C

from apscheduler import events ## pip install apscheduler
from apscheduler.schedulers.background import BackgroundScheduler

# Threading
from eventlet import patcher, GreenPool ## pip install eventlet
patcher.monkey_patch(all = True)

def setSchedule(scheduler, cfg, minutes = 60*2, hours = 0):
  """Set up the schedule of how frequently a download should be attempted.
  scheduler object must already be declared.
  will accept either minutes or hours for the period between downloads"""
  if hours > 0:
    minutes = 60*hours if minutes == 60 else 60*hours+minutes
  handle = scheduler.add_job(processAllQueues,
                            trigger='interval',
                            kwargs={'cfg': cfg},
                            id='RQmain',
                            name='Time-Activated Download',
                            coalesce=True,
                            max_instances=1,
                            minutes=minutes,
                            start_date=dt.datetime.strptime('2016-10-10 00:15:00', '%Y-%m-%d %H:%M:%S') # computer's local time
  )
  return handle

def processAllQueues(cfg):
  SQSpool = GreenPool(size=int(cfg.get('GLOBAL','Max_AWS_Connections')))
  FHpool = GreenPool(size=int(cfg.get('GLOBAL','Max_Raw_File_Process')))
  arSects = []
  dGlobal = dict(cfg.items('GLOBAL'))
  for sect in filter(lambda x: iz.notEqualz(x,'GLOBAL','RUNTIME'),cfg.sections()):
    dSect = dict(cfg.items(sect)) # changes all key names to lowercase
    n = dSect['sqs_queue_name']
    nn = dSect['node_name']
    fnbase = "{}_{}".format(nn,n)
    dSect["no_ext_file_name"] = os.path.normpath(os.path.join(cfg.get('RUNTIME','Data_Directory'),fnbase))
    arSects.append(mergeTwoDicts(dGlobal,dSect)) # section overrides global
  arRes = []
  for (que_data,spec_section) in SQSpool.imap(doQueueDownload,arSects): 
    if que_data: fileResult = FHpool.spawn(outputQueueToFiles,spec_section,que_data).wait()
    else: fileResult = (False,spec_section['sqs_queue_name'])
    arRes.append(fileResult)
  SQSpool.waitall()
  FHpool.waitall()
  pr.ts_print("Finished Download Pass")
  return None

def main():
  cfgglob = readConfigs(cfgdir, datdir)
  sched = BackgroundScheduler()
  cron_job = setSchedule(sched, cfgglob, 5)  
  sched.start(paused=True)
  try:
    change_handle = win32file.FindFirstChangeNotification(cfgdir, 0, win32con.FILE_NOTIFY_CHANGE_FILE_NAME | win32con.FILE_NOTIFY_CHANGE_LAST_WRITE)
    processAllQueues(cfgglob)
    sched.resume() # turn the scheduler back on and monitor both wallclock and config directory.
    cron_job.resume()
    while 1:
      SkipDownload = False
      result = win32event.WaitForSingleObject(change_handle, 500)
      if result == win32con.WAIT_OBJECT_0: # If the WaitForSO returned because of a notification rather than error/timing out
        sched.pause() # make sure we don't run the job as a result of timestamp AND file modification
        while 1:
          try:
            win32file.FindNextChangeNotification(change_handle) # rearm - done at start because of the loop structure here
            cfgglob = None
            cfgglob = readConfigs(cfgdir,datdir)
            cron_job.modify(kwargs={'cfg': cfgglob}) # job_id="RQmain",
            change_handle = win32file.FindFirstChangeNotification(cfgdir, 0, win32con.FILE_NOTIFY_CHANGE_FILE_NAME | win32con.FILE_NOTIFY_CHANGE_LAST_WRITE) # refresh handle
            if not SkipDownload: processAllQueues(cfgglob)
            sched.resume()
            cron_job.resume()
            break
  except KeyboardInterrupt:
    if VERBOSE | DEBUG: pr.ts_print("EXITING due to control-C or other exit signal")
  finally:
    sched.print_jobs()
    pr.ts_print(sched.state)
    sched.shutdown(wait=False)

If I comment out most of the processAllQueues function along with the eventlet includes at top, it fires appropriately. If I keep the

from eventlet import patcher, GreenPool ## pip install eventlet
patcher.monkey_patch(all = True)

but comment out processAllQueues up to the print line in the second-to-last line, it fails to fire the APScheduler, indicating that there's either a problem with importing patcher and GreenPool or with the monkey_patch statement. Commenting out the patcher.monkey_patch(all = True) makes it "work" again.

Does anyone know what an alternate monkey_patch statement would be that would work in my circumstances?

1

There are 1 answers

0
temoto On

You have an explicit event loop watching for file changes. That blocks eventlet event loop from running. You have two options:

  • Wrap blocking calls (such as win32event.WaitForSingleObject()) in eventlet.tpool.execute()
  • Run eventlet.sleep() before/after blocking calls and make sure you don't block for too long.

eventlet.monkey_patch(thread=False) is shorter alternative to listing every other module as true. Generally you want thread=True when using locks or thread-local storage or threading API to spawn green threads. You may want thread=False if you genuinely use OS threads, like for funny GUI frameworks.

You shouldn't really consider Eventlet on Windows for running important projects. Performance is much inferior against POSIX. I didn't run tests on Windows since 0.17. It's rather for ease of development on popular desktop platform.