I have a Python gRPC server and client. After adding a ClientInterceptor, the client no longer receives messages from a unary_stream method. On the server side I can see that the stream has finished and all messages have been sent.
It's as if the client doesn't know that no messages are left and that there is nothing more to wait for. Adding wait_for_ready didn't help; the call just fails with a DEADLINE error. The stream_stream method works fine.
Calling response.code() or response.result() blocks indefinitely.
Calling response.done() returns False and response.running() returns True. My interceptor:
    def _intercept_stream_call(self, continuation, client_call_details, request_or_iterator):
        for try_i in range(self.max_attempts):
            response = continuation(client_call_details, request_or_iterator)
            if isinstance(response, _MultiThreadedRendezvous):
                if response.code() == grpc.StatusCode.CANCELLED:
                    if try_i == (self.max_attempts - 1):
                        return response
                    else:
                        self.sleeping_policy.sleep(try_i)
                else:
                    return list(response)

    def intercept_unary_stream(self, continuation, client_call_details, request):
        return self._intercept_stream_call(continuation, client_call_details, request)
The code freezes on the line if response.code() == grpc.StatusCode.CANCELLED:. Running that line in the evaluator hangs in the same way. The workaround is to call list(response) in the evaluator first; after that, response.code() returns a value.
Please help me understand what I'm doing wrong.
My server method:
    def watchStream(self, request, context):
        for i in range(3):
            log.info(f'Returned response {i} to watch stream...')
            yield social_media_stream_pb2.StreamUpdate(
                audio_chunk=social_media_stream_pb2.AudioChunk(audio_data=f'Audio{i}'.encode()),
                video_frame=social_media_stream_pb2.VideoFrame(frame_data=f'Video{i}'.encode()))
I've tried calling list(response) before response.code() or response.result(). It helped, but I don't know why, so it's only a workaround.
You're encountering a deadlock when calling response.code(). From the docs, the return value of continuation is an object that is both a Call for the RPC and a Future. That means that the object is guaranteed to fulfill these two interfaces: grpc.Call and grpc.Future.
The return value is not, however, guaranteed to be a response message. This is because continuation returns not after the RPC response has been received, but immediately after the RPC has been sent. If you attempt to access the code on the RPC, the library will block waiting for your RPC to complete so that the code is set. But the library will not make progress on receiving the RPC because you've blocked the thread waiting on the code to be populated. Deadlock.
Instead, you should set up a callback to do something conditioned on the RPC status only once the RPC response has been received. The method to do this is add_done_callback.
P.S. Please don't reference _MultiThreadedRendezvous in your code. It's an internal API. You should be able to do everything without referencing it.
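The callback approach above could look roughly like the sketch below. It is not a full retry interceptor. The helper name watch_status and the FakeCall class are hypothetical, introduced only for illustration. FakeCall stands in for the object returned by continuation and assumes, as the question reports, that the call only finishes once the stream has been drained. The helper relies on nothing but add_done_callback() and code(), so it needs no internal gRPC class.

```python
def watch_status(call, on_status):
    """Run on_status(code) once the RPC behind `call` has terminated.

    `call` is whatever continuation(...) returned. Only add_done_callback()
    and code() are used, so no internal gRPC class is referenced.
    """
    def _on_done(finished_call):
        # The RPC is already complete here, so code() returns immediately.
        on_status(finished_call.code())

    call.add_done_callback(_on_done)
    return call  # returned untouched so the caller can still iterate it


class FakeCall:
    """Hypothetical stand-in for a streaming call, for demonstration only.

    It fires its done-callbacks when the stream is exhausted, mirroring the
    behaviour reported in the question (code() works after list(response)).
    """

    def __init__(self, messages, status):
        self._messages = iter(messages)
        self._status = status
        self._callbacks = []
        self._finished = False

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def code(self):
        return self._status

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self._messages)
        except StopIteration:
            if not self._finished:
                self._finished = True
                for fn in self._callbacks:
                    fn(self)
            raise


seen = []
call = watch_status(FakeCall(["Audio0", "Audio1"], "OK"), seen.append)
received = list(call)  # draining the stream lets the call finish
print(received, seen)
```

Inside intercept_unary_stream, the same helper would wrap the real call, for example return watch_status(continuation(client_call_details, request), handler), where handler is your own function. With a real call the status would be a grpc.StatusCode rather than the string used in this demo.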