NATS JetStream timeout inside anyio task group


I have a problem with NATS JetStream. I set everything up locally with docker-compose (a 3-node NATS cluster with JetStream enabled).

I have a FastAPI application that executes a state machine (SM). I instantiate the SM class inside a task group and run it. In one of its states, the SM should send an event through my NATS EventHandler implementation, which I pass in when I instantiate the SM.

My problem: outside the task group, the EventHandler implementation works just fine when I test it:

testBroadcaster = NatsEventBroadcaster()
await testBroadcaster.subscribe(source="test")
await testBroadcaster.broadcast(source="test", event="Hello")

Inside the SM execution function I call it like this:

await self._event_broadcaster.broadcast(source=self._name, event=action.event)

The broadcast implementation function looks like this:

    async def broadcast(self, *, source: str, event: str):
        logger.info(f"Source: {source}")
        await self._ensure_stream_exists(source)
        bytes_event = bytes(event, 'utf-8')
        try:
            ack = await self._js.publish(source, bytes_event)
            print(f"Message broadcasted with sequence {ack.seq}")
        except ErrTimeout:
            print("Failed to broadcast message due to timeout")

The _ensure_stream_exists function looks like this:

    async def _ensure_stream_exists(self, subject: str):
        """Ensure that the stream for the subject exists."""
        logger.info(f"Source_ensure Function: {subject}")
        await self._ensure_connected()
        try:
            logger.info(f"Looking for Stream: {subject} ... ")
            stream_info = await self._js.stream_info(subject)
        except Exception as e:
            # Any failure of stream_info (including a timeout) lands here.
            logger.warning(e)
            config = api.StreamConfig(name=subject, subjects=[subject])
            logger.info(config)
            await self._js.add_stream(config)
        except TimeoutError:
            # Never reached: the `except Exception` clause above matches first.
            print(f"Timeout when checking for stream: {subject}")
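A side note on the handler above, as a minimal sketch with stand-in functions (`stream_info` and `add_stream` here are hypothetical fakes, not the nats-py calls). Python tries `except` clauses in order, and an exception raised inside a handler is not caught by the sibling clauses of the same `try`. That would be consistent with the timeout from `add_stream` escaping in the trace below.

```python
import asyncio


async def stream_info(name: str) -> None:
    # Hypothetical stand-in: pretend the lookup fails.
    raise LookupError(f"stream {name!r} not found")


async def add_stream(name: str) -> None:
    # Hypothetical stand-in: pretend the create request times out.
    raise TimeoutError("nats: timeout")


async def ensure_stream(name: str) -> str:
    try:
        await stream_info(name)
        return "exists"
    except Exception:
        # A timeout raised here leaves the whole try statement;
        # the sibling clause below gets no chance to catch it.
        await add_stream(name)
        return "created"
    except TimeoutError:
        # Unreachable: TimeoutError is a subclass of Exception.
        return "timeout handled"


async def main() -> str:
    try:
        return await ensure_stream("test")
    except TimeoutError as exc:
        return f"escaped: {exc}"


result = asyncio.run(main())
print(result)
```

If the timeout should be handled, the `add_stream` call probably needs its own `try`/`except` rather than relying on the outer one.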

Finally, this is the stack trace I get:

await self._event_broadcaster.broadcast(
2023-10-20 20:28:38       |   File "/app/app/services/nats_controller.py", line 50, in broadcast
2023-10-20 20:28:38       |     await self._ensure_stream_exists(source)
2023-10-20 20:28:38       |   File "/app/app/services/nats_controller.py", line 44, in _ensure_stream_exists
2023-10-20 20:28:38       |     await self._js.add_stream(config)
2023-10-20 20:28:38       |   File "/usr/local/lib/python3.11/site-packages/nats/js/manager.py", line 93, in add_stream
2023-10-20 20:28:38       |     resp = await self._api_request(
2023-10-20 20:28:38       |            ^^^^^^^^^^^^^^^^^^^^^^^^
2023-10-20 20:28:38       |   File "/usr/local/lib/python3.11/site-packages/nats/js/manager.py", line 361, in _api_request
2023-10-20 20:28:38       |     msg = await self._nc.request(req_subject, req, timeout=timeout)
2023-10-20 20:28:38       |           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2023-10-20 20:28:38       |   File "/usr/local/lib/python3.11/site-packages/nats/aio/client.py", line 976, in request
2023-10-20 20:28:38       |     msg = await self._request_new_style(
2023-10-20 20:28:38       |           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2023-10-20 20:28:38       |   File "/usr/local/lib/python3.11/site-packages/nats/aio/client.py", line 1025, in _request_new_style
2023-10-20 20:28:38       |     raise errors.TimeoutError
2023-10-20 20:28:38       | nats.errors.TimeoutError: nats: timeout

I suspect the problem is related to the async behaviour of the overall setup.

I instantiate the SM inside a task group, and the state machine also executes inside a task group. However, I haven't managed to find a solution.
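One thing that may be worth ruling out (an assumption; nothing above confirms it) is that the NATS connection was opened on a different event loop than the one running the task group, for example in another thread. The sketch below uses only the standard library and a hypothetical `LoopBoundClient` stand-in, not the real nats-py client. It records the loop at connect time and fails fast when it is used from another loop, which makes such a mismatch visible instead of ending in a timeout.

```python
import asyncio


class LoopBoundClient:
    """Hypothetical stand-in for a connection tied to one event loop."""

    def __init__(self) -> None:
        self._loop = None

    async def connect(self) -> None:
        # Remember which loop the connection was created on.
        self._loop = asyncio.get_running_loop()

    async def publish(self, subject: str, payload: bytes) -> str:
        if asyncio.get_running_loop() is not self._loop:
            # Fail fast instead of waiting for a reply that never arrives.
            raise RuntimeError("client used from a different event loop")
        return f"published {len(payload)} bytes to {subject}"


def publish_from_other_loop(client: LoopBoundClient) -> str:
    # Runs in a worker thread, so asyncio.run() starts a fresh loop there.
    try:
        return asyncio.run(client.publish("test", b"Hello"))
    except RuntimeError as exc:
        return f"error: {exc}"


async def main() -> tuple[str, str]:
    client = LoopBoundClient()
    await client.connect()
    same = await client.publish("test", b"Hello")
    other = await asyncio.to_thread(publish_from_other_loop, client)
    return same, other


same_loop, other_loop = asyncio.run(main())
print(same_loop)
print(other_loop)
```

Adding a similar check (or just logging `id(asyncio.get_running_loop())`) in `_ensure_connected` and in `broadcast` would show whether both run on the same loop.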

I appreciate any tips/hints!
