I am a beginner when it comes to gRPC. I have a telegraf instance that wants to export some metrics to a collector via gRPC. The requirement is to implement a gRPC dial-out connection as follows:
- Telegraf reaches out to the collector (the Telegraf config has the collector's connection details) to advertise its public IP and the port from which it can stream metrics.
- The client process (the collector) responds with an ACK and a time period T.
- The server reads T and establishes a stream onto which it pushes metrics in batches every T seconds.
I've attached a code sample that implements a client dial-in (client reaching out to telegraf) but I am not sure how to implement the above logic.
service MetricService {
  rpc SendMetrics(Empty) returns (stream Metric); // Stream starts.
  rpc AdvertiseCapability(Empty) returns (AdvertiseCapabilityResponse);
}

message AdvertiseCapabilityResponse {
  string ip = 1;
  int64 port = 2;
  bool subscribeToMetricStream = 3;
}
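//client.go
client := pb.NewMetricServiceClient(conn)
// Create a cancellable context so the stream can be torn down on exit
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
res, err := client.AdvertiseCapability(ctx, &pb.Empty{})
if err != nil {
	log.Fatalf("AdvertiseCapability RPC failed: %v", err)
}
log.Printf("server advertised %s:%d", res.GetIp(), res.GetPort())
// Call the SendMetrics method to receive the stream of metrics
stream, err := client.SendMetrics(ctx, &pb.Empty{})
if err != nil {
	log.Fatalf("Stream RPC Failed: %v", err)
}
// Receive and print metrics from the stream
for {
	metric, err := stream.Recv()
	if err == io.EOF {
		break // server closed the stream
	}
	if err != nil {
		log.Fatalf("Recv failed: %v", err)
	}
	log.Printf("metric: %v", metric)
}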
//server.go
type metricServer struct {
	pb.UnimplementedMetricServiceServer
}

func (s *metricServer) AdvertiseCapability(ctx context.Context, request *pb.Empty) (*pb.AdvertiseCapabilityResponse, error) {
	return &pb.AdvertiseCapabilityResponse{Ip: "<some_ip>", Port: 3000}, nil
}

func (s *metricServer) SendMetrics(empty *pb.Empty, stream pb.MetricService_SendMetricsServer) error {
	// metrics is assumed to be a []*pb.Metric populated elsewhere
	for _, metric := range metrics {
		time.Sleep(2 * time.Second)
		if err := stream.Send(metric); err != nil {
			return err // client went away or the stream broke
		}
	}
	time.Sleep(20 * time.Second)
	fmt.Println("Stream Ending...")
	return nil
}

func main() {
	lis, err := net.Listen("tcp", "0.0.0.0:50052")
	if err != nil {
		log.Fatalf("listen err: %v", err)
	}
	grpcServer := grpc.NewServer()
	// register service into grpc server
	pb.RegisterMetricServiceServer(grpcServer, &metricServer{})
	if err := grpcServer.Serve(lis); err != nil {
		log.Fatalf("grpc serve err: %v", err)
	}
}
Unfortunately, due to requirement constraints, I am unable to use gNMI to implement this. I am confused about how to implement the RPCs, as I do not see how this can happen over a single connection. To establish a server-side stream, the client would have to invoke it on a separate connection: the first connection would be Telegraf reaching out to the collector, and the second would be the client reaching out to Telegraf to start the stream. I also have to use mTLS to secure the channel.
I am not sure if this is correct, or even if it is the recommended way to get things done. Should I be looking at bi-directional streaming RPCs instead? Some very basic pseudo-code would make things a lot clearer for me.