My scheduler in my Flask smart home app with a micro service architecture is not working as intended

27 views Asked by At

I have hosted my main flask UI app and the flask Scheduler app on a single T2.micro instance and I am facing an issue that the scheduler is not publishing the events as intended. It is storing the data in the database but not publishing it .

I have created a microservice which has two threads. The main thread is the flask app and the secondary thread is the scheduler. I have used schedule library from python to implement a basic scheduler which schedules the events that are received through an api. The scheduler also updates the database simultaneously. The issue is that the scheduler is scheduling the task and also storing in the database but not publishing it, despite of mentioning the correct api.

here is the code of the scheduler_func.py

import requests
import schedule
from . import app
from . api_urls import urls
from .models import db, Schedules
from datetime import datetime
import time

def get_schedule_id():
    """
    This function is used to get the next available schedule ID.
    """
    last_schedule = db.session.query(Schedules).order_by(Schedules.id.desc()).first()
    if last_schedule is None:
        return 1001
    else:
        return last_schedule.id+1

schedule_id = 1001

def scheduler_thread():
    """
    This function is a background thread that runs the schedule library to periodically execute scheduled tasks.
    It continuously loops, checking if any scheduled tasks are due to run, and if so, executes them.
    """
    with app.app_context():
        while True:
            schedule.run_pending()
            time.sleep(1)


def publish_job(message, topic):
    """
    This function is used to publish a message to a specific MQTT topic.

    Args:
        message (str): The message to be published.
        topic (str): The MQTT topic to which the message should be published.

    Returns:
        requests.models.Response: The response from the MQTT server.
    """
    response = requests.get(urls['publish_schedule']+f'?topic={topic}&pub_message={message}')

def do_job(message,topic):
    with app.app_context():
        publish_job(message,topic)

def do_once(message, topic, schedule_id):
    """
    This function is used to publish a message to a specific MQTT topic once, and then delete the schedule.

    Args:
        message (str): The message to be published.
        topic (str): The MQTT topic to which the message should be published.
        schedule_id (int): The ID of the schedule.

    Returns:
        schedule.CancelJob: A cancel job that can be used to cancel the scheduled task.
    """
    with app.app_context():
        publish_job(message, topic)
        delete_schedule(schedule_id)
    return schedule.CancelJob

def create_schedule(data):
    """
    This function creates a new schedule in the database and schedules it to run.

    Args:
        data (dict): The data required to create the schedule.

    Returns:
        dict: An acknowledgment indicating whether the schedule was created successfully.

    """
    global schedule_id

    acknowledgment = { 'success': True,
                            'code': 200,
                            'message': 'received'}
    scheduled_message = data['data']['message']
    scheduled_topic = data['topic']
    scheduled_time = data['data']['time']
    scheduled_repeat = data['data']['repeat']
    scheduled_days = data['data']['days']

    

    if scheduled_repeat == 'once':

        schedule.every().day.at(scheduled_time).do(do_once, scheduled_message,scheduled_topic,schedule_id).tag(schedule_id)
    
    elif scheduled_repeat == 'everyday' and scheduled_days:
        for day in scheduled_days:
            if day == 'sun':
                schedule.every().sunday.at(scheduled_time).do(do_job, scheduled_message,scheduled_topic).tag(schedule_id)
            elif day == 'mon':
                schedule.every().monday.at(scheduled_time).do(do_job, scheduled_message,scheduled_topic).tag(schedule_id)
            elif day == 'tue':
                schedule.every().tuesday.at(scheduled_time).do(do_job, scheduled_message,scheduled_topic).tag(schedule_id)
            elif day == 'wed':
                schedule.every().wednesday.at(scheduled_time).do(do_job, scheduled_message,scheduled_topic).tag(schedule_id)
            elif day == 'thu':
                schedule.every().thursday.at(scheduled_time).do(do_job, scheduled_message,scheduled_topic).tag(schedule_id)
            elif day == 'fri':
                schedule.every().friday.at(scheduled_time).do(do_job, scheduled_message,scheduled_topic).tag(schedule_id)
            elif day == 'sat':
                schedule.every().saturday.at(scheduled_time).do(do_job, scheduled_message,scheduled_topic).tag(schedule_id)

    days = ''
    for day in scheduled_days:
        days += day + ' '

    repeat = True if scheduled_repeat == 'everyday' else False

    db.session.add(Schedules(id = schedule_id, topic = scheduled_topic, message = scheduled_message, repeat = repeat, Days = days, Time = scheduled_time))
    db.session.commit()
    schedule_id = get_schedule_id()


    return {'ack':acknowledgment}

def delete_schedule(schedule_id):
    """
    Delete a schedule from the database and clear the schedule from the schedule library.

    Args:
        schedule_id (int): The ID of the schedule to be deleted.

    Returns:
        None

    """
    with app.app_context():
        db.session.query(Schedules).filter(Schedules.id == schedule_id).delete()
        db.session.commit()
        schedule.clear(schedule_id)

here is the code for routing

from flask import request, jsonify
import requests
import json
from .schedule_func import create_schedule, delete_schedule
from . import app
from .models import db, Schedules
from .api_urls import urls 


@app.route('/get_schedule',methods = ['GET', 'POST'])
def get_schedule():
    if request.method == 'POST':
        data = request.json
        acknowledgment = create_schedule(data)['ack']
    return jsonify(acknowledgment)

@app.route('/send_data',methods = ['GET', 'POST'])
def send_data():
    """
    This function is used to send data to the database.

    Parameters:
        topic (str): The topic of the data to be sent.

    Returns:
        dict: A JSON object containing the status, message, topic, and data.
    """
    if request.method == 'GET':
        topic = request.args.get('topic')
        if topic == '':
            data = db.session.query(Schedules).all()
        else:
            data = db.session.query(Schedules).filter(Schedules.topic == topic).all()
        
        data_list = [element.to_dict() for element in data]
        packet = {
            'status' : True,
            'message' : 'success',
            'topic' : topic,
            'data' : data_list
        }
    return jsonify(packet)
        
@app.route('/delete_data', methods = ['GET'])
def delete_data():
    """
    Delete a schedule from the database

    Args:
        schedule_id (int): The id of the schedule to delete

    Returns:
        dict: A JSON object with a status key indicating success or failure, and a message key indicating the result of the operation
    """
    if request.method == 'GET':
        schedule_id = request.args.get('schedule_id')
        delete_schedule(int(schedule_id))
        
    return jsonify({'status' : True, 'message' : 'schedule_deleted'})

here is the main file

from scheduler_app import app,db
import threading
import time
from scheduler_app.schedule_func import scheduler_thread



def driver_function():
    db.create_all()
    worker_thread = threading.Thread(target = scheduler_thread, daemon = True)
    worker_thread.start()
    return app


if __name__ == '__main__':
    app.app_context().push()
    driver_function().run(host = '0.0.0.0', port = 8002)
0

There are 0 answers