I have a problem with NATS jetstream. I set up everything locally with docker-compose (NATS with 3 Nodes, jetstream enabled).
I have a fastapi application which should execute a state machine. I instantiate a SM class inside a taskgroup and execute it. It then hops into this SM and in one state it should send a event with my NATS EventHandler implementation I give when I instantiate the SM.
So my problem is: Outside of the taskgroup I test the EventHandler implementation and it works just fine:
testBroadcaster = NatsEventBroadcaster()
await testBroadcaster.subscribe(source="test")
await testBroadcaster.broadcast(source="test", event="Hello")
Inside the SM execution function I call it like:
self._event_broadcaster.broadcast(source=self._name, event=action.event)
The broadcast implementation function looks like this:
async def broadcast(self, *, source: str, event: str):
logger.info(f"Source: {source}")
await self._ensure_stream_exists(source)
bytes_event = bytes(event, 'utf-8')
try:
ack = await self._js.publish(source, bytes_event)
print(f"Message broadcasted with sequence {ack.seq}")
except ErrTimeout:
print("Failed to broadcast message due to timeout")
The _ensure_stream_exists function looks like this:
async def _ensure_stream_exists(self, subject: str):
logger.info(f"Source_ensure Function: {subject}")
"""Ensure that the stream for the subject exists."""
await self._ensure_connected()
try:
logger.info(f"Looking for Stream: {subject} ... ")
stream_info = await self._js.stream_info(subject)
except Exception as e:
logger.warn(e)
config = api.StreamConfig(name=subject, subjects=[subject])
logger.info(config)
await self._js.add_stream(config)
except TimeoutError:
print(f"Timeout when checking for stream: {subject}")
So finally the stack trace for my problem is:
await self._event_broadcaster.broadcast(
2023-10-20 20:28:38 | File "/app/app/services/nats_controller.py", line 50, in broadcast
2023-10-20 20:28:38 | await self._ensure_stream_exists(source)
2023-10-20 20:28:38 | File "/app/app/services/nats_controller.py", line 44, in _ensure_stream_exists
2023-10-20 20:28:38 | await self._js.add_stream(config)
2023-10-20 20:28:38 | File "/usr/local/lib/python3.11/site-packages/nats/js/manager.py", line 93, in add_stream
2023-10-20 20:28:38 | resp = await self._api_request(
2023-10-20 20:28:38 | ^^^^^^^^^^^^^^^^^^^^^^^^
2023-10-20 20:28:38 | File "/usr/local/lib/python3.11/site-packages/nats/js/manager.py", line 361, in _api_request
2023-10-20 20:28:38 | msg = await self._nc.request(req_subject, req, timeout=timeout)
2023-10-20 20:28:38 |
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2023-10-20 20:28:38 | File "/usr/local/lib/python3.11/site-packages/nats/aio/client.py", line 976, in request
2023-10-20 20:28:38 | msg = await self._request_new_style(
2023-10-20 20:28:38 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2023-10-20 20:28:38 | File "/usr/local/lib/python3.11/site-
+packages/nats/aio/client.py", line 1025, in _request_new_style
2023-10-20 20:28:38 | raise errors.TimeoutError
2023-10-20 20:28:38 | nats.errors.TimeoutError: nats: timeout
I believe it is some problem with the async behaviour of the whole system.
I instantiate a SM inside a taskgroup and the execution of the state_machine is also inside a taskgroup. However, I can't manage to find a solution.
I appreciate any tips/hints!