I am trying to implement an async server interceptor (`grpc.aio.ServerInterceptor`). My current synchronous ServerInterceptor looks like this: https://github.com/zhyon404/python-grpc-prometheus/blob/master/python_grpc_prometheus/prometheus_server_interceptor.py#L48. When I switch to `grpc.aio.ServerInterceptor` and start the server, requests fail with the error shown further below. My server code:
from grpc_opentracing import open_tracing_server_interceptor
from grpc_opentracing.grpcext import intercept_server
import PromServerInterceptor
class MyServicer():
    """Runs the asyncio gRPC server alongside a Prometheus metrics HTTP endpoint."""

    async def _start_async_server(self, tracer=None, service=None, grpc_port=8083, http_port=8080):
        """Start the grpc.aio server and wait until it terminates.

        ``service`` has a ``None`` default only so that the parameter order
        (``tracer`` first) stays valid Python; it is still mandatory.

        Args:
            tracer: Tracer handed to ``open_tracing_server_interceptor``.
            service: Servicer implementation registered on the server.
            grpc_port: Port the gRPC server listens on (insecure).
            http_port: Port for the Prometheus metrics HTTP endpoint.

        Raises:
            ValueError: If ``service`` is not supplied.
        """
        if service is None:
            raise ValueError("service is required")

        tracing_interceptor = open_tracing_server_interceptor(tracer)
        server = aio.server(interceptors=(PromServerInterceptor(),))
        # NOTE(review): the grpc_opentracing interceptor calls
        # servicer_context.time_remaining(); with grpcio 1.32.0 the grpc.aio
        # servicer context raises AttributeError for that attribute at request
        # time. Confirm this wrapper supports grpc.aio servers before relying
        # on it.
        server = intercept_server(server, tracing_interceptor)
        my_service_pb2_grpc.add_MyServicer_to_server(service, server)
        server.add_insecure_port("[::]:" + str(grpc_port))
        await server.start()
        # Log only after the metrics endpoint is actually up.
        prometheus_client.start_http_server(http_port)
        logger.info("Started prometheus server at port %s", http_port)
        await server.wait_for_termination()

    def async_serve(self, tracer=None, service=None, grpc_port=8083, http_port=8080):
        """Schedule the async server on the event loop and run the loop forever.

        Args:
            tracer: Tracer handed to ``open_tracing_server_interceptor``.
            service: Servicer implementation registered on the server.
            grpc_port: Port the gRPC server listens on (insecure).
            http_port: Port for the Prometheus metrics HTTP endpoint.

        Raises:
            ValueError: If ``service`` is not supplied.
        """
        # Fail fast, before a task is scheduled on a loop that never stops.
        if service is None:
            raise ValueError("service is required")

        loop = asyncio.get_event_loop()
        # Keyword arguments keep this call independent of parameter order.
        loop.create_task(
            self._start_async_server(
                tracer=tracer,
                service=service,
                grpc_port=grpc_port,
                http_port=http_port,
            )
        )
        loop.run_forever()
These are the library versions I am using:
grpcio=1.32.0
grpcio-opentracing==1.1.4
I see the following error:
File "src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi", line 646, in grpc._cython.cygrpc._handle_exceptions
File "src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi", line 745, in _handle_rpc
File "src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi", line 511, in _handle_unary_unary_rpc
File "src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi", line 368, in _finish_handler_with_unary_response
File "prometheus_server_interceptor.py", line 93, in new_behavior
rsp = await behavior(request_or_iterator, service_context)
File "/anaconda3/lib/python3.7/site-packages/grpc_opentracing/grpcext/_interceptor.py", line 272, in adaptation
_UnaryServerInfo(self._method), handler)
File "/anaconda3/lib/python3.7/site-packages/grpc_opentracing/_server.py", line 145, in intercept_unary
timeout=servicer_context.time_remaining(),
AttributeError: 'grpc._cython.cygrpc._ServicerContext' object has no attribute 'time_remaining'
The following is my PromServerInterceptor implementation:
from grpc import aio
import grpc
from timeit import default_timer
from python_grpc_prometheus.server_metrics import (SERVER_HANDLED_LATENCY_SECONDS,
SERVER_HANDLED_COUNTER,
SERVER_STARTED_COUNTER,
SERVER_MSG_RECEIVED_TOTAL,
SERVER_MSG_SENT_TOTAL)
from python_grpc_prometheus.util import type_from_method
from python_grpc_prometheus.util import code_to_string
def _wrap_rpc_behavior(handler, fn):
if handler is None:
return None
if handler.request_streaming and handler.response_streaming:
behavior_fn = handler.stream_stream
handler_factory = grpc.stream_stream_rpc_method_handler
elif handler.request_streaming and not handler.response_streaming:
behavior_fn = handler.stream_unary
handler_factory = grpc.stream_unary_rpc_method_handler
elif not handler.request_streaming and handler.response_streaming:
behavior_fn = handler.unary_stream
handler_factory = grpc.unary_stream_rpc_method_handler
else:
behavior_fn = handler.unary_unary
handler_factory = grpc.unary_unary_rpc_method_handler
return handler_factory(fn(behavior_fn,
handler.request_streaming,
handler.response_streaming),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer)
def split_call_details(handler_call_details, minimum_grpc_method_path_items=3):
    """Split a gRPC method path into its service and method names.

    The path in ``handler_call_details.method`` is split on ``"/"``; for a
    path such as ``"/package.Service/Method"`` the second and third items are
    the service and the method.

    Args:
        handler_call_details: Object exposing the full method path as
            ``.method``.
        minimum_grpc_method_path_items: Minimum number of ``"/"``-separated
            items the path must contain to be accepted.

    Returns:
        ``(grpc_service, grpc_method, True)`` on success, or
        ``('', '', False)`` when the path has too few items.
    """
    parts = handler_call_details.method.split("/")
    # Service and method always sit at indices 1 and 2, so at least three
    # items are needed regardless of the caller-supplied minimum.
    required_items = max(minimum_grpc_method_path_items, 3)
    if len(parts) < required_items:
        return '', '', False
    grpc_service, grpc_method = parts[1:3]
    return grpc_service, grpc_method, True
class PromServerInterceptor(aio.ServerInterceptor):
    """Asyncio server interceptor that records Prometheus metrics for unary-unary RPCs."""

    @staticmethod
    def _resolve_status_code(service_context):
        """Return the status code currently set on ``service_context``.

        Falls back to ``grpc.StatusCode.OK`` when no code has been set.
        """
        code_getter = getattr(service_context, "code", None)
        if callable(code_getter):
            raw_code = code_getter()
        else:
            # Layout of the synchronous servicer context; an asyncio context
            # may not have ``_state``, hence the defensive getattr chain.
            state = getattr(service_context, "_state", None)
            raw_code = getattr(state, "code", None)

        if raw_code is None:
            return grpc.StatusCode.OK
        if isinstance(raw_code, grpc.StatusCode):
            return raw_code
        # NOTE(review): some grpc.aio releases appear to return the raw
        # integer status here rather than a grpc.StatusCode — confirm.
        for candidate in grpc.StatusCode:
            if candidate.value[0] == raw_code:
                return candidate
        return grpc.StatusCode.UNKNOWN

    async def intercept_service(self, continuation, handler_call_details):
        """Wrap the next handler so every unary-unary call is counted and timed.

        Args:
            continuation: Awaitable callable returning the next handler in the
                interceptor chain.
            handler_call_details: Call details; ``.method`` is split into the
                service and method labels.

        Returns:
            The unmodified handler when it is ``None``, streaming, or the
            method path cannot be split; otherwise a new handler whose
            behavior updates the Prometheus metrics.
        """
        handler = await continuation(handler_call_details)
        if handler is None:
            return handler

        # only support unary
        if handler.request_streaming or handler.response_streaming:
            return handler

        grpc_service, grpc_method, ok = split_call_details(handler_call_details)
        if not ok:
            # The continuation was already awaited above; calling it again
            # would re-run the chain and return an un-awaited coroutine.
            return handler

        grpc_type = type_from_method(handler.request_streaming, handler.response_streaming)
        SERVER_STARTED_COUNTER.labels(
            grpc_type=grpc_type,
            grpc_service=grpc_service,
            grpc_method=grpc_method).inc()

        def latency_wrapper(behavior, request_streaming, response_streaming):
            async def new_behavior(request_or_iterator, service_context):
                start = default_timer()
                SERVER_MSG_RECEIVED_TOTAL.labels(
                    grpc_type=grpc_type,
                    grpc_service=grpc_service,
                    grpc_method=grpc_method
                ).inc()
                # default: reported when the behavior raises without a status
                code = code_to_string(grpc.StatusCode.UNKNOWN)
                try:
                    rsp = await behavior(request_or_iterator, service_context)
                    code = code_to_string(self._resolve_status_code(service_context))
                    SERVER_MSG_SENT_TOTAL.labels(
                        grpc_type=grpc_type,
                        grpc_service=grpc_service,
                        grpc_method=grpc_method
                    ).inc()
                    return rsp
                except grpc.RpcError as e:
                    if isinstance(e, grpc.Call):
                        code = code_to_string(e.code())
                    # Bare raise keeps the original traceback intact.
                    raise
                finally:
                    # Runs on success and on failure, so every call is counted.
                    SERVER_HANDLED_COUNTER.labels(
                        grpc_type=grpc_type,
                        grpc_service=grpc_service,
                        grpc_method=grpc_method,
                        grpc_code=code
                    ).inc()
                    SERVER_HANDLED_LATENCY_SECONDS.labels(
                        grpc_type=grpc_type,
                        grpc_service=grpc_service,
                        grpc_method=grpc_method).observe(max(default_timer() - start, 0))
            return new_behavior

        return _wrap_rpc_behavior(handler, latency_wrapper)
The interceptor from grpc.aio can look like:
To convert it, you need to add `async` to `intercept_service` and `await` the `continuation` call.