How do I send a notification to a specific user after a time-consuming task finishes in my Flask and React app?

34 views Asked by At

I'm trying to learn Socket.IO with Flask. Say a route starts a time-consuming process that takes about an hour to complete and runs in Celery. When time_consuming_task() finishes, I want to send the user a notification that the task is completed. I'm not really sure what to do. Any help will be appreciated. Have a great day! Here's what I've got so far:

my init.py:

from .routes.tools import tools

from flask import Flask, render_template

from flask_cors import CORS

from celery import Celery, Task
from .extensions import socketio

def celery_init_app(app: Flask) -> Celery:
    """Build a Celery application tied to ``app`` and register it on the app.

    Tasks are invoked inside ``app.app_context()``. The Celery instance is
    configured from ``app.config["CELERY"]``, made the default Celery app,
    stored under ``app.extensions["celery"]`` and returned.
    """

    class FlaskTask(Task):
        # Task base class that wraps each invocation in the Flask app context.
        def __call__(self, *args: object, **kwargs: object) -> object:
            context = app.app_context()
            with context:
                outcome = self.run(*args, **kwargs)
            return outcome

    worker = Celery(app.name, task_cls=FlaskTask)
    settings = app.config["CELERY"]
    worker.config_from_object(settings)
    worker.set_default()
    app.extensions["celery"] = worker
    return worker


def create_app():
    """Application factory: build and configure the Flask app.

    Configures CORS, Celery (through ``celery_init_app``), Socket.IO
    handlers on the ``/tools`` namespace, the index route and the ``tools``
    blueprint mounted at ``/tools``.

    Returns:
        The configured ``Flask`` application.
    """
    app = Flask(__name__)
    # Development placeholder only; configuration loaded from the
    # environment by ``from_prefixed_env()`` below can override it.
    app.config['SECRET_KEY'] = 'secret'

    CORS(app, origins='http://127.0.0.1:666')

    app.config.from_mapping(
        CELERY=dict(
            broker_url="redis://localhost",
            result_backend="redis://localhost",
            task_ignore_result=True,
        ),
    )
    app.config.from_prefixed_env()
    celery_init_app(app)

    # Attach Socket.IO to a message queue. Without one, an emit issued from
    # another process (such as a Celery worker) has no path to the clients
    # connected to the web server. The Redis instance already configured as
    # the Celery broker is reused as the queue.
    # NOTE(review): with eventlet/gevent the worker process may also need
    # monkey patching for the Redis client -- confirm for this deployment.
    socketio.init_app(app, message_queue=app.config["CELERY"]["broker_url"])

    @socketio.on('connect', namespace='/tools')
    def on_connect():
        """Log each client connection to the ``/tools`` namespace."""
        print('Client connected')

    @socketio.on('task_completed', namespace='/tools')
    def on_task_completed(data):
        """Log ``task_completed`` events received on ``/tools``."""
        print('Task completed:', data)

    app.extensions['socketio'] = socketio

    @app.route('/')
    def base_tool():
        """Serve the main page."""
        return render_template('main.html')

    app.register_blueprint(tools, url_prefix='/tools')

    return app

my task.py file:

from celery import shared_task
from flask_socketio import emit
import time
from .extensions import socketio


@shared_task(ignore_result=False)
def time_consuming_task(room: "str | None" = None):
    """Run the long job, then announce completion over Socket.IO.

    Emits ``task_completed`` with ``{'status': 'completed'}`` on the
    ``/tools`` namespace once the work is done.

    Args:
        room: Optional Socket.IO room name or client session id to notify.
            When omitted, the event is sent to every client connected to
            the ``/tools`` namespace (the previous behaviour).
    """
    time.sleep(5)  # stand-in for the real long-running work

    # NOTE(review): if this runs in a separate Celery worker process, the
    # emit presumably only reaches browsers when ``socketio`` is configured
    # with a message queue -- confirm against the Socket.IO setup.
    socketio.emit(
        'task_completed',
        {'status': 'completed'},
        namespace='/tools',
        to=room,
    )

Route:

from flask import Blueprint, jsonify, render_template
from flask_cors import CORS
from ..tasks import do_the_waiting


# Blueprint that groups the tool endpoints.
tools = Blueprint('tools', __name__)
# Flask-CORS takes the allowed origin through ``origins``.
# ``cors_allowed_origins`` is the Flask-SocketIO keyword and is not a
# Flask-CORS option, so the intended origin restriction was not applied.
CORS(tools, origins="http://127.0.0.1:666")


@tools.route('/get')
def get_tools():
    """Queue the long-running Celery task and respond immediately.

    Returns:
        A JSON response ``{"status": "started"}``. The task itself runs
        asynchronously via ``.delay()``.
    """
    # The module-level import only brings in ``do_the_waiting``, so
    # ``time_consuming_task`` was an undefined name here and raised
    # NameError when the route was hit. Import it locally instead.
    # NOTE(review): assumes the task is defined in the same ``..tasks``
    # module the file already imports from -- confirm the module name.
    from ..tasks import time_consuming_task

    time_consuming_task.delay()

    return jsonify({'status': 'started'})
0

There are 0 answers