I implemented a piece of code that gets an element from one queue and puts the same object into each queue in a list of queues. The problem is that when I run a specific test, I get a `ValueError: task_done() called too many times` exception. This error happens in the test code, not in the code being tested.
I'm using `asyncio.Queue` and programming with coroutines. I matched each `Queue.get()` with exactly one `Queue.task_done()` call.
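For reference, here is the contract `asyncio.Queue` expects. Each producer `put()` adds one unfinished task, and the consumer calls `task_done()` once per `get()`, which is what lets `join()` return. This is a minimal sketch. `worker` and `main` are illustrative names and are not part of the question's code:

```python
import asyncio


async def worker(queue, results):
    """Consumer: calls task_done() exactly once for every item it get()s."""
    while True:
        item = await queue.get()
        try:
            if item is None:      # sentinel: stop consuming
                return
            results.append(item * 2)
        finally:
            queue.task_done()     # runs for the sentinel too, because of finally


async def main():
    queue = asyncio.Queue()       # created inside the running loop
    results = []
    consumer = asyncio.create_task(worker(queue, results))
    for n in range(3):
        await queue.put(n)        # each put() adds one unfinished task
    await queue.put(None)
    await queue.join()            # returns once every put() item got its task_done()
    await consumer
    return results


results = asyncio.run(main())
print(results)  # [0, 2, 4]
```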
I'm testing the code with pytest, using the following libraries:

- Python 3.7
- pytest==3.10.0
- pytest-asyncio==0.9.0

I have two files: `middleware.py`, which contains my class implementation, and `test_middleware.py`, which implements the pytest test.
File `middleware.py`:

```python
import asyncio

class DistributorMiddleware:
    def __init__(self, in_queue, out_list_queue):
        self._in = in_queue
        self._out = out_list_queue

    async def distribute(self):
        while True:
            ele = await self._in.get()
            count=0
            for queue in self._out:
                await queue.put(ele)
                count+=1
                print(f'inserted ele in {count}')
            queue.task_done()
            if ele == None:
                break
        for queue in self._out:
            await queue.join()
```
File `test_middleware.py`:

```python
import pytest
import asyncio
from asyncio import Queue
from middleware import DistributorMiddleware
import random
import os

@pytest.mark.asyncio
async def test_distribution(request, event_loop):
    q_list = [ Queue() for _ in range(10) ]
    _in = Queue()
    distrib = DistributorMiddleware(_in, q_list)
    event_loop.create_task(distrib.distribute())
    num_ele = random.randint(1, 10)
    ele_set = set()
    for _ in range(num_ele):
        ele = os.urandom(4)
        ele_set.add(ele)
        await _in.put(ele)
    await _in.put(None)
    await asyncio.sleep(1)
    for i,q in enumerate(q_list):
        assert q.qsize() == num_ele + 1
        c_set = ele_set.copy()
        count= 0
        while True:
            e = await q.get()
            count+=1
            print(f'Queue {i}: element: "{e}" number {count} extracted of {q.qsize()}!')
            q.task_done()
            if e == None:
                break
            assert e in c_set
            c_set.remove(e)
```
In the test, the middleware should get elements from the input queue and put them into the 10 queues in the list, and it does that correctly.

The test code gets all elements from each of the 10 queues and checks that they are present in the original set of elements. For the first 9 queues everything goes well, but when the test takes the first element from the tenth queue, a `ValueError` is raised:
```
request = <FixtureRequest for <Function 'test_distribution'>>, event_loop = <_UnixSelectorEventLoop running=False closed=False debug=False>

    @pytest.mark.asyncio
    async def test_distribution(request, event_loop):
        q_list = [ Queue() for _ in range(10) ]
        _in = Queue()
        distrib = DistributorMiddleware(_in, q_list)
        event_loop.create_task(distrib.distribute())
        num_ele = random.randint(1, 10)
        ele_set = set()
        for _ in range(num_ele):
            ele = os.urandom(4)
            ele_set.add(ele)
            await _in.put(ele)
        await _in.put(None)
        await asyncio.sleep(1)
        for i,q in enumerate(q_list):
            assert q.qsize() == num_ele + 1
            c_set = ele_set.copy()
            count= 0
            while True:
                e = await q.get()
                count+=1
                print(f'Queue {i}: element: "{e}" number {count} extracted of {q.qsize()}!')
>               q.task_done()

test_middlewares.py:34:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Queue at 0x7f7af5b9d828 maxsize=0 _queue=[b'\x15\xad\t\xaf', b'\x8b\xa2M=', None]>

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items have
        been processed (meaning that a task_done() call was received for every
        item that had been put() into the queue).

        Raises ValueError if called more times than there were items placed in
        the queue.
        """
        if self._unfinished_tasks <= 0:
>           raise ValueError('task_done() called too many times')
E           ValueError: task_done() called too many times

/usr/lib/python3.7/asyncio/queues.py:202: ValueError
```
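The check shown in the traceback fires only when the unfinished-task counter is already at zero at the moment `task_done()` is called. The counter is increased by `put()`, not by `get()`, so any extra `task_done()` on the same queue is accepted silently until the counter runs out. The following minimal sketch, which uses only the public `asyncio.Queue` API, reproduces that on a single queue:

```python
import asyncio


async def demo():
    q = asyncio.Queue()
    await q.put("a")
    await q.put("b")      # two put() calls: two unfinished tasks
    q.task_done()         # extra call with no matching get(): accepted silently, one task left
    log = []
    for _ in range(2):
        item = await q.get()          # get() is unaffected: both items are still queued
        try:
            q.task_done()
            log.append((item, "ok"))
        except ValueError as err:     # raised once the counter is already at zero
            log.append((item, str(err)))
    return log


log = asyncio.run(demo())
print(log)  # [('a', 'ok'), ('b', 'task_done() called too many times')]
```

As in the test, `get()` keeps returning items even after `task_done()` starts raising. The error reports the overall count, not the specific call that was wrong.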
Every `get()` is matched by a `task_done()`. I checked this by making the following change to the `test_middleware.py` file:

```diff
-            q.task_done()
+            try:
+                q.task_done()
+            except ValueError as err:
+                print(f'Value Error: {err}')
+                print(q.qsize())
```
With that change, I can see that even though many `ValueError`s are raised, the elements keep being retrieved from the queue, and the test succeeds:

```
platform linux -- Python 3.7.1, pytest-3.10.0, py-1.7.0, pluggy-0.8.0
rootdir: /tmp/stack, inifile:
plugins: asyncio-0.9.0
collected 1 item
test_middlewares.py . [100%]
============================================================================================ 1 passed in 1.04 seconds =============================================================================================
```
To make sure that the test consumes all elements from all queues, I forced a failure by adding a false assertion at the end of the test:

```diff
             assert e in c_set
             c_set.remove(e)
+    assert False == True
+
```

The output shows that all elements are retrieved from all the queues, but each `task_done()` on the last queue raises a `ValueError`:
```
Queue 7: element: "b'\x9b\xf8m\x02'" number 1 extracted of 3!
Queue 7: element: "b'\x15\xad\t\xaf'" number 2 extracted of 2!
Queue 7: element: "b'\x8b\xa2M='" number 3 extracted of 1!
Queue 7: element: "None" number 4 extracted of 0!
Queue 8: element: "b'\x9b\xf8m\x02'" number 1 extracted of 3!
Queue 8: element: "b'\x15\xad\t\xaf'" number 2 extracted of 2!
Queue 8: element: "b'\x8b\xa2M='" number 3 extracted of 1!
Queue 8: element: "None" number 4 extracted of 0!
Queue 9: element: "b'\x9b\xf8m\x02'" number 1 extracted of 3!
============================================================================================ 1 failed in 1.06 seconds ==
```
My question is: am I missing something and is there an error in my code, or have I found a bug?
You have an error in your code. `queue.task_done()` should only be called when taking elements out of a queue, not when putting them into it. But your middleware class calls it on a queue it has just used `.put()` on, namely the last queue in the `self._out` list. Remove the `queue.task_done()` call from `DistributorMiddleware.distribute()`. When you remove that line, your test passes.
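Here is a sketch of the class with only that line removed. The sentinel check also uses `is None`, which is a stylistic change. The hypothetical `run_distribution()` coroutine mirrors `test_distribution()` without pytest, so it can be run directly:

```python
import asyncio
import os
import random


class DistributorMiddleware:
    def __init__(self, in_queue, out_list_queue):
        self._in = in_queue
        self._out = out_list_queue

    async def distribute(self):
        while True:
            ele = await self._in.get()
            for queue in self._out:
                await queue.put(ele)
            # No queue.task_done() here: this coroutine only put() into the output queues.
            if ele is None:
                break
        for queue in self._out:
            await queue.join()


async def run_distribution():
    """Hypothetical stand-in for test_distribution(); returns the number of queues drained."""
    q_list = [asyncio.Queue() for _ in range(10)]
    _in = asyncio.Queue()
    distributor = asyncio.create_task(DistributorMiddleware(_in, q_list).distribute())
    elements = {os.urandom(4) for _ in range(random.randint(1, 10))}
    for ele in elements:
        await _in.put(ele)
    await _in.put(None)
    for q in q_list:
        remaining = set(elements)
        while True:
            e = await q.get()   # waits for distribute() if it has not put the item yet
            q.task_done()       # one task_done() per get(): never raises now
            if e is None:
                break
            remaining.remove(e)
        assert not remaining    # every element reached this queue
    await distributor           # join() on every output queue now returns
    return len(q_list)


drained = asyncio.run(run_distribution())
print(drained)  # 10
```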
The reason you see the exception raised in the test is that only then does the queue know `task_done()` was called too often. The `queue.task_done()` call in `DistributorMiddleware.distribute()` decrements the unfinished-task counter by 1, but the anomaly can only be detected when `task_done()` is called while that counter is already at 0. Because `distribute()` makes that call once for every element it forwards, including the final `None`, the last queue's counter is already back at 0 before `test_distribution()` takes anything out of it. As a result, the very first `q.task_done()` on the tenth queue raises.

Perhaps that was meant to be a call to `self._in.task_done()` instead? You did just get an element out of that queue in that `while` loop:
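Here is a minimal sketch of `distribute()` with the call moved to the input queue, keeping the constructor from the question. The `main()` driver is hypothetical. It only shows that, with this change, a producer can wait on `_in.join()` until every input item has been forwarded:

```python
import asyncio


class DistributorMiddleware:
    def __init__(self, in_queue, out_list_queue):
        self._in = in_queue
        self._out = out_list_queue

    async def distribute(self):
        while True:
            ele = await self._in.get()
            for queue in self._out:
                await queue.put(ele)
            # Acknowledge the item this loop took from the *input* queue.
            self._in.task_done()
            if ele is None:
                break
        for queue in self._out:
            await queue.join()


async def main():
    _in = asyncio.Queue()
    outs = [asyncio.Queue() for _ in range(3)]
    task = asyncio.create_task(DistributorMiddleware(_in, outs).distribute())
    for ele in (b"x", b"y", None):
        await _in.put(ele)
    await _in.join()      # returns once distribute() has forwarded every input item
    sizes = [q.qsize() for q in outs]
    for q in outs:        # drain outputs so distribute()'s join() calls can finish
        while not q.empty():
            q.get_nowait()
            q.task_done()
    await task
    return sizes


sizes = asyncio.run(main())
print(sizes)  # [3, 3, 3]
```

Each queue then sees exactly one `task_done()` per `get()` from that queue's own consumer. The input queue is acknowledged by `distribute()`, and the output queues are acknowledged by whoever reads them.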