FastAPI websocket can't handle large incoming of data?

5.7k views Asked by At

I have created an websocket server using python and FastAPI: https://fastapi.tiangolo.com/

And.. it is finished, buuuut of course in the end, i found a new "bug" that is giving me some panic.

When a connected client just sends a lot of data, he suddenly gets disconnected. Im not sure if it is the websocket code itself, if it is asyncio, or uvicorn, docker, or the linux server itself that cant handle the large incoming of data?

here is my server code which i run in dockercontainer:

import json
import time

import socketio

from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse

from starlette.websockets import WebSocketDisconnect


from typing import List


app = FastAPI()


sio = socketio.AsyncServer(async_mode='asgi')
socket_app = socketio.ASGIApp(sio, static_files={'/': 'app.html'})
background_task_started = False


from streaming_class import repetisjons_klasse


import asyncio
from pydantic import BaseModel

class UserClientWebSocket(BaseModel):
    id: str
    ws: WebSocket

    class Config:
        arbitrary_types_allowed = True


class WebPageClientWebSocket(BaseModel):
    id: str
    ws: WebSocket

    class Config:
        arbitrary_types_allowed = True

class ConnectionManager:
    def __init__(self):

        self.active_user_client_connections: List[UserClientWebSocket] = []
        self.collect_user_IDs = []


        self.active_webpage_client_connections: List[WebPageClientWebSocket] = []
        self.collect_webpages_IDs = []


    async def connect_the_user_client(self, websocket: WebSocket, THE_USER_ID):
        await websocket.accept()
        await self.send_message_to_absolutely_everybody(f"User: {THE_USER_ID} connected to server!")
        print("User: {} ".format(THE_USER_ID) + " Connected")
        if THE_USER_ID not in self.collect_user_IDs:
            self.collect_user_IDs.append(THE_USER_ID)
        else:
            await self.send_message_to_absolutely_everybody(f"Somebody connected with the same ID as client: {THE_USER_ID}")
            await self.send_message_to_absolutely_everybody("but Vlori is a nice and kind guy, so he wil not get kicked :)")
            self.collect_user_IDs.append(THE_USER_ID)
        self.active_user_client_connections.append(UserClientWebSocket(ws=websocket, id=THE_USER_ID)) #SJEEKK DENNE LINJA !!!!!
        await self.show_number_of_clients()



    async def connect_the_webpage_client(self, websocket: WebSocket, the_webpage_id):
        await websocket.accept()
        await self.send_message_to_absolutely_everybody(f"User: {the_webpage_id} connected to server through webbrowser!")
        print("User: {} ".format(the_webpage_id) + " Connected")
        if the_webpage_id not in self.collect_webpages_IDs:
            self.collect_webpages_IDs.append(the_webpage_id)
        else:
            await self.send_message_to_absolutely_everybody(f"Somebody connected with the same ID as client: {the_webpage_id}")
            await self.send_message_to_absolutely_everybody("but Vlori is a nice and kind guy, so he wil not get kicked :)")
            self.collect_webpages_IDs.append(the_webpage_id)
        self.active_webpage_client_connections.append(WebPageClientWebSocket(ws=websocket, id=the_webpage_id)) #SJEEKK DENNE LINJA !!!!!
        await self.show_number_of_clients()

    async def disconnect_the_webpage_client(self, websocket: WebSocket, the_webpage_id):
        await websocket.close(code=1000)
        self.collect_webpages_IDs.remove(the_webpage_id)
        self.active_webpage_client_connections.remove(WebPageClientWebSocket(ws=websocket, id=the_webpage_id))
        await self.show_number_of_clients()


    async def disconnect_the_user_client(self, websocket: WebSocket, THE_USER_ID):
        await websocket.close(code=1000)
        self.collect_user_IDs.remove(THE_USER_ID)
        self.active_user_client_connections.remove(UserClientWebSocket(ws=websocket, id=THE_USER_ID))
        await self.show_number_of_clients()





"""
PROBLEM: THE USER GETS DISCONNECTED WHEN SENDING TO MUCH DATA IN REAL TIME!
"""

@app.websocket("/ws/testchannel")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            print("Received data: {} ".format(data))
            #await websocket.send_text(f"you sent message: {data}")
            ¤await connection_manager.send_message_to_absolutely_everybody(data)

      
    except WebSocketDisconnect:
        print("client left chat.")

those below are settings i run inside dockerfile(could it be something here? im unsure):

FROM ubuntu:latest
FROM python:3

MAINTAINER raxor2k "xxx.com"

RUN apt-get update -y

RUN apt-get install -y python3-pip build-essential python3-dev

COPY . /app
WORKDIR /app

RUN pip3 install --upgrade pip
RUN pip3 install -r requirements.txt
RUN pip3 install fastapi uvicorn #dennekanfjernes?

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "80", "--reload"]

so, here is my client.py which i run from locally in my computer(works good):

#client.py

from websocket import create_connection
import json
import time

ws = create_connection("ws://134.122.76.213:5080/ws/testchannel")

time.sleep(1)


def send_json_all_the_time(position):
    generate_json = { "machineID":"001", "RepSensor": position}
    send_json = json.dumps(generate_json)
    print("JSON SENT FROM IoT SENSOR: {}".format(send_json))
    time.sleep(0.1)
    ws.send(json.dumps(send_json))
    time.sleep(0.1)


while True:
    for x in range(1000):
        send_json_all_the_time(x)

    for x in range(100, -1, -1):
        ws.send("pause a little bit, starting again soon!")
        send_json_all_the_time(x)

So i am confused.. is my server code wrong somehow? is it the linux server problem? is the problem inside the dockercontainer?

Any help would be so appreciated!!

1

There are 1 answers

5
raxor5k On

EDIT: after some research, testing and feedback from FastAPI "forum": https://gitter.im/tiangolo/fastapi

The issue was on the client due to missing ping-pong synchronization. There was nothing wrong on the server side.