How client sends close signal to graphql subscription server?

123 views Asked by At

I'm working on a graphql server and there is one subscription API. This is the starter code I found on gqlgen documentation:

// CurrentTime is the resolver for the currentTime field.
func (r *subscriptionResolver) CurrentTime(ctx context.Context) (<-chan *model.Time, error) {
    // First you'll need to `make()` your channel. Use your type here!
    ch := make(chan *model.Time)

    // You can (and probably should) handle your channels in a central place outside of `schema.resolvers.go`.
    // For this example we'll simply use a Goroutine with a simple loop.
    go func() {
        // Handle deregistration of the channel here. Note the `defer`
    defer close(ch)

        for {
            // In our example we'll send the current time every second.
            time.Sleep(1 * time.Second)
            fmt.Println("Tick")

            // Prepare your object.
            currentTime := time.Now()
            t := &model.Time{
                UnixTime:  int(currentTime.Unix()),
                TimeStamp: currentTime.Format(time.RFC3339),
            }

            // The subscription may have got closed due to the client disconnecting.
            // Hence we do send in a select block with a check for context cancellation.
            // This avoids goroutine getting blocked forever or panicking,
            select {
            case <-ctx.Done(): // This runs when context gets cancelled. Subscription closes.
                fmt.Println("Subscription Closed")
                // Handle deregistration of the channel here. `close(ch)`
                return // Remember to return to end the routine.
            
            case ch <- t: // This is the actual send.
                // Our message went through, do nothing 
            }
        }
    }()

    // We return the channel and no error.
    return ch, nil
}

I want to know what happens when I receive ctx.Done() signal. Is the client sending this signal by unsubscribing or closing subscription? Or it can happen automatically after some time?( I mean setting some timeout parameter for being idle.) Also, I want to know can a timeout on my side(server side) trigers Done() signal?

1

There are 1 answers

0
Mir On

Looking at the weksocket implemetation of gqlgen (the run implementation in particular), we can see that a context.WithCancel is created, and the cancel function is called in the defer function. So the context is canceled whenever we exit the run function (e.g. when the client close the connection sending a connectionCloseMessageType or a unexpected message like in the default case).

That cancel triggers the ctx.Done() that closes the subscription, since the context is the same.

You can add your own timeout or deadline directly into the resolver using the context.WithDeadline or context.WithTimeout if you want to close the connection after a certain amount of time.

You can also use the InitFunc to provide a common logic among all you subscribe resolver, like we can see in this proposal:

server.AddTransport(&transport.Websocket{
    Upgrader: websocket.Upgrader{
        CheckOrigin: func(r *http.Request) bool {
            return true
        },
    },
    KeepAlivePingInterval: 10 * time.Second,
    InitFunc: func(ctx context.Context, initPayload transport.InitPayload) (context.Context, *transport.InitPayload, error) {
                    payloadAuth := initPayload.Authorization()
        if payloadAuth == "" {
            return ctx, &initPayload, errors.New("the JWT is missing in the initialization payload")
        }

        jwt, err := authenticateJWT(payloadAuth)
        if err != nil {
            return ctx,&initPayload, err
        }

        // Add the JWT expiration as a deadline, and add the reason
        newCtx, _ := context.WithDeadline(transport.AppendCloseReason(ctx, "authentication token has expired"), time.Unix(jwt.ExpiresAt, 0))
        return newCtx, &initPayload,nil
    },
})