I'm currently implementing a data container that has an addRecord(final Record record) method and a close() method.

public class Container{

    Container() {
    }

    public void addRecord(final Record record){
        // do something with the record
        // add the modified record to the container
    }

    public void close(){

    }     
}

Storing a record in the container takes several steps, so I want to hand these steps off to worker threads before finally adding the result to the container.

My problem is that no container instance should use more than, say, 4 threads, and if several container instances are open at the same time, they should use no more than, e.g., 12 threads in total. In addition, once I create the 4th container, each of the other 3 open containers should give up one of its threads, so that all 4 containers can use 3 threads each.

I was looking into thread pools and ExecutorServices. While setting up a thread pool of size 12 is simple, it is hard to restrict a single container to 4 threads when using an ExecutorService. Recall that I want at most 12 threads in total, which is why I have a static pool of size 12 that is shared between Container instances.

This is what I started with:

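import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Container{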

    public static final ExecutorService SERVICE= Executors.newFixedThreadPool(12);

    final List<Future<Entry>> _fRecords = new LinkedList<>();

    Container() {
    }

    public void addRecord(final Record record){
        _fRecords.add(SERVICE.submit(new Callable<Entry>() {

            @Override
            public Entry call() throws Exception {
                return record.createEntry();
            }
        }));
    }

    public void close() throws Exception {
        for(final Future<Entry> f : _fRecords) {
            f.get(); // blocks until the entry is ready, then add the entry to the container
        }
    }     
}

Clearly, this does not ensure that a single instance uses at most 4 threads. Furthermore, I don't see how such an ExecutorService can be forced to shut down workers or reassign them when another container needs a couple of threads.

Questions:

  1. Since Record itself is an interface and the running time of createEntry() depends on the actual implementation, I want to ensure that different containers don't share threads/workers, i.e., no thread should do work for two different containers. The idea is that a container instance that only processes "long running" records should not be slowed down by containers that only deal with "short running" records. If this does not make sense conceptually (a question to you), there would be no need for isolated threads/workers per container.

  2. Is implementing my own ExecutorService the proper way to go? Can someone point me to a project that has a similar problem/solution? Otherwise I guess I'd have to implement my own ExecutorService and share that amongst my Containers, or is there a better solution?

  3. Currently, I'm collecting all futures in the close method, since the container has to store the records/entries in the order they arrived. Clearly, this takes a lot of time, so I'd like to store each entry as soon as its Future has finished. Does it make sense to have an additional worker that solely processes the futures, or is it better to have my workers do that, i.e., to have the threads interact?
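
For question 3, one possible approach that needs neither a dedicated collector thread nor a blocking loop in close() is to chain each "store" step behind the previous one with CompletableFuture (Java 8+). The sketch below is only an illustration under that assumption: OrderedSink, add and close are hypothetical names, and the list stands in for the container's real storage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical helper: stores results in submission order, each one as soon
// as it and all earlier results are available.
final class OrderedSink<T> {
    private final List<T> stored = new ArrayList<>();
    // Completes once every result submitted so far has been stored.
    private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);

    synchronized void add(Supplier<T> work, Executor pool) {
        CompletableFuture<T> result = CompletableFuture.supplyAsync(work, pool);
        // The store step runs only after the previous store step AND this
        // result are done, so entries land in arrival order, one at a time.
        tail = tail.thenAcceptBoth(result, (ignored, value) -> stored.add(value));
    }

    List<T> close() {
        CompletableFuture<Void> last;
        synchronized (this) {
            last = tail;
        }
        last.join(); // waits only for whatever is still outstanding
        return stored;
    }
}
```

The store step runs on whichever thread finishes last (a worker, or the caller if everything is already done), so it should stay cheap. If one task throws, the chain completes exceptionally, later entries are not stored, and close() rethrows the failure as a CompletionException.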

1 Answer

Alexander Pavlov On Best Solutions

I do not think the Java runtime provides anything that meets your needs out of the box.

I wrote my own class for this purpose (a common pool plus a per-queue limit).

public static class ThreadLimitedQueue {
    private final ExecutorService executorService;
    private final int limit;

    private final Object lock = new Object();
    private final LinkedList<Runnable> pending = new LinkedList<>();
    private int inProgress = 0;

    public ThreadLimitedQueue(final ExecutorService executorService, final int limit) {
        this.executorService = executorService;
        this.limit = limit;
    }

    public void submit(Runnable runnable) {
        final Runnable wrapped = () -> {
            try {
                runnable.run();
            } finally {
                onComplete();
            }
        };
        synchronized (lock) {
            if (inProgress < limit) {
                inProgress++;
                executorService.submit(wrapped);
            } else {
                pending.add(wrapped);
            }
        }
    }

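    private void onComplete() {
        synchronized (lock) {
            // Poll only if this slot may be reused. Polling first would silently
            // drop the task whenever inProgress > limit (possible once the limit
            // is made dynamic; with a constant limit it never happens).
            final Runnable next = inProgress > limit ? null : pending.poll();
            if (next == null) {
                inProgress--;
            } else {
                executorService.submit(next);
            }
        }
    }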
}

The only difference is that in my case the limit is constant, but you can change that. For example, you can replace int limit with a Supplier<Integer> limitFunction that provides the limit dynamically, e.g.

Supplier<Integer> limitFunction = () -> 12 / containersList.size();

Just make it more robust (e.g., decide what to do if containersList is empty or holds more than 12 containers).
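
A rough sketch of how the dynamic variant could look, using the numbers from the question (12 threads in total, at most 4 per container). ThreadBudget, perContainerLimit and DynamicLimitQueue are hypothetical names introduced here, and IntSupplier is used in place of Supplier<Integer> only to avoid boxing. Treat it as one possible shape, not a tested production class.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.function.IntSupplier;

// Hypothetical helper: fair share of 12 threads, capped at 4 per container.
final class ThreadBudget {
    static final int TOTAL_THREADS = 12;
    static final int MAX_PER_CONTAINER = 4;

    static int perContainerLimit(int openContainers) {
        if (openContainers <= 0) {
            return MAX_PER_CONTAINER; // nothing to share with yet
        }
        int fairShare = TOTAL_THREADS / openContainers;
        // Never below 1, so a container cannot stall when more than 12 are open.
        return Math.max(1, Math.min(MAX_PER_CONTAINER, fairShare));
    }
}

// Variant of ThreadLimitedQueue that re-reads its limit on every decision.
final class DynamicLimitQueue {
    private final ExecutorService executorService;
    private final IntSupplier limit;

    private final Object lock = new Object();
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private int inProgress = 0;

    DynamicLimitQueue(ExecutorService executorService, IntSupplier limit) {
        this.executorService = executorService;
        this.limit = limit;
    }

    public void submit(Runnable runnable) {
        Runnable wrapped = () -> {
            try {
                runnable.run();
            } finally {
                onComplete();
            }
        };
        synchronized (lock) {
            pending.add(wrapped); // always enqueue first to keep FIFO order
            startPending();
        }
    }

    private void onComplete() {
        synchronized (lock) {
            inProgress--;
            startPending();
        }
    }

    // Caller must hold the lock. Starts queued tasks while slots are free;
    // if the limit shrank, nothing starts until enough tasks have finished.
    private void startPending() {
        int max = Math.max(1, limit.getAsInt());
        while (inProgress < max && !pending.isEmpty()) {
            inProgress++;
            executorService.submit(pending.poll());
        }
    }
}
```

Each container would then create its queue with something like new DynamicLimitQueue(SERVICE, () -> ThreadBudget.perContainerLimit(containersList.size())). A changed limit only takes effect at the next submit or task completion, and tasks that are already running are never interrupted. With more than 12 open containers the per-container limits add up to more than 12, but the shared pool of 12 threads still caps the total.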