I have a TCP server running and have a handler function which needs to take the contents of the request, add it to an asyncio queue and reply with an OK status.
In the background I have an async coroutine running that detects when a new item is added and performs some processing.
How do I put items in the asyncio queue from the handler function, which isn't and can't be an async coroutine?
I am running a DICOM server (pynetdicom) which listens on port 104 for incoming TCP requests (DICOM C-STORE specifically). I need to save the contents of the request to a queue and return a 0x0000 response so that the listener is available to the network.
This is modeled by a producer-consumer pattern.
I have tried to define a consumer coroutine consume_dicom() that is currently stuck in await queue.get() since I can't properly define the producer. The producer needs to simply invoke queue.put(produce_item), but this happens inside a handle_store(event) function which is not part of the event_loop but is called every time a request is received by the server.
import asyncio
from pynetdicom import (
    AE, evt,
    StoragePresentationContexts
)

class PacsServer():
    def __init__(self, par, listen=True):
        # Initialize other stuff...
        # Initialize DICOM server
        ae = AE(ae_title='DICOM-NODE')
        ae.supported_contexts = StoragePresentationContexts
        # When a C-STORE request comes, it will be passed to self.handle_store
        handlers = [(evt.EVT_C_STORE, self.handle_store)]
        # Define queue
        loop = asyncio.get_event_loop()
        self.queue = asyncio.Queue(loop=loop)
        # Define consumer
        loop.create_task(self.consume_dicom(self.queue))
        # Start server in the background with specified handlers
        self.scp = ae.start_server(('', 104), block=False, evt_handlers=handlers)
        # Start async loop
        self.loop.run_forever()

    def handle_store(self, event):
        # Request handling
        ds = event.dataset
        # Here I want to add to the queue but this is not an async method
        await queue.put(ds)
        return 0x0000

    async def consume_dicom(self, queue):
        while True:
            print(f"AWAITING FROM QUEUE")
            ds = await queue.get()
            do_some_processing(ds)
I would like to find a way to add items to the queue and return the OK status in the handle_store() function.
Since handle_store is running in a different thread, it needs to tell the event loop to enqueue the item. This is done with call_soon_threadsafe, for example loop.call_soon_threadsafe(queue.put_nowait, ds), where loop is the event loop that runs the consumer. Note that you need to call queue.put_nowait instead of queue.put because the former is a function rather than a coroutine. The function will always succeed for unbounded queues (the default), otherwise it will raise an exception (asyncio.QueueFull) if the queue is full.
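
To make this concrete, here is a minimal, self-contained sketch of the pattern that runs without pynetdicom. It is an illustration under assumptions, not the asker's actual server: a plain threading.Thread stands in for the thread in which pynetdicom invokes the handler, the strings "ds1" to "ds3" stand in for datasets, and item.upper() stands in for do_some_processing. The names consume, producer and main are hypothetical; handle_store here takes the loop and queue as explicit arguments only to keep the example short.

```python
import asyncio
import threading


async def consume(queue, results, expected):
    # Consumer coroutine: waits on the queue and "processes" each item.
    while len(results) < expected:
        item = await queue.get()
        results.append(item.upper())  # stand-in for do_some_processing(ds)


def handle_store(loop, queue, item):
    # Runs in a non-asyncio thread, like a pynetdicom event handler.
    # Ask the loop to run queue.put_nowait(item) in its own thread;
    # this call returns immediately without waiting for the consumer.
    loop.call_soon_threadsafe(queue.put_nowait, item)
    return 0x0000


async def main():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()  # unbounded, so put_nowait cannot fail
    results = []
    items = ["ds1", "ds2", "ds3"]  # hypothetical stand-ins for datasets

    def producer():
        # Simulates the server thread receiving three C-STORE requests.
        for item in items:
            handle_store(loop, queue, item)

    thread = threading.Thread(target=producer)
    thread.start()
    await asyncio.wait_for(consume(queue, results, len(items)), timeout=5)
    thread.join()
    return results


results = asyncio.run(main())
print(results)
```

In the class from the question, the same idea would mean keeping a reference to the loop (for example as self.loop) so that handle_store can reach it. One thing to keep in mind with a bounded queue: the exception from put_nowait is raised inside the event loop callback, not in the handler thread, so handle_store would still return 0x0000.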