How to convert grpc.ServerInterceptor to grpc.aio.ServerInterceptor

1.5k views Asked by At

I am trying to implement an async ServerInterceptor [grpc.aio.ServerInterceptor]. My current synchronous ServerInterceptor looks like this: https://github.com/zhyon404/python-grpc-prometheus/blob/master/python_grpc_prometheus/prometheus_server_interceptor.py#L48. When I try to use grpc.aio.ServerInterceptor and start the server, I get the error shown further below. My server code:

from grpc_opentracing import open_tracing_server_interceptor
from grpc_opentracing.grpcext import intercept_server


import PromServerInterceptor

class MyServicer():
    """Runs an asyncio gRPC server together with a Prometheus metrics HTTP endpoint."""

    async def _start_async_server(self, service, tracer=None, grpc_port=8083, http_port=8080):
        """Start the aio gRPC server and the metrics endpoint, then wait for termination.

        Args:
            service: servicer object registered through ``add_MyServicer_to_server``.
            tracer: tracer passed to ``open_tracing_server_interceptor``.
            grpc_port: port the gRPC server binds on ``[::]``.
            http_port: port handed to ``prometheus_client.start_http_server``.
        """
        tracing_interceptor = open_tracing_server_interceptor(tracer)

        server = aio.server(interceptors=(PromServerInterceptor(),))
        # NOTE(review): the AttributeError traceback reported for this setup is
        # raised inside grpc_opentracing's adapter, which calls
        # servicer_context.time_remaining() on the aio context. intercept_server
        # appears to target the synchronous grpc server API -- confirm before
        # relying on it together with aio.server.
        server = intercept_server(server, tracing_interceptor)
        my_service_pb2_grpc.add_MyServicer_to_server(service, server)
        server.add_insecure_port("[::]:" + str(grpc_port))
        await server.start()
        logger.info("Started prometheus server at port %s", http_port)
        prometheus_client.start_http_server(http_port)
        await server.wait_for_termination()

    def async_serve(self, service, tracer=None, grpc_port=8083, http_port=8080):
        """Schedule the async server on the event loop and run the loop forever.

        Arguments are forwarded positionally to ``_start_async_server``; see
        that method for their meaning.
        """
        loop = asyncio.get_event_loop()
        loop.create_task(self._start_async_server(service, tracer, grpc_port, http_port))
        loop.run_forever()

Following are the lib versions I am using:

    grpcio==1.32.0
    grpcio-opentracing==1.1.4


I see the following error:
  File "src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi", line 646, in grpc._cython.cygrpc._handle_exceptions
  File "src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi", line 745, in _handle_rpc
  File "src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi", line 511, in _handle_unary_unary_rpc
  File "src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi", line 368, in _finish_handler_with_unary_response
  File "prometheus_server_interceptor.py", line 93, in new_behavior
    rsp = await behavior(request_or_iterator, service_context)
  File "/anaconda3/lib/python3.7/site-packages/grpc_opentracing/grpcext/_interceptor.py", line 272, in adaptation
    _UnaryServerInfo(self._method), handler)
  File "/anaconda3/lib/python3.7/site-packages/grpc_opentracing/_server.py", line 145, in intercept_unary
    timeout=servicer_context.time_remaining(),
AttributeError: 'grpc._cython.cygrpc._ServicerContext' object has no attribute 'time_remaining'

Following is my PromServerInterceptor implementation:

from grpc import aio
import grpc

from timeit import default_timer

from python_grpc_prometheus.server_metrics import (SERVER_HANDLED_LATENCY_SECONDS,
                                                   SERVER_HANDLED_COUNTER,
                                                   SERVER_STARTED_COUNTER,
                                                   SERVER_MSG_RECEIVED_TOTAL,
                                                   SERVER_MSG_SENT_TOTAL)
from python_grpc_prometheus.util import type_from_method
from python_grpc_prometheus.util import code_to_string


def _wrap_rpc_behavior(handler, fn):
    if handler is None:
        return None

    if handler.request_streaming and handler.response_streaming:
        behavior_fn = handler.stream_stream
        handler_factory = grpc.stream_stream_rpc_method_handler
    elif handler.request_streaming and not handler.response_streaming:
        behavior_fn = handler.stream_unary
        handler_factory = grpc.stream_unary_rpc_method_handler
    elif not handler.request_streaming and handler.response_streaming:
        behavior_fn = handler.unary_stream
        handler_factory = grpc.unary_stream_rpc_method_handler
    else:
        behavior_fn = handler.unary_unary
        handler_factory = grpc.unary_unary_rpc_method_handler

    return handler_factory(fn(behavior_fn,
                              handler.request_streaming,
                              handler.response_streaming),
                           request_deserializer=handler.request_deserializer,
                           response_serializer=handler.response_serializer)


def split_call_details(handler_call_details, minimum_grpc_method_path_items=3):
    """Split ``handler_call_details.method`` on ``/`` into service and method.

    Returns ``(service, method, True)`` taken from the path items after the
    leading one, or ``('', '', False)`` when the path has fewer than
    ``minimum_grpc_method_path_items`` items.
    """
    segments = handler_call_details.method.split("/")
    if len(segments) >= minimum_grpc_method_path_items:
        # Item 0 is whatever precedes the first "/" and is skipped.
        service_name, method_name = segments[1:minimum_grpc_method_path_items]
        return service_name, method_name, True

    return '', '', False


class PromServerInterceptor(aio.ServerInterceptor):
    """Asyncio server interceptor that updates the python_grpc_prometheus server
    metrics (started / received / sent / handled counters and handled latency)
    for unary-unary RPCs. Streaming handlers are passed through untouched.
    """

    @staticmethod
    def _resolve_status_code(service_context):
        """Return the status code set on ``service_context``, or OK if none was set.

        Prefers a public ``code()`` accessor when the context offers one and
        falls back to the private ``_state.code`` the original code read.
        """
        # NOTE(review): the context seen in the reported traceback is
        # grpc._cython.cygrpc._ServicerContext; it is not established that this
        # type exposes ``_state``, so every lookup here is defensive. If neither
        # accessor exists the RPC is reported as OK -- confirm that is acceptable.
        code_getter = getattr(service_context, "code", None)
        if callable(code_getter):
            status = code_getter()
        else:
            state = getattr(service_context, "_state", None)
            status = getattr(state, "code", None)
        return grpc.StatusCode.OK if status is None else status

    async def intercept_service(self, continuation, handler_call_details):
        """Resolve the next handler and wrap unary-unary handlers with metrics.

        Args:
            continuation: awaitable callable returning the next handler for
                ``handler_call_details``.
            handler_call_details: call details; its ``method`` path provides
                the service and method label values.

        Returns:
            The handler from ``continuation`` unchanged when it is ``None``,
            streaming, or the method path cannot be split; otherwise a new
            handler whose behavior records metrics around the original one.
        """
        handler = await continuation(handler_call_details)
        if handler is None:
            return handler

        # only support unary
        if handler.request_streaming or handler.response_streaming:
            return handler

        grpc_service, grpc_method, ok = split_call_details(handler_call_details)
        if not ok:
            # The handler is already resolved; calling continuation again would
            # hand back an un-awaited coroutine instead of a handler.
            return handler

        grpc_type = type_from_method(handler.request_streaming, handler.response_streaming)

        SERVER_STARTED_COUNTER.labels(
            grpc_type=grpc_type,
            grpc_service=grpc_service,
            grpc_method=grpc_method).inc()

        resolve_status_code = self._resolve_status_code

        def latency_wrapper(behavior, request_streaming, response_streaming):
            async def new_behavior(request_or_iterator, service_context):
                start = default_timer()

                SERVER_MSG_RECEIVED_TOTAL.labels(
                    grpc_type=grpc_type,
                    grpc_service=grpc_service,
                    grpc_method=grpc_method
                ).inc()

                # Reported as-is when the behavior raises something that
                # carries no status code of its own.
                code = code_to_string(grpc.StatusCode.UNKNOWN)

                try:
                    rsp = await behavior(request_or_iterator, service_context)
                    code = code_to_string(resolve_status_code(service_context))

                    SERVER_MSG_SENT_TOTAL.labels(
                        grpc_type=grpc_type,
                        grpc_service=grpc_service,
                        grpc_method=grpc_method
                    ).inc()

                    return rsp
                except grpc.RpcError as e:
                    if isinstance(e, grpc.Call):
                        code = code_to_string(e.code())

                    # Bare raise keeps the original traceback intact.
                    raise
                finally:
                    # Runs on success and failure so every RPC is counted once
                    # with its final code and its elapsed time.
                    SERVER_HANDLED_COUNTER.labels(
                        grpc_type=grpc_type,
                        grpc_service=grpc_service,
                        grpc_method=grpc_method,
                        grpc_code=code
                    ).inc()

                    SERVER_HANDLED_LATENCY_SECONDS.labels(
                        grpc_type=grpc_type,
                        grpc_service=grpc_service,
                        grpc_method=grpc_method).observe(max(default_timer() - start, 0))

            return new_behavior

        return _wrap_rpc_behavior(handler, latency_wrapper)
1

There is 1 answer

0
sneawo On

The interceptor from grpc.aio can look like:

    class RequestIdInterceptor(grpc.aio.ServerInterceptor):
        """Example aio interceptor that looks for a "request_id" metadata entry."""

        async def intercept_service(self, continuation, handler_call_details):
            """Scan the invocation metadata, then defer to the next handler."""
            metadata = handler_call_details.invocation_metadata
            for (key, value) in metadata:
                if key != "request_id":
                    continue
                # Placeholder: act on the matching entry here.
                ...
                break

            next_handler = await continuation(handler_call_details)
            return next_handler

To convert it, you need to declare intercept_service with `async def` and `await` the call to continuation.