Issue setup an asynchronous client callback using the golang implementation of capnp

45 views Asked by At

Using the golang implementation of capnp, I would like to build a server that can accept a callback function from the client to be called later by the server. Here is my example.

example_schema.capnp

using Go = import "/go.capnp";

$Go.package("rpc_example");
$Go.import("rpc_example");

interface ClientCallbackInterface {
    callbackMethod @0 () -> ();
}

interface ServerInterface {
    registerCallback @0 (callback :ClientCallbackInterface);
    invokeCallback @1 ();
}

example_server.go

package main

import (
    "capnproto.org/go/capnp/v3"
    "capnproto.org/go/capnp/v3/rpc"
    "context"
    "example-capnp/rpc_example"
    "fmt"
    "net"
)

type ServerImpl struct {
    callback rpc_example.ClientCallbackInterface
}

func (s *ServerImpl) RegisterCallback(ctx context.Context, call rpc_example.ServerInterface_registerCallback) error {
    fmt.Println("Server: Registering a callback.")

    params := call.Args()
    cb := params.Callback()
    s.callback = cb

    fmt.Println("Server: Callback registered.")

    // This invocation works
    _, _ = s.callback.CallbackMethod(ctx, nil)
    return nil
}

func (s *ServerImpl) InvokeCallback(_ context.Context, _ rpc_example.ServerInterface_invokeCallback) error {
    fmt.Println("Server: Invoking the callback.")

    // This invocation does not work.
    _, _ = s.callback.CallbackMethod(context.Background(), nil)

    fmt.Println("Server: Callback invoked.")
    return nil
}

func main() {
    ctx := context.Background()

    listener, err := net.Listen("tcp", "127.0.0.1:2000")
    if err != nil {
        fmt.Printf("%s", err.Error())
    }

    server := ServerImpl{}
    client := rpc_example.ServerInterface_ServerToClient(&server)

    rwc, err := listener.Accept()
    if err != nil {
        fmt.Printf("%s", err.Error())
    }

    conn := rpc.NewConn(rpc.NewStreamTransport(rwc), &rpc.Options{
        BootstrapClient: capnp.Client(client),
    })

    // Block until the connection terminates.
    select {
    case <-conn.Done():
        client.Release()
    case <-ctx.Done():
        _ = conn.Close()
    }
}

example_client.go

package main

import (
    "capnproto.org/go/capnp/v3/rpc"
    "context"
    "example-capnp/rpc_example"
    "fmt"
    "net"
    "time"
)

type Callback struct{}

func (c Callback) CallbackMethod(_ context.Context, call rpc_example.ClientCallbackInterface_callbackMethod) error {
    fmt.Println("Client: CallbackMethod has been invoked.")
    return nil
}

func main() {
    ctx := context.Background()

    rwc, err := net.Dial("tcp", "127.0.0.1:2000")
    if err != nil {
        panic(err)
    }

    conn := rpc.NewConn(rpc.NewStreamTransport(rwc), nil)
    defer conn.Close()

    d := rpc_example.ServerInterface(conn.Bootstrap(ctx))

    // Register callback
    fmt.Println("Client: Registering a callback.")
    callback := Callback{}
    d.RegisterCallback(ctx, func(params rpc_example.ServerInterface_registerCallback_Params) error {
        cb := rpc_example.ClientCallbackInterface_ServerToClient(callback)
        return params.SetCallback(cb)
    })

    time.Sleep(time.Second * 1)

    fmt.Println("Client: Invoking callback.")
    _, release := d.InvokeCallback(ctx, nil)
    defer release()
    fmt.Println("Client: Invoked callback.")

    time.Sleep(time.Second * 1)

}

The issue

The problem that I am facing is that the ServerImpl.callback becomes invalid when the scope of the RegisterCallback method ends. So the client's callback function can be called in the RegisterCallback method but not in the InvokeCallback method.

This conversation in capnp's cpp repo suggests that this should be possible.

Am I missing something obvious here?

1

There are 1 answers

0
William Hicklin On

I have made this work by calling the AddRef() method on the callback passed to the RegisterCallback method. I.e. by changing the line s.callback = cb to s.callback = cb.AddRef() in the ServerImpl.RegisterCallback method.

According to the documentation, AddRef() creates a new client that points to the same capability. I'm not sure how the clients obtained from the params.Callback() and the AddRef() are different is such a way that one of them remains valid.