asyncio.Queue ValueError: task_done() called too many times - Coding error or a bug detected?


I implemented a piece of code that gets an element from one Queue and puts the same object into each queue from a list of queues. The problem is that when I run a specific test, I get a ValueError: task_done() called too many times exception. This error is raised in the test code, not in the code being tested.

I'm using asyncio.Queue and programming with coroutines. I matched each Queue.get with exactly one Queue.task_done call. I'm testing the code with pytest.

I'm using the following libs:

  • Python 3.7
  • pytest==3.10.0
  • pytest-asyncio==0.9.0

I have two files: middleware.py that contains my class implementation and test_middleware.py that implements the pytest test.

File middleware.py:

import asyncio

class DistributorMiddleware:

    def __init__(self, in_queue, out_list_queue):
        self._in = in_queue
        self._out = out_list_queue

    async def distribute(self):

        while True:
            ele = await self._in.get()
            count=0
            for queue in self._out:
                await queue.put(ele)
                count+=1
                print(f'inserted ele in {count}')
            queue.task_done()
            if ele == None:
                break
        for queue in self._out:
            await queue.join()

File test_middleware.py:

import pytest
import asyncio
from asyncio import Queue
from middleware import DistributorMiddleware
import random
import os


@pytest.mark.asyncio
async def test_distribution(request, event_loop):
    q_list = [ Queue() for _ in range(10) ]
    _in = Queue()
    distrib = DistributorMiddleware(_in, q_list)
    event_loop.create_task(distrib.distribute())
    num_ele = random.randint(1, 10)
    ele_set = set()
    for _ in range(num_ele):
        ele = os.urandom(4)
        ele_set.add(ele)
        await _in.put(ele)
    await _in.put(None)
    await asyncio.sleep(1)

    for i,q in enumerate(q_list):
        assert q.qsize() == num_ele + 1
        c_set = ele_set.copy()
        count= 0
        while True:
            e = await q.get()
            count+=1
            print(f'Queue {i}: element: "{e}" number {count} extracted of {q.qsize()}!')
            q.task_done()
            if e == None:
                break
            assert e in c_set
            c_set.remove(e)

In the test, the middleware should take elements from the input queue and put them into each of the 10 queues in the list, and it does so correctly.

The test code gets all elements from each of the 10 queues and checks that they are present in the original set of elements. For the first 9 queues everything goes well, but when the test tries to get the first element from the tenth queue, a ValueError is raised:

request = <FixtureRequest for <Function 'test_distribution'>>, event_loop = <_UnixSelectorEventLoop running=False closed=False debug=False>

    @pytest.mark.asyncio
    async def test_distribution(request, event_loop):
        q_list = [ Queue() for _ in range(10) ]
        _in = Queue()
        distrib = DistributorMiddleware(_in, q_list)
        event_loop.create_task(distrib.distribute())
        num_ele = random.randint(1, 10)
        ele_set = set()
        for _ in range(num_ele):
            ele = os.urandom(4)
            ele_set.add(ele)
            await _in.put(ele)
        await _in.put(None)
        await asyncio.sleep(1)

        for i,q in enumerate(q_list):
            assert q.qsize() == num_ele + 1
            c_set = ele_set.copy()
            count= 0
            while True:
                e = await q.get()
                count+=1
                print(f'Queue {i}: element: "{e}" number {count} extracted of {q.qsize()}!')
>               q.task_done()

test_middlewares.py:34: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Queue at 0x7f7af5b9d828 maxsize=0 _queue=[b'\x15\xad\t\xaf', b'\x8b\xa2M=', None]>

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items have
        been processed (meaning that a task_done() call was received for every
        item that had been put() into the queue).

        Raises ValueError if called more times than there were items placed in
        the queue.
        """
        if self._unfinished_tasks <= 0:
>           raise ValueError('task_done() called too many times')
E           ValueError: task_done() called too many times

/usr/lib/python3.7/asyncio/queues.py:202: ValueError

Every get matches a task_done. I can verify this by making the following modification to the test_middleware.py file:

-            q.task_done()
+            try:
+                q.task_done()
+            except ValueError as err:
+                print(f'Value Error: {err}')
+                print(q.qsize())

With that change I can see that, even though many ValueErrors are raised, the elements keep being retrieved from the queue. The test succeeds:

platform linux -- Python 3.7.1, pytest-3.10.0, py-1.7.0, pluggy-0.8.0
rootdir: /tmp/stack, inifile:
plugins: asyncio-0.9.0
collected 1 item

test_middlewares.py .                                             [100%]

======================== 1 passed in 1.04 seconds ========================

To make sure that the test consumes all elements from all queues, I forced a failure by adding a false assertion at the end of the test:

             assert e in c_set
             c_set.remove(e)

+    assert False == True
+

The resulting output shows that all elements are retrieved from all the queues, but each task_done on the last queue generates a ValueError.

Queue 7: element: "b'\x9b\xf8m\x02'" number 1 extracted of 3!
Queue 7: element: "b'\x15\xad\t\xaf'" number 2 extracted of 2!
Queue 7: element: "b'\x8b\xa2M='" number 3 extracted of 1!
Queue 7: element: "None" number 4 extracted of 0!
Queue 8: element: "b'\x9b\xf8m\x02'" number 1 extracted of 3!
Queue 8: element: "b'\x15\xad\t\xaf'" number 2 extracted of 2!
Queue 8: element: "b'\x8b\xa2M='" number 3 extracted of 1!
Queue 8: element: "None" number 4 extracted of 0!
Queue 9: element: "b'\x9b\xf8m\x02'" number 1 extracted of 3!
============================================================================================ 1 failed in 1.06 seconds ==

The question is: am I missing something and there is an error in my code, or did I find a bug?

Martijn Pieters (best answer)

You have an error in your code. queue.task_done() should only be called after taking an element out of a queue, not after putting one into it.

But your middleware class calls it on a queue it has just used .put() on, namely the last queue in the self._out list. Remove the queue.task_done() call from DistributorMiddleware.distribute():

async def distribute(self):

    while True:
        ele = await self._in.get()
        count=0
        for queue in self._out:
            await queue.put(ele)
            count+=1
            print(f'inserted ele in {count}')
        queue.task_done()
        # ^^^^^ you didn't take anything from the queue here!

When you remove that line, your test passes.
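Only the last queue is affected because a Python for loop leaves its loop variable bound to the final item once the loop ends. A minimal sketch of that behaviour, using placeholder strings instead of real queues (the names out_queues and last_bound are illustrative, not from your code):

```python
# Illustrative stand-ins for the ten asyncio.Queue objects in self._out.
out_queues = ["q0", "q1", "q2"]

for queue in out_queues:
    pass  # stand-in for `await queue.put(ele)`

# After the loop, `queue` is still bound to the last item it iterated over,
# so a call placed here only ever touches that one object.
last_bound = queue
print(last_bound)  # q2
```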

The exception is raised in the test because that is the first point at which the queue can tell that task_done() was called too often. task_done() raises only when the unfinished-task counter is already 0 at the time of the call. In DistributorMiddleware.distribute(), the name queue still refers to the last queue in self._out once the for loop has finished, so every queue.task_done() call there cancels out the put() just made on that queue and leaves its counter at 0, without any error. The first q.task_done() that test_distribution() makes on the tenth queue then finds the counter at 0 and raises. That is also why the first nine queues are unaffected.
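The same sequence can be reproduced on a single queue, outside pytest. This is only a sketch of the counter behaviour; the demo() helper and the "item" value are made up for illustration:

```python
import asyncio


async def demo():
    q = asyncio.Queue()

    await q.put("item")   # unfinished-task counter: 0 -> 1
    q.task_done()         # premature call, as in distribute(): 1 -> 0, no error

    item = await q.get()  # get() does not change the counter
    try:
        q.task_done()     # counter is already 0, so this call raises
    except ValueError as err:
        return item, str(err)
    return item, None


result = asyncio.run(demo())
print(result)
```

The premature call goes unnoticed, and the error only surfaces later at the call that was actually legitimate.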

Perhaps that was meant to be a call to self._in.task_done() instead? You did just get an element out of that queue in that while loop:

async def distribute(self):

    while True:
        ele = await self._in.get()
        # getting an element from self._in
        count=0
        for queue in self._out:
            await queue.put(ele)
            count+=1
            print(f'inserted ele in {count}')
        self._in.task_done()
        # done with ele, so decrement the self._in unfinished tasks counter
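Putting it together, here is a sketch of the whole class with that change, plus a small driver that can be run without pytest. The run_once() helper, its n_out parameter and the sample byte strings are assumptions for illustration only; the sentinel check is written as `ele is None`, which is the idiomatic spelling of your `ele == None`.

```python
import asyncio


class DistributorMiddleware:
    def __init__(self, in_queue, out_list_queue):
        self._in = in_queue
        self._out = out_list_queue

    async def distribute(self):
        while True:
            ele = await self._in.get()
            for queue in self._out:
                await queue.put(ele)
            # Pair the get() above with task_done() on the same queue.
            self._in.task_done()
            if ele is None:
                break
        # Wait until consumers have processed everything that was put.
        for queue in self._out:
            await queue.join()


async def run_once(items, n_out=3):
    # Hypothetical driver, loosely mirroring test_distribution().
    in_q = asyncio.Queue()
    out_qs = [asyncio.Queue() for _ in range(n_out)]
    task = asyncio.create_task(DistributorMiddleware(in_q, out_qs).distribute())

    for item in items:
        await in_q.put(item)
    await in_q.put(None)
    await in_q.join()  # returns once every input element was distributed

    received = []
    for q in out_qs:
        got = []
        while True:
            e = await q.get()
            q.task_done()  # one task_done() per get(), on the same queue
            if e is None:
                break
            got.append(e)
        received.append(got)

    await task  # distribute() returns after every output queue is joined
    return received


received = asyncio.run(run_once([b"a", b"b"]))
print(received)
```

With task_done() called on self._in, the driver can also use `await in_q.join()` instead of a fixed asyncio.sleep(1) to wait for the distribution to finish.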