Using the golang implementation of capnp, I would like to build a server that can accept a callback function from the client to be called later by the server. Here is my example.
example_schema.capnp
using Go = import "/go.capnp";
$Go.package("rpc_example");
$Go.import("rpc_example");
interface ClientCallbackInterface {
callbackMethod @0 () -> ();
}
interface ServerInterface {
registerCallback @0 (callback :ClientCallbackInterface);
invokeCallback @1 ();
}
example_server.go
package main
import (
"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/rpc"
"context"
"example-capnp/rpc_example"
"fmt"
"net"
)
type ServerImpl struct {
callback rpc_example.ClientCallbackInterface
}
func (s *ServerImpl) RegisterCallback(ctx context.Context, call rpc_example.ServerInterface_registerCallback) error {
fmt.Println("Server: Registering a callback.")
params := call.Args()
cb := params.Callback()
s.callback = cb
fmt.Println("Server: Callback registered.")
// This invocation works
_, _ = s.callback.CallbackMethod(ctx, nil)
return nil
}
func (s *ServerImpl) InvokeCallback(_ context.Context, _ rpc_example.ServerInterface_invokeCallback) error {
fmt.Println("Server: Invoking the callback.")
// This invocation does not work.
_, _ = s.callback.CallbackMethod(context.Background(), nil)
fmt.Println("Server: Callback invoked.")
return nil
}
func main() {
ctx := context.Background()
listener, err := net.Listen("tcp", "127.0.0.1:2000")
if err != nil {
fmt.Printf("%s", err.Error())
}
server := ServerImpl{}
client := rpc_example.ServerInterface_ServerToClient(&server)
rwc, err := listener.Accept()
if err != nil {
fmt.Printf("%s", err.Error())
}
conn := rpc.NewConn(rpc.NewStreamTransport(rwc), &rpc.Options{
BootstrapClient: capnp.Client(client),
})
// Block until the connection terminates.
select {
case <-conn.Done():
client.Release()
case <-ctx.Done():
_ = conn.Close()
}
}
example_client.go
package main
import (
"capnproto.org/go/capnp/v3/rpc"
"context"
"example-capnp/rpc_example"
"fmt"
"net"
"time"
)
type Callback struct{}
func (c Callback) CallbackMethod(_ context.Context, call rpc_example.ClientCallbackInterface_callbackMethod) error {
fmt.Println("Client: CallbackMethod has been invoked.")
return nil
}
func main() {
ctx := context.Background()
rwc, err := net.Dial("tcp", "127.0.0.1:2000")
if err != nil {
panic(err)
}
conn := rpc.NewConn(rpc.NewStreamTransport(rwc), nil)
defer conn.Close()
d := rpc_example.ServerInterface(conn.Bootstrap(ctx))
// Register callback
fmt.Println("Client: Registering a callback.")
callback := Callback{}
d.RegisterCallback(ctx, func(params rpc_example.ServerInterface_registerCallback_Params) error {
cb := rpc_example.ClientCallbackInterface_ServerToClient(callback)
return params.SetCallback(cb)
})
time.Sleep(time.Second * 1)
fmt.Println("Client: Invoking callback.")
_, release := d.InvokeCallback(ctx, nil)
defer release()
fmt.Println("Client: Invoked callback.")
time.Sleep(time.Second * 1)
}
The issue
The problem that I am facing is that the ServerImpl.callback
becomes invalid when the scope of the RegisterCallback
method ends. So the client's callback function can be called in the RegisterCallback
method but not in the InvokeCallback
method.
This conversation in capnp's cpp repo suggests that this should be possible.
Am I missing something obvious here?
I have made this work by calling the
AddRef()
method on the callback passed to theRegisterCallback
method. I.e. by changing the lines.callback = cb
tos.callback = cb.AddRef()
in theServerImpl.RegisterCallback
method.According to the documentation,
AddRef()
creates a new client that points to the same capability. I'm not sure how the clients obtained from theparams.Callback()
and theAddRef()
are different is such a way that one of them remains valid.