Below is (working) code for a generic websocket streamer.
It creates a daemon thread from which performs asyncio.run(...).
The asyncio code spawns 2 tasks, which never complete.
How to correctly destroy this object?
One of the tasks is executing a keepalive 'ping', so I can easily exit that loop using a flag. But the other is blocking on a message from the websocket.
import json
import aiohttp
import asyncio
import gzip
import asyncio
from threading import Thread
class WebSocket:
    KEEPALIVE_INTERVAL_S = 10
    def __init__(self, url, on_connect, on_msg):
        self.url = url
        self.on_connect = on_connect
        self.on_msg = on_msg
        self.streams = {}
        self.worker_thread = Thread(name='WebSocket', target=self.thread_func, daemon=True).start()
    def thread_func(self):
        asyncio.run(self.aio_run())
    async def aio_run(self):
        async with aiohttp.ClientSession() as session:
            self.ws = await session.ws_connect(self.url)
            await self.on_connect(self)
            async def ping():
                while True:
                    print('KEEPALIVE')
                    await self.ws.ping()
                    await asyncio.sleep(WebSocket.KEEPALIVE_INTERVAL_S)
            async def main_loop():
                async for msg in self.ws:
                    def extract_data(msg):
                        if msg.type == aiohttp.WSMsgType.BINARY:
                            as_bytes = gzip.decompress(msg.data)
                            as_string = as_bytes.decode('utf8')
                            as_json = json.loads(as_string)
                            return as_json
                        elif msg.type == aiohttp.WSMsgType.TEXT:
                            return json.loads(msg.data)
                        elif msg.type == aiohttp.WSMsgType.ERROR:
                            print('⛔️ aiohttp.WSMsgType.ERROR')
                        return msg.data
                    data = extract_data(msg)
                    self.on_msg(data)
            # May want this approach if we want to handle graceful shutdown
            # W.task_ping = asyncio.create_task(ping())
            # W.task_main_loop = asyncio.create_task(main_loop())
            await asyncio.gather(
                ping(),
                main_loop()
            )
    async def send_json(self, J):
        await self.ws.send_json(J)
 
                        
I'd suggest the use of
asyncio.run_coroutine_threadsafeinstead ofasyncio.run. It returns aconcurrent.futures.Futureobject which you can cancel:Another approach would be to make
pingandmain_loopa task, and cancel them when necessary:This doesn't change the fact that
aio_runshould also be called withasyncio.run_coroutine_threadsafe.asyncio.runshould be used as a main entry point for asyncio programs and should be only called once.