How to propagate the ServiceRequestContext to my custom thread pool

I have a service that processes Armeria requests and dispatches events to Guava's EventBus. The problem is that I lose the context while processing the event in the EventBus handler. Is there any way to let the event processor access the ServiceRequestContext?

class EventListener {
    @Subscribe
    public void process(SomeCustomizedClass event) {
        final ServiceRequestContext context = ServiceRequestContext.currentOrNull();
        log.info("process ServiceRequestContext context={}", context);
    }
}

Registering the event handler:

EventBus eventBus = new AsyncEventBus(ThreadPoolTaskExecutor());
eventBus.register(new EventListener());

Here is my Armeria service:

@Slf4j
public class NameAuthRestApi {
    final NameAuthService nameAuthService;

    @Post("/auth")
    @ProducesJson
    public Mono<RealNameAuthResp> auth(RealNameAuthReq req) {
        return nameAuthService.auth(NameAuthConverter.CONVERTER.toDto(req))
                              .handle((result, sink) -> {
                                  if (result.isSuccess()) {
                                      // I post an event here, but the event handler cannot access the ServiceRequestContext.
                                      // That is the problem.
                                      eventBus.post(new SomeCustomizedClass(result));

                                      final RealNameAuthResp realNameAuthResp = new RealNameAuthResp();
                                      realNameAuthResp.setTradeNo(result.getTradeNo());
                                      realNameAuthResp.setSuccess(true);
                                      sink.next(realNameAuthResp);
                                      sink.complete();
                                  } else {
                                      sink.error(new SystemException(ErrorCode.API_ERROR, result.errors()));
                                  }
                              });
    }
}

There are 3 answers

minwoox On

You need to push the context manually inside the callback:

public Mono<RealNameAuthResp> auth(ServiceRequestContext ctx, RealNameAuthReq req) {
    // Executed by event loop 1.
    // This thread has the ctx in its thread-local.
    return nameAuthService.auth(NameAuthConverter.CONVERTER.toDto(req))
                          .handle((result, sink) -> {
                              // Executed by another thread (event loop 2),
                              // which does not have the ctx in its thread-local.
                              try (SafeCloseable ignored = ctx.push()) {
                                  if (result.isSuccess()) {
                                      ...
                                  } else {
                                      ...
                                  }
                              }
                          });
}

The problem is that the handle method is executed by another thread that does not have the ctx in its thread local. So, you should manually set the ctx.
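
The thread-local behavior described above can be reproduced without Armeria. The following is a minimal sketch using only the JDK; `DemoContext`, `DemoCloseable` and `ThreadLocalDemo` are hypothetical stand-ins invented for illustration and are not Armeria classes. It shows that a pool thread sees no context unless the task pushes one itself, which is the same pattern as the try-with-resources block around `ctx.push()`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical closeable whose close() throws no checked exception.
interface DemoCloseable extends AutoCloseable {
    @Override
    void close();
}

// Hypothetical stand-in for a request context; NOT Armeria's API.
final class DemoContext {
    private static final ThreadLocal<DemoContext> CURRENT = new ThreadLocal<>();
    final String name;

    DemoContext(String name) {
        this.name = name;
    }

    // Returns the context bound to the calling thread, or null if none is bound.
    static DemoContext currentOrNull() {
        return CURRENT.get();
    }

    // Binds this context to the calling thread; close() restores the previous binding.
    DemoCloseable push() {
        final DemoContext previous = CURRENT.get();
        CURRENT.set(this);
        return () -> {
            if (previous == null) {
                CURRENT.remove();
            } else {
                CURRENT.set(previous);
            }
        };
    }
}

final class ThreadLocalDemo {
    // Returns what a pool thread sees: [0] without a push, [1] with a push inside the task.
    static String[] observe() {
        final ExecutorService pool = Executors.newSingleThreadExecutor();
        final DemoContext ctx = new DemoContext("req-1");
        final Callable<String> plain = ThreadLocalDemo::describe;
        final Callable<String> pushed = () -> {
            try (DemoCloseable inner = ctx.push()) {
                return describe();
            }
        };
        // The submitting thread has the context bound, but that alone does not reach the pool thread.
        try (DemoCloseable outer = ctx.push()) {
            return new String[] { pool.submit(plain).get(), pool.submit(pushed).get() };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    private static String describe() {
        final DemoContext current = DemoContext.currentOrNull();
        return current == null ? "none" : current.name;
    }

    public static void main(String[] args) {
        final String[] seen = observe();
        System.out.println("without push: " + seen[0]); // prints "without push: none"
        System.out.println("with push: " + seen[1]);    // prints "with push: req-1"
    }
}
```

Pushing on the submitting thread does not help the worker thread; the binding has to be made on the thread that actually runs the callback.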

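You can achieve the same effect by running the callback on ctx.eventLoop(), which already has the ctx. With a CompletableFuture that means the xAsync methods, for example handleAsync(fn, ctx.eventLoop()). A Reactor Mono has no handleAsync, so the closest equivalent (assuming Reactor's Schedulers.fromExecutor) is publishOn:

public Mono<RealNameAuthResp> auth(ServiceRequestContext ctx, RealNameAuthReq req) {
    return nameAuthService.auth(NameAuthConverter.CONVERTER.toDto(req))
                          // Run the downstream callbacks on the request's event loop.
                          .publishOn(Schedulers.fromExecutor(ctx.eventLoop()))
                          .handle((result, sink) -> {
                              if (result.isSuccess()) {
                                  ...
                              } else {
                                  ...
                              }
                          });
}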
Eone On

@minwoox, to simplify, my code looks like this:

public class NameAuthRestApi {
    JobExecutor executor = new JobExecutor();

    @Post("/code")
    public HttpResponse authCode(ServiceRequestContext ctx) {
        try (SafeCloseable ignore = ctx.push()) {
            executor.submit(new Task(new Event("eone")));
        }
        return HttpResponse.of("OK");
    }

    @Getter
    @AllArgsConstructor
    public static class Event {
        private String name;
    }

    @RequiredArgsConstructor
    @Slf4j
    public static class Task implements Runnable {
        final Event event;

        @Override
        public void run() {
            // The ServiceRequestContext is not accessible here (currentOrNull() returns null).
            ServiceRequestContext ctx = ServiceRequestContext.currentOrNull();
            log.info("ctx={}, event={}", ctx, event);
        }
    }

    public static class JobExecutor {
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        public void submit(Task task) {
            executorService.submit(task);
        }
    }
}

minwoox On

There are two ways to solve this. First, use an executor that already has the ctx:

ctx.eventLoop().submit(new Task(new Event("eone")));
// For a blocking task, use ctx.blockingTaskExecutor() instead of the event loop.

Or, propagate the ctx manually:

@Slf4j
public static class Task implements Runnable {
    private final Event event;
    private final ServiceRequestContext ctx;

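    Task(Event event) {
        this.event = event;
        // Capture the ctx on the thread that creates the task.
        // current() throws if no ctx is bound to that thread.
        this.ctx = ServiceRequestContext.current();
    }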

    @Override
    public void run() {
        try (SafeCloseable ignored = ctx.push()) {
            ...
        }
    }
}
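
If many task types need this, the capture-and-push step can be moved out of each Task and into a wrapper around the thread pool. The sketch below uses only the JDK to show the idea; `FakeContext`, `PropagatingExecutor` and `PropagationDemo` are hypothetical names made up for illustration, with `FakeContext` standing in for the ServiceRequestContext. Armeria's RequestContext also appears to offer `makeContextAware(...)` helpers for this purpose, so it is worth checking the Javadoc of the version in use before writing a wrapper by hand.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the request context; NOT Armeria's API.
final class FakeContext {
    private static final ThreadLocal<FakeContext> CURRENT = new ThreadLocal<>();
    final String id;

    FakeContext(String id) {
        this.id = id;
    }

    static FakeContext currentOrNull() {
        return CURRENT.get();
    }

    // Runs the task with this context bound to the calling thread, then restores the old binding.
    void runWith(Runnable task) {
        final FakeContext previous = CURRENT.get();
        CURRENT.set(this);
        try {
            task.run();
        } finally {
            if (previous == null) {
                CURRENT.remove();
            } else {
                CURRENT.set(previous);
            }
        }
    }
}

// Executor decorator: captures the submitter's context and re-binds it on the worker thread.
final class PropagatingExecutor implements Executor {
    private final Executor delegate;

    PropagatingExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable task) {
        // Read on the submitting thread, at the moment of submission.
        final FakeContext captured = FakeContext.currentOrNull();
        if (captured == null) {
            delegate.execute(task);
        } else {
            delegate.execute(() -> captured.runWith(task));
        }
    }
}

final class PropagationDemo {
    // Submits a task from a thread that has a context and reports what the worker thread sees.
    static String seenOnWorker() {
        final ExecutorService pool = Executors.newFixedThreadPool(2);
        final Executor executor = new PropagatingExecutor(pool);
        final CompletableFuture<String> seen = new CompletableFuture<>();
        try {
            new FakeContext("req-42").runWith(() ->
                executor.execute(() -> {
                    final FakeContext ctx = FakeContext.currentOrNull();
                    seen.complete(ctx == null ? "none" : ctx.id);
                }));
            return seen.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(seenOnWorker()); // prints "req-42"
    }
}
```

A wrapper like this only helps when the submitting thread has the context bound at the moment execute() is called. If the submission happens from a callback that has already lost the context, the context still has to be pushed there first or carried inside the task or event itself.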