Python 3.9 and async MySQL: why am I getting "Task was destroyed but it is pending!"?


I have had a strange problem since I started using an async MySQL driver with Python 3.9. I often get this error:

Task was destroyed but it is pending!
task: <ClientEventTask state=pending event=on_message coro=<bound method OnMessage.on_message of <cogs.on_message.OnMessage object at 0x7fcb96f50700>>>

I can't find much information about this error on Google, and what I did find didn't help. The error occurs at seemingly random times, so I can't reproduce it reliably. When it does occur, I notice it immediately, because the bot doesn't respond to, count, or otherwise handle a message as it should.

A short explanation of the "on_message" task: it does something whenever a user posts a message, so depending on the number of users and their activity it can be triggered very often or very rarely. I have read that this error mostly appears when a bot stops or starts up, but in my case it sometimes occurs while the bot is running normally. I don't know exactly what the error means, why it occurs, or how to fix it. I wondered whether my code for this task is too long, but that wouldn't make sense to me. Here is the code for the task:

    @commands.Cog.listener("on_message")
    async def on_message(self, message):


        if message.author.bot:
            return

        if message.type != discord.MessageType.default:
            return

        if isinstance(message.channel, discord.channel.DMChannel):
            return

        prefixes = ["!", "?", "=", ".", ",", "@", "#", "/", '§', "$", "%", "&", ":?", "*", "<", ">", "-", "bday"]
        for prefix in prefixes:
            if message.content.startswith(str(prefix)):
                return

        global member_cooldown_list
        member_cooldown_list = [i for i in member_cooldown_list if i[1] + cooldown_val > int(time.time())]
        member_index = next((i for i, v in enumerate(member_cooldown_list) if v[0] == message.author.id), None)
        if member_index is not None:
            if member_cooldown_list[member_index][1] + cooldown_val > int(time.time()):
                return

        member_cooldown_list.append((message.author.id, int(time.time())))

        count = 1
        mydb = await getConnection()
        mycursor = await mydb.cursor()
        await mycursor.execute("SELECT ignore_role_id, bonus_role_id FROM guild_role_settings WHERE guild_id = %s", (message.author.guild.id,))
        in_database = await mycursor.fetchone()
        if in_database:
            if in_database[0] is not None:
                role_list = in_database[0].split(" ")
                for roleid in role_list:
                    role = message.author.guild.get_role(int(roleid))
                    if role in message.author.roles:
                        await mycursor.close()
                        mydb.close()
                        return

            if in_database[1] is not None:
                role_list = in_database[1].split(" ")
                for roleid in role_list:
                    role = message.author.guild.get_role(int(roleid))
                    if role in message.author.roles:
                        count += 1

        await mycursor.execute("SELECT ignore_channel_id FROM guild_channel_settings WHERE guild_id = %s", (message.author.guild.id,))
        in_database1 = await mycursor.fetchone()
        if in_database1:
            if in_database1[0] is not None:
                channel_list = in_database1[0].split(" ")
                for channelid in channel_list:
                    if int(message.channel.id) == int(channelid):
                        await mycursor.close()
                        mydb.close()
                        return

        await mycursor.execute("SELECT * FROM guild_message_count WHERE guild_id = %s AND user_id = %s", (message.author.guild.id, message.author.id))
        in_database2 = await mycursor.fetchone()
        if in_database2:
            await mycursor.execute("UPDATE guild_message_count SET user_id = %s, message_count = message_count + %s WHERE guild_id = %s AND user_id = %s", (message.author.id, count, message.author.guild.id, message.author.id))
        else:
            await mycursor.execute("INSERT INTO guild_message_count (user_id, message_count, guild_id) VALUES (%s, %s, %s)", (message.author.id, count, message.author.guild.id))

        await mydb.commit()
        await mycursor.close()
        mydb.close()

I would really appreciate it if someone could explain the error, why it occurs, and how to fix it. I couldn't find much information about it, and it's really frustrating, especially because it has only occurred since I started using an async MySQL driver and Python 3.9.

2 days later, still searching for a fix: I haven't found a solution since posting this, and it's really frustrating.

There is 1 answer

Answered by abysslover

This is because of a race condition between the Discord client module and the message-processing thread, as explained in the link.

The Discord client module needs to regain control about once every minute.

This means that any function that holds control for longer than a certain time can put Discord's client into an invalid state, which will show up as an exception at some later point, perhaps on the next call to a client method.
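To make "holding control" concrete, here is a minimal sketch using only the standard library. It does not use discord.py; the heartbeat coroutine is a hypothetical stand-in for whatever periodic work the client needs to do, and the 0.2 second delays are arbitrary. It compares a handler that blocks synchronously with one that awaits.

```python
import asyncio
import time


async def heartbeat(ticks):
    """Hypothetical stand-in for the client's periodic housekeeping."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def count_ticks(handler):
    """Run `handler` next to the heartbeat; return how many ticks got through."""
    ticks = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # give the heartbeat a chance to start
    before = len(ticks)
    await handler()
    after = len(ticks)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return after - before


async def blocking_handler():
    # Synchronous sleep: nothing else on the event loop can run meanwhile.
    time.sleep(0.2)


async def cooperative_handler():
    # Awaiting hands control back to the event loop while waiting.
    await asyncio.sleep(0.2)


async def main():
    blocked = await count_ticks(blocking_handler)
    cooperative = await count_ticks(cooperative_handler)
    return blocked, cooperative


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The first number is the heartbeat count while the blocking handler ran, the second is the count while the awaiting handler ran. Any synchronous database call or long CPU-bound loop inside on_message behaves like blocking_handler here.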

In summary, you should offload all heavy operations to a separate process (not a thread, because of the Python GIL). The setup is as follows:

  1. Discord listener: dispatches the message to the worker server.
  2. Worker server: parses the message and runs the SQL queries.
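The linked code is not reproduced here, so as a rough illustration of the listener/worker split, here is a sketch built on asyncio streams with one JSON message per line. This is an assumption about the shape of such a setup, not the code from the link: the call helper, its host and port parameters, the wire format, and the placeholder Handler are all hypothetical. For the demo the server and client run in one process; in practice the server part would live in the separate worker script.

```python
import asyncio
import json


class Handler:
    """Hypothetical worker-side handler; the real one would run the SQL."""

    async def process(self, guild_id):
        await asyncio.sleep(0)  # placeholder for the database work
        return 0


async def serve_client(reader, writer):
    """Worker side: read one JSON request per line and reply with JSON."""
    handler = Handler()
    try:
        while True:
            line = await reader.readline()
            if not line:  # client closed the connection
                break
            request = json.loads(line)
            # A real server should restrict which method names are allowed.
            method = getattr(handler, request["method"])
            result = await method(*request["args"])
            writer.write(json.dumps({"result": result}).encode() + b"\n")
            await writer.drain()
    finally:
        writer.close()
        await writer.wait_closed()


async def call(host, port, method, *args):
    """Listener side: send one request to the worker and await the reply."""
    reader, writer = await asyncio.open_connection(host, port)
    try:
        payload = json.dumps({"method": method, "args": args})
        writer.write(payload.encode() + b"\n")
        await writer.drain()
        reply = json.loads(await reader.readline())
        return reply["result"]
    finally:
        writer.close()
        await writer.wait_closed()


async def demo():
    # Port 0 asks the OS for any free port.
    server = await asyncio.start_server(serve_client, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        return await call("127.0.0.1", port, "process", 1234)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The listener only awaits a short network round trip, so the event loop stays free while the worker does the slow part.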

In your Discord listener, avoid global variables, for example:

....
@commands.Cog.listener("on_message")
async def on_message(self, message):
    import traceback
    # Parentheses let the condition span several lines.
    if (message.author.bot or
            message.type != discord.MessageType.default or
            isinstance(message.channel, discord.channel.DMChannel)):
        return

    prefixes = ["!", "?", "=", ".", ",", "@", "#", "/", '§', "$", "%", "&",
                ":?", "*", "<", ">", "-", "bday"]
    for prefix in prefixes:
        if message.content.startswith(prefix):
            return
    #global member_cooldown_list
    self.member_cooldown_list = [i for i in self.member_cooldown_list
                                 if i[1] + cooldown_val > int(time.time())]
    member_index = next((i for i, v in enumerate(self.member_cooldown_list)
                         if v[0] == message.author.id), None)
    if member_index is not None:
        if self.member_cooldown_list[member_index][1] + cooldown_val > int(time.time()):
            return

    self.member_cooldown_list.append((message.author.id, int(time.time())))
    guild_id = message.author.guild.id
    try:
        response = await call("process", guild_id)
        ...
    except Exception as e:
        print(e)
        print(traceback.format_exc())
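The listener above reads self.member_cooldown_list but does not show where it is created. One possible way to keep that state on an instance is sketched below. CooldownTracker, try_acquire and the injectable clock are hypothetical names for illustration; a dict replaces the list of tuples, and the expiry rule is the same as in the question (an entry counts as active while less than cooldown_val seconds have passed).

```python
import time


class CooldownTracker:
    """Per-user cooldown state kept on an instance instead of a module global."""

    def __init__(self, cooldown_seconds, clock=time.monotonic):
        self.cooldown_seconds = cooldown_seconds
        self._clock = clock
        self._last_counted = {}  # user_id -> time of the last counted message

    def try_acquire(self, user_id):
        """Return True and start a cooldown if the user is not cooling down."""
        now = self._clock()
        # Drop expired entries so the dict does not grow without bound.
        self._last_counted = {
            uid: ts for uid, ts in self._last_counted.items()
            if now - ts < self.cooldown_seconds
        }
        if user_id in self._last_counted:
            return False
        self._last_counted[user_id] = now
        return True
```

In a cog this could be created once in __init__, for example self.cooldowns = CooldownTracker(cooldown_val), and the listener would return early whenever self.cooldowns.try_acquire(message.author.id) is False.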

The following code has to be moved to the worker server script:

class Handler:
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    async def process(self, guild_id):
        count = 1
        mydb = await getConnection()
        ...
        await mycursor.close()
        mydb.close()
        return 0       
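The original handler closes the cursor and the connection by hand before every early return, which makes it easy to miss one. A possible alternative, sketched here under assumptions, is an async context manager that always cleans up. FakeConnection and FakeCursor are stand-ins that only mirror the call shapes used in the question (awaited cursor(), execute(), fetchone(), commit() and cursor close(), plus a synchronous connection close()); getConnection here is a stub for the one in the question, and open_cursor is a hypothetical helper name.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeCursor:
    """Stand-in cursor with the same call shapes as in the question."""

    def __init__(self):
        self.closed = False

    async def execute(self, query, params=()):
        pass

    async def fetchone(self):
        return None

    async def close(self):
        self.closed = True


class FakeConnection:
    """Stand-in connection; close() is synchronous, as in the question."""

    def __init__(self):
        self.closed = False
        self.last_cursor = None

    async def cursor(self):
        self.last_cursor = FakeCursor()
        return self.last_cursor

    async def commit(self):
        pass

    def close(self):
        self.closed = True


async def getConnection():
    # Stub for the getConnection() helper from the question.
    return FakeConnection()


@asynccontextmanager
async def open_cursor(connect=getConnection):
    """Yield (connection, cursor) and close both on exit, even on early return."""
    mydb = await connect()
    mycursor = await mydb.cursor()
    try:
        yield mydb, mycursor
    finally:
        await mycursor.close()
        mydb.close()


async def process(guild_id, connect=getConnection):
    async with open_cursor(connect) as (mydb, mycursor):
        await mycursor.execute(
            "SELECT ignore_role_id, bonus_role_id FROM guild_role_settings "
            "WHERE guild_id = %s", (guild_id,))
        if await mycursor.fetchone() is None:
            return 0  # early return; cleanup still runs
        await mydb.commit()
        return 1
```

With this shape, each early return in the handler becomes a plain return, and the cleanup lives in one place.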

Note:

  1. You should adopt the code in the link.
  2. You should run the worker server script with nohup python the_worker_script.py & or something similar.