I have some Python code in which an APScheduler job is not firing. For context, I also have a handler watching a directory for file modifications, and I'm using eventlet/GreenPool for concurrency. Based on some troubleshooting, there seems to be some sort of conflict between APScheduler and eventlet.
My output looks as follows:
2016-12-26 02:30:30 UTC (+0000): Finished Download Pass
2016-12-26 02:46:07 UTC (+0000): EXITING due to control-C or other exit signal
Jobstore default:
    Time-Activated Download (trigger: interval[0:05:00], next run at: 2016-12-25 18:35:00 PST)
2016-12-26 02:46:07 UTC (+0000): 1
(18:35 PST = 02:35 UTC), so the job should have fired 11 minutes before I pressed control-C.
import datetime as dt
import os
import win32file, win32con, win32event  # pywin32, used for directory-change notifications

from apscheduler import events  ## pip install apscheduler
from apscheduler.schedulers.background import BackgroundScheduler

# Threading
from eventlet import patcher, GreenPool  ## pip install eventlet
patcher.monkey_patch(all=True)

# (imports of my own helpers -- pr, iz, readConfigs, mergeTwoDicts,
#  doQueueDownload, outputQueueToFiles -- omitted for brevity)
def setSchedule(scheduler, cfg, minutes=60*2, hours=0):
    """Set up the schedule of how frequently a download should be attempted.
    scheduler object must already be declared.
    will accept either minutes or hours for the period between downloads"""
    if hours > 0:
        minutes = 60*hours if minutes == 60 else 60*hours + minutes
    handle = scheduler.add_job(processAllQueues,
                               trigger='interval',
                               kwargs={'cfg': cfg},
                               id='RQmain',
                               name='Time-Activated Download',
                               coalesce=True,
                               max_instances=1,
                               minutes=minutes,
                               start_date=dt.datetime.strptime('2016-10-10 00:15:00', '%Y-%m-%d %H:%M:%S')  # computer's local time
                               )
    return handle
def processAllQueues(cfg):
    SQSpool = GreenPool(size=int(cfg.get('GLOBAL', 'Max_AWS_Connections')))
    FHpool = GreenPool(size=int(cfg.get('GLOBAL', 'Max_Raw_File_Process')))
    arSects = []
    dGlobal = dict(cfg.items('GLOBAL'))
    for sect in filter(lambda x: iz.notEqualz(x, 'GLOBAL', 'RUNTIME'), cfg.sections()):
        dSect = dict(cfg.items(sect))  # changes all key names to lowercase
        n = dSect['sqs_queue_name']
        nn = dSect['node_name']
        fnbase = "{}_{}".format(nn, n)
        dSect["no_ext_file_name"] = os.path.normpath(os.path.join(cfg.get('RUNTIME', 'Data_Directory'), fnbase))
        arSects.append(mergeTwoDicts(dGlobal, dSect))  # section overrides global
    arRes = []
    for (que_data, spec_section) in SQSpool.imap(doQueueDownload, arSects):
        if que_data:
            fileResult = FHpool.spawn(outputQueueToFiles, spec_section, que_data).wait()
        else:
            fileResult = (False, spec_section['sqs_queue_name'])
        arRes.append(fileResult)
    SQSpool.waitall()
    FHpool.waitall()
    pr.ts_print("Finished Download Pass")
    return None
def main():
    cfgglob = readConfigs(cfgdir, datdir)
    sched = BackgroundScheduler()
    cron_job = setSchedule(sched, cfgglob, 5)
    sched.start(paused=True)
    try:
        change_handle = win32file.FindFirstChangeNotification(cfgdir, 0, win32con.FILE_NOTIFY_CHANGE_FILE_NAME | win32con.FILE_NOTIFY_CHANGE_LAST_WRITE)
        processAllQueues(cfgglob)
        sched.resume()  # turn the scheduler back on and monitor both wallclock and config directory.
        cron_job.resume()
        while 1:
            SkipDownload = False
            result = win32event.WaitForSingleObject(change_handle, 500)
            if result == win32con.WAIT_OBJECT_0:  # If the WaitForSO returned because of a notification rather than error/timing out
                sched.pause()  # make sure we don't run the job as a result of timestamp AND file modification
                while 1:
                    try:
                        win32file.FindNextChangeNotification(change_handle)  # rearm - done at start because of the loop structure here
                        cfgglob = None
                        cfgglob = readConfigs(cfgdir, datdir)
                        cron_job.modify(kwargs={'cfg': cfgglob})  # job_id="RQmain",
                        change_handle = win32file.FindFirstChangeNotification(cfgdir, 0, win32con.FILE_NOTIFY_CHANGE_FILE_NAME | win32con.FILE_NOTIFY_CHANGE_LAST_WRITE)  # refresh handle
                        if not SkipDownload: processAllQueues(cfgglob)
                        sched.resume()
                        cron_job.resume()
                        break
                    except KeyboardInterrupt:
                        if VERBOSE | DEBUG: pr.ts_print("EXITING due to control-C or other exit signal")
    finally:
        sched.print_jobs()
        pr.ts_print(sched.state)
        sched.shutdown(wait=False)
If I comment out most of the processAllQueues function along with the eventlet imports at the top, the job fires appropriately. If I keep the
from eventlet import patcher, GreenPool ## pip install eventlet
patcher.monkey_patch(all = True)
but comment out the body of processAllQueues down to the pr.ts_print("Finished Download Pass") line near its end, the APScheduler job still fails to fire, which indicates that the problem lies either with importing patcher and GreenPool or with the monkey_patch statement. Commenting out just patcher.monkey_patch(all=True) makes it "work" again.
Does anyone know of an alternate monkey_patch statement that would work in my circumstances?
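For reference, the structure boils down to this stripped-down sketch (the config path and the five-minute interval are placeholders for my real values):

from apscheduler.schedulers.background import BackgroundScheduler
from eventlet import patcher
patcher.monkey_patch(all=True)
import win32file, win32con, win32event

def job():
    print("fired")

sched = BackgroundScheduler()
sched.add_job(job, trigger='interval', minutes=5)
sched.start()

change_handle = win32file.FindFirstChangeNotification(
    r'C:\path\to\cfgdir', 0,
    win32con.FILE_NOTIFY_CHANGE_FILE_NAME | win32con.FILE_NOTIFY_CHANGE_LAST_WRITE)
while True:
    # with monkey_patch in place, "fired" is never printed
    win32event.WaitForSingleObject(change_handle, 500)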
You have an explicit event loop watching for file changes. That blocks the eventlet event loop from running. You have two options:

- Wrap blocking calls, such as win32event.WaitForSingleObject(), in eventlet.tpool.execute().
- Call eventlet.sleep() before/after blocking calls, and make sure you don't block for too long.
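A minimal sketch of what either option could look like in your polling loop, reusing change_handle and the pywin32 calls from the question (assuming the rest of main() stays as-is):

import eventlet
from eventlet import tpool

# Option 1: run the blocking Win32 wait on a real OS thread via tpool,
# so the eventlet hub (and the green thread driving APScheduler) keeps running.
result = tpool.execute(win32event.WaitForSingleObject, change_handle, 500)

# Option 2: keep the short 500 ms wait, but explicitly yield to the hub
# afterwards so pending green threads get scheduled.
result = win32event.WaitForSingleObject(change_handle, 500)
eventlet.sleep(0)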
eventlet.monkey_patch(thread=False) is a shorter alternative to listing every other module as True. Generally you want thread=True when using locks, thread-local storage, or the threading API to spawn green threads. You may want thread=False if you genuinely use OS threads, as with some GUI frameworks.
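For example (the long form is only indicative; the exact module list varies by eventlet version):

import eventlet

# Shorter form: patch everything except the thread module.
eventlet.monkey_patch(thread=False)

# Roughly equivalent long form, naming the other modules explicitly:
# eventlet.monkey_patch(os=True, select=True, socket=True, time=True)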
You shouldn't really consider Eventlet on Windows for running important projects. Performance is much inferior compared to POSIX, and I haven't run tests on Windows since 0.17. It's rather intended for ease of development on a popular desktop platform.