I'm trying to learn Socket.IO with Flask. Say a route starts a time-consuming process, run with Celery, that takes about an hour to complete. When time_consuming_task() finishes, I want to send the user a notification that the task is completed. I'm not really sure what to do. Any help will be appreciated. Have a great day! Here's what I've got so far:
My `__init__.py`:
from .routes.tools import tools
from flask import Flask, render_template
from flask_cors import CORS
from celery import Celery, Task
from .extensions import socketio
def celery_init_app(app: Flask) -> Celery:
    """Build a Celery application tied to ``app`` and register it on the app.

    The Celery app is named after the Flask app, reads its settings from
    ``app.config["CELERY"]``, is made the default Celery app, and is stored
    under ``app.extensions["celery"]``.

    Args:
        app: The Flask application whose context tasks should run in.

    Returns:
        The configured Celery application.
    """

    class FlaskTask(Task):
        """Task base class that runs the task body inside ``app``'s context."""

        def __call__(self, *args: object, **kwargs: object) -> object:
            flask_context = app.app_context()
            with flask_context:
                outcome = self.run(*args, **kwargs)
            return outcome

    celery_settings = app.config["CELERY"]
    worker_app = Celery(app.name, task_cls=FlaskTask)
    worker_app.config_from_object(celery_settings)
    worker_app.set_default()
    app.extensions["celery"] = worker_app
    return worker_app
def create_app():
    """Application factory: build and configure the Flask app.

    Sets up CORS, the Celery integration, Socket.IO (including the message
    queue that lets other processes emit events), the Socket.IO handlers
    for the ``/tools`` namespace, the index route and the ``tools``
    blueprint.

    Returns:
        The configured Flask application.
    """
    app = Flask(__name__)
    app.config['SECRET_KEY'] = 'secret'
    CORS(app, origins='http://127.0.0.1:666')
    app.config.from_mapping(
        CELERY=dict(
            broker_url="redis://localhost",
            result_backend="redis://localhost",
            task_ignore_result=True,
        ),
        # Queue through which Socket.IO events are relayed between processes.
        SOCKETIO_MESSAGE_QUEUE="redis://localhost",
    )
    # Environment variables with the Flask prefix may override the defaults
    # above, so this must run before the extensions read the config.
    app.config.from_prefixed_env()
    celery_init_app(app)
    # The Celery worker is a separate process from the web server that holds
    # the client connections. Without a shared message queue, an emit issued
    # in the worker has no connected clients to deliver to and is lost.
    socketio.init_app(app, message_queue=app.config["SOCKETIO_MESSAGE_QUEUE"])

    @socketio.on('connect', namespace='/tools')
    def on_connect():
        print('Client connected')

    # Runs only when a *client* sends a 'task_completed' event to the server.
    # An emit from the server side (e.g. from the Celery task) is delivered
    # to clients and does not invoke this handler.
    @socketio.on('task_completed', namespace='/tools')
    def on_task_completed(data):
        print('Task completed:', data)

    # NOTE(review): init_app presumably registers this entry already; kept
    # so the original explicit registration is unchanged.
    app.extensions['socketio'] = socketio

    @app.route('/')
    def base_tool():
        return render_template('main.html')

    app.register_blueprint(tools, url_prefix='/tools')
    return app
My tasks.py file:
from celery import shared_task
from flask_socketio import emit
import time
from .extensions import socketio
@shared_task(ignore_result=False)
def time_consuming_task():
    """Wait five seconds, then emit ``task_completed`` on ``/tools``.

    The emitted payload is ``{'status': 'completed'}``. The task itself
    returns ``None``.
    """
    time.sleep(5)
    completion_payload = {'status': 'completed'}
    socketio.emit('task_completed', completion_payload, namespace='/tools')
Route:
from flask import Blueprint, jsonify, render_template
from flask_cors import CORS
from ..tasks import do_the_waiting
# Blueprint that groups the tool routes defined in this module.
tools = Blueprint('tools', __name__)
# handle_cors(tools)
# Flask-CORS names this option ``origins``; ``cors_allowed_origins`` is the
# Flask-SocketIO spelling and is not an option Flask-CORS recognises, so the
# intended restriction to this single origin was not being applied.
CORS(tools, origins="http://127.0.0.1:666")
@tools.route('/get')
def get_tools():
    """Queue the long-running task and reply without waiting for it.

    Returns:
        A JSON response with the body ``{"status": "started"}``.
    """
    # The module-level import brings in ``do_the_waiting``, so the name
    # called below was undefined and raised NameError on the first request.
    # Import the task under the name that is actually used.
    from ..tasks import time_consuming_task

    # ``delay`` hands the task to the Celery worker instead of running it
    # in this request.
    time_consuming_task.delay()
    return jsonify({'status': 'started'})