Implementing gRPC DialOut streaming of metrics


I am new to gRPC. I have a Telegraf instance that needs to export some metrics to a collector via gRPC. The requirement is to implement a gRPC dial-out connection as follows:

  1. Telegraf reaches out to the collector (the Telegraf config holds the collector's connection details) to advertise its public IP and the port from which metrics can be streamed.
  2. The client process (the collector) replies with an ACK and a time period T in its response.
  3. The server (Telegraf) reads T and establishes a stream onto which it pushes metrics in batches every T seconds.

I've attached a code sample that implements a client dial-in (the client reaching out to Telegraf), but I am not sure how to implement the logic above.

service MetricService {
    rpc SendMetrics(Empty) returns (stream Metric); //Stream starts.
    rpc AdvertiseCapability (Empty) returns (AdvertiseCapabilityResponse);
}
message AdvertiseCapabilityResponse {
    string ip = 1;
    int64 port = 2;
    bool subscribeToMetricStream = 3;
}
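    //client.go (imports needed: context, io, log)
    client := pb.NewMetricServiceClient(conn)
    // Plain background context: no deadline, so the stream can stay open
    ctx := context.Background()
    res, err := client.AdvertiseCapability(ctx, &pb.Empty{})
    if err != nil {
        log.Fatalf("AdvertiseCapability RPC Failed: %v", err)
    }
    log.Printf("Telegraf advertises %s:%d", res.Ip, res.Port)
    // Call the SendMetrics method to receive the stream of metrics
    stream, err := client.SendMetrics(ctx, &pb.Empty{})
    if err != nil {
        log.Fatalf("Stream RPC Failed: %v", err)
    }
    // Receive and print metrics from the stream
    for {
        metric, err := stream.Recv()
        if err == io.EOF {
            break // server closed the stream
        }
        if err != nil {
            log.Fatalf("Recv Failed: %v", err)
        }
        log.Printf("metric: %v", metric)
    }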
//server.go
type metricServer struct {
    pb.UnimplementedMetricServiceServer
}

func (s *metricServer) AdvertiseCapability(ctx context.Context, request *pb.Empty) (*pb.AdvertiseCapabilityResponse, error) {
    return &pb.AdvertiseCapabilityResponse{Ip: "<some_ip>", Port: 3000}, nil
}

func (s *metricServer) SendMetrics(empty *pb.Empty, stream pb.MetricService_SendMetricsServer) error {
    // metrics is assumed to be a []*pb.Metric populated elsewhere
    for _, metric := range metrics {
        time.Sleep(2 * time.Second)
        if err := stream.Send(metric); err != nil {
            return err // client went away or the stream broke
        }
    }
    time.Sleep(20 * time.Second)
    fmt.Println("Stream Ending...")
    return nil
}

func main() {
    lis, err := net.Listen("tcp", "0.0.0.0:50052")
    if err != nil {
        log.Fatalf("listen err: %v", err)
    }
    grpcServer := grpc.NewServer()
    // register service into grpc server
    pb.RegisterMetricServiceServer(grpcServer, &metricServer{})
    if err := grpcServer.Serve(lis); err != nil {
        log.Fatalf("grpc serve err: %v", err)
    }
}

Unfortunately, due to requirement constraints, I am unable to use gNMI to implement this. I am confused about how to implement the RPCs, as I do not see how this can happen over a single connection. To establish a server-side stream, the client would have to invoke it on a separate connection: as I understand it, the first connection is Telegraf reaching out, and the second is the client reaching out to Telegraf to start the stream. I also have to use mTLS to secure the channel.

I am not sure if this is correct, or even if it is the recommended way to get things done. Should I look at bi-directional streaming RPCs? Some very basic pseudo-code would make things a lot clearer for me.
