Syncing multiple asynchronous requests in Java

I am using the official Telegram API (TDLib) in Java to request information about all members of a group. Using their IDs, I send asynchronous requests to the server and receive a User object for each request inside the ResultHandler, like this:

private static ArrayList<TdApi.User> chatUsers= new ArrayList<>();

private static void addUsers(){

    for (int i = 0; i < userIDs.length; i++){

        client.send(new TdApi.GetUser(userIDs[i]), new Client.ResultHandler() {

            @Override
            public void onResult(TdApi.Object object) {
                TdApi.User user = (TdApi.User)object;
                chatUsers.add(user);
            }
        });
    }
}

Since I am pretty new to asynchronous requests in Java, I am wondering the following:

  1. What would be an appropriate approach to call this method and wait for all results received before moving on?

  2. Generally, when making several requests consecutively and waiting for each result before sending the next request, what is the usual approach in Java to sequence them without nesting the requests inside each other? I want to avoid something like this:

    private static void getSupergroupId(int chatId){

        //first step
        client.send(new TdApi.GetChat(chatId), new Client.ResultHandler() {
            @Override
            public void onResult(TdApi.Object object) {
                supergroupId = ((TdApi.ChatTypeSupergroup)((TdApi.Chat)object).type).supergroupId;

                //second step when result received
                client.send(new TdApi.GetSupergroupMembers(supergroupId, null, 0, 200), new Client.ResultHandler() {
                    @Override
                    public void onResult(TdApi.Object object) {
                        chatMembers = ((TdApi.ChatMembers)object).members;

                        //further steps which need to wait for the result of the step before
                    }
                });
            }
        });
    }
    

Thank you!

There is 1 answer
Mikita Harbacheuski On

1. One of the Java synchronizers should work. I would start with CountDownLatch, as it is the simplest one.

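  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.List;
  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.TimeUnit;

  // Collections.synchronizedList returns a List, so the field must be declared as List
  private static final List<TdApi.User> chatUsers = Collections.synchronizedList(new ArrayList<>());

  private static void addUsers() throws InterruptedException {
    final CountDownLatch latch = new CountDownLatch(userIDs.length);
    for (int i = 0; i < userIDs.length; i++) {
      client.send(new TdApi.GetUser(userIDs[i]), new Client.ResultHandler() {
        @Override
        public void onResult(TdApi.Object object) {
          // Count down even when the reply is not a User (e.g. an error),
          // otherwise await() could only end by timing out
          if (object instanceof TdApi.User) {
            chatUsers.add((TdApi.User) object);
          }
          latch.countDown();
        }
      });
    }
    // await() returns false if the timeout elapsed before all replies arrived
    if (!latch.await(10, TimeUnit.SECONDS)) {
      throw new IllegalStateException("Timed out waiting for user info");
    }
  }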

Notice that chatUsers is accessed from different threads, so access to it must be synchronized. I used Collections.synchronizedList in the example for simplicity; in real code you may want a more fine-grained approach.
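To try the latch pattern without a Telegram connection, here is a minimal, self-contained sketch. It is not TDLib code: `LatchDemo`, `fetchUserName` and the daemon thread pool are hypothetical stand-ins for `client.send` and the thread that invokes its callback. It assumes every request eventually invokes its callback exactly once, and it treats a timeout as an error instead of silently continuing with partial results.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class LatchDemo {
    // Hypothetical stand-in for the client's callback threads (daemon, so the JVM can exit).
    static final ExecutorService POOL = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // Hypothetical asynchronous call: the callback runs later on a pool thread.
    static void fetchUserName(long userId, Consumer<String> callback) {
        POOL.execute(() -> callback.accept("user-" + userId));
    }

    // Sends one request per id and blocks until every callback has fired or the timeout elapses.
    static List<String> fetchAll(long[] userIds, long timeout, TimeUnit unit) {
        List<String> names = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch latch = new CountDownLatch(userIds.length);
        for (long id : userIds) {
            fetchUserName(id, name -> {
                try {
                    names.add(name);
                } finally {
                    latch.countDown(); // always count down, even if add() throws
                }
            });
        }
        try {
            if (!latch.await(timeout, unit)) {
                throw new IllegalStateException(
                        "Timed out; still waiting for " + latch.getCount() + " replies");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag for callers
            throw new IllegalStateException("Interrupted while waiting for replies", e);
        }
        // Copy under the list's own lock so the caller gets a stable snapshot.
        synchronized (names) {
            return new ArrayList<>(names);
        }
    }

    public static void main(String[] args) {
        List<String> names = fetchAll(new long[] {1, 2, 3}, 10, TimeUnit.SECONDS);
        System.out.println(names.size() + " users received");
    }
}
```

The results arrive in whatever order the callbacks run, so do not rely on the list matching the order of the ids.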

2. Take a look at CompletableFuture; it seems to be what you are looking for.

  private static void getSupergroupId(int chatId) {
    CompletableFuture.supplyAsync(() -> {
      // Assumes supergroupId is an int in your TDLib version; use Long if it is a long
      AtomicReference<Integer> atomicReference = new AtomicReference<>();
      CountDownLatch latch = new CountDownLatch(1);
      client.send(new TdApi.GetChat(chatId), new Client.ResultHandler() {
        @Override
        public void onResult(TdApi.Object object) {
          atomicReference.set(((TdApi.ChatTypeSupergroup) ((TdApi.Chat) object).type).supergroupId);
          latch.countDown();
        }
      });
      awaitReply(latch);
      return atomicReference.get();
    }).thenApply(supergroupId -> {
      AtomicReference<TdApi.ChatMembers> atomicReference = new AtomicReference<>();
      CountDownLatch latch = new CountDownLatch(1);
      client.send(new TdApi.GetSupergroupMembers(supergroupId, null, 0, 200), new Client.ResultHandler() {
        @Override
        public void onResult(TdApi.Object object) {
          atomicReference.set((TdApi.ChatMembers) object);
          latch.countDown();
        }
      });
      awaitReply(latch);
      return atomicReference.get();
    }).thenAccept(chatMembers -> {
      // further steps which need to wait for the result of the step before
    });
  }

  // Lambdas passed to CompletableFuture cannot throw checked exceptions,
  // so InterruptedException is handled here and rethrown unchecked
  private static void awaitReply(CountDownLatch latch) {
    try {
      if (!latch.await(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Timed out waiting for reply");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for reply", e);
    }
  }

Notice that the same CountDownLatch trick is used to wait for each result. The value set inside the callback is read from a different thread, so it has to be published safely. Strictly speaking, the latch already guarantees this (everything done before countDown() is visible after await() returns successfully), so extra synchronization is not required. I would still recommend making it explicit, for example with AtomicReference.
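A possible variation, offered as a sketch rather than a definitive implementation: instead of blocking a pool thread on a latch, the callback itself can complete a CompletableFuture, and dependent steps can then be chained with thenCompose. The names below (`FutureChainDemo`, `lookupGroupId`, `lookupMembers`) and the fake data they return are hypothetical stand-ins for the callback-style `client.send` calls, not TDLib API. The last method applies the same idea to the first question, combining many requests with allOf.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.Collectors;

final class FutureChainDemo {
    // Hypothetical stand-in for the client's callback threads (daemon, so the JVM can exit).
    static final ExecutorService POOL = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // Hypothetical callback-style calls; each invokes its callback later on a pool thread.
    static void lookupGroupId(long chatId, Consumer<Long> callback) {
        POOL.execute(() -> callback.accept(chatId + 1000));
    }

    static void lookupMembers(long groupId, Consumer<List<String>> callback) {
        POOL.execute(() -> callback.accept(
                Arrays.asList("member-a-of-" + groupId, "member-b-of-" + groupId)));
    }

    // Adapters: the callback completes the future, so no thread blocks while waiting.
    static CompletableFuture<Long> groupIdOf(long chatId) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        lookupGroupId(chatId, future::complete);
        return future;
    }

    static CompletableFuture<List<String>> membersOf(long groupId) {
        CompletableFuture<List<String>> future = new CompletableFuture<>();
        lookupMembers(groupId, future::complete);
        return future;
    }

    // Sequential steps without nesting: the second request starts once the first completes.
    static CompletableFuture<List<String>> membersOfChat(long chatId) {
        return groupIdOf(chatId).thenCompose(FutureChainDemo::membersOf);
    }

    // Fan-out: start all requests, then gather the results in request order.
    static CompletableFuture<List<Long>> groupIdsOf(List<Long> chatIds) {
        List<CompletableFuture<Long>> futures = chatIds.stream()
                .map(FutureChainDemo::groupIdOf)
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // does not block: all are complete here
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        System.out.println(membersOfChat(42).join());
        System.out.println(groupIdsOf(Arrays.asList(1L, 2L, 3L)).join());
    }
}
```

With a real client you would also complete the future exceptionally (completeExceptionally) when the callback receives an error reply, and bound the wait at the call site, for example with get(timeout, unit).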