My telethon bot was working until I added Quart to my project and to main.py
import asyncio
from datetime import datetime
from bot.telegram_bot import TelegramBot, client
from web.server import create_app
from database.database import setup_database
import logging
logging.basicConfig(level=logging.DEBUG)
async def start_telegram_bot(asset_manager):
print(f"{datetime.now().strftime('%H:%M:%S.%f')[:-3]} Script was started.")
# Start the client
await client.start()
print("Client started.")
# Cache all entities
await client.get_dialogs()
print("Cached all entities.")
bot = TelegramBot()
# Log that the script was started and is running
start_script_log_text = f"⏳ | {datetime.now().strftime('%Y-%d-%m %H:%M:%S')} | Script started!"
await bot.send_log_message(start_script_log_text) # Send log message asynchronously
# Start the client
await client.run_until_disconnected()
async def main():
# Set up the database before starting the Quart app and the main async function
setup_database()
# Create the Quart app
app = create_app()
# Create a task for the Quart app
quart_app_task = asyncio.create_task(app.run_task(port=8000))
# Create a task for the Telegram bot
bot_task = asyncio.create_task(start_telegram_bot())
# Start both tasks
await asyncio.gather(quart_app_task, bot_task)
if __name__ == "__main__":
asyncio.run(main())
server.py
def create_app():
app = Quart(__name__)
app.config["DEBUG"] = True # Enable debug mode
...
return app
telegram_bot.py
...
client = TelegramClient(session_file_path, API_ID, API_HASH)
bot = Bot(token=BOT_API_TOKEN)
...
but this code gets stuck at await client.start() and gives me the following error:
RuntimeError: Task <Task pending name='Task-40' coro=<Queue.get() running at c:\users\me\appdata\local\programs\python\python39\lib\asyncio\queues.py:166> cb=[_release_waiter(<Future pendi...E9FE1E6D0>()]>)() at c:\users\me\appdata\local\programs\python\python39\lib\asyncio\tasks.py:416]> got Future <Future pending> attached to a different loop.
If I instead use this approach it is working:
if __name__ == "__main__":
loop = asyncio.get_event_loop() # Get the event loop for the current context
try:
loop.run_until_complete(main()) # Run the main coroutine
finally:
loop.close() # Close the loop
I read that this approach is soon to be deprecated, and I am using Python 3.9. For this reason I don't want to use this approach.