I am working on a codebase that implements something similar to this. We are having issues with one of the threads failing to synchronize with other threads when the value of count is incremented, thus going into an infinite loop.

The problem seems to come from the non-atomic behaviour of the post-increment operator.

You can find the code on Repl here. NB: You may need to run the code at least 3 times to observe the problem.

I need help incrementing count from as many threads as possible in a thread-safe way.

class Main {

    static volatile Integer count = new Integer(0); //boxed integer is intentional to demonstrate mutable instance

    static final void Log(Object o) {
        System.out.println(o);
    }

    static synchronized void increaseCount(){
        count++;
    }

    static synchronized Integer getCount(){
        return count;
    }

    public static void main(String[] arg) throws InterruptedException {

        new Thread(() -> {
            while (getCount() != 60) {
                increaseCount();
                Log(count +" thread A");
            }
        }).start();

        new Thread(() -> {
            while (getCount() != 20) {
                increaseCount();
                Log(count +" thread B");
            }
        }).start();

        new Thread(() -> {
            while (getCount() != 50) {
                increaseCount();
                Log(count+" thread C");
            }
        }).start();
    }
}

1 Answer

erickson On

If many threads are incrementing a shared counter, there is no guarantee about which thread will see a particular value of the counter. To make sure a particular thread sees a particular value, that thread has to see every value of the counter. And then you might as well just have one thread, because they are all working in lockstep with each other.
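
To make this concrete for the code in the question: getCount() and increaseCount() are each synchronized, but the pair "test, then increment" is not one atomic step, so another thread can move count from 19 to 20 between thread B's test and its increment, B then pushes it to 21, and `!= 20` stays true forever. If all you need is for each thread to stop once its limit is reached (an assumption about your intent, and a different technique from the worker example further down), a minimal sketch is to fold the test and the increment into a single compare-and-set loop. The class and method names here (BoundedIncrementSketch, incrementIfBelow, runDemo) are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

class BoundedIncrementSketch {

    /**
     * Hypothetical helper: increments the counter only while it is below the limit.
     * The test and the increment succeed or fail together, so the counter can
     * never be pushed past the limit by this call.
     */
    static boolean incrementIfBelow(AtomicInteger counter, int limit) {
        while (true) {
            int current = counter.get();
            if (current >= limit) {
                return false; // limit reached (or passed by a thread with a higher limit)
            }
            if (counter.compareAndSet(current, current + 1)) {
                return true; // this thread performed the increment
            }
            // Another thread changed the value in between; re-read and retry.
        }
    }

    /** Starts three threads with the limits from the question and returns the final count. */
    static int runDemo() {
        AtomicInteger counter = new AtomicInteger();
        int[] limits = {60, 20, 50};
        Thread[] threads = new Thread[limits.length];
        for (int i = 0; i < limits.length; i++) {
            final int limit = limits[i];
            threads[i] = new Thread(() -> {
                while (incrementIfBelow(counter, limit)) {
                    // Per-step work would go here.
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("final count = " + runDemo());
    }
}
```

Every thread terminates because its loop condition is "below the limit" rather than "not equal to the limit", and the final count is the largest limit. Note that this still gives no guarantee about which thread performs a given increment.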

If you want to do some work for every value of the counter, with special handling for particular values, and you want to parallelize that workload, every thread needs to be prepared to perform the special handling. Here's an example of how you could do that:

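import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class Main {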

    private static class Worker implements Runnable {

        private final AtomicInteger counter;
        private final Set<Integer> triggers;

        Worker(AtomicInteger counter, Set<Integer> triggers) {
            this.counter = counter;
            this.triggers = triggers;
        }

        public void run() {
            String name = Thread.currentThread().getName();
            while (!triggers.isEmpty()) {
                int value = counter.getAndIncrement();
                try { /* Simulate doing some work by sleeping for an exponentially distributed delay (mean 100 ms). */
                    long delay = (long) (-100 * Math.log(1 - ThreadLocalRandom.current().nextDouble()));
                    TimeUnit.MILLISECONDS.sleep(delay);
                } catch (InterruptedException ex) {
                    break;
                }
                boolean triggered = triggers.remove(value);
                if (triggered) {
                    System.out.println(name + " handled " + value);
                } else {
                    System.out.println(name + " skipped " + value);
                }
            }
        }
    }

    public static void main(String[] arg) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        Set<Integer> triggers = new ConcurrentSkipListSet<>();
        triggers.add(60);
        triggers.add(20);
        triggers.add(50);
        int concurrency = 4;
        ExecutorService workers = Executors.newFixedThreadPool(concurrency);
        for (int i = 0; i < concurrency; ++i) {
            workers.execute(new Worker(counter, triggers));
        }
        workers.shutdown();
    }

}

The number of worker threads can be adjusted so that it makes sense given the number of cores on your machine, and the real workload (how CPU or I/O intensive the tasks are).
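
As a rough starting point for that tuning, you could derive the pool size from the core count. This is only a sketch: suggestedConcurrency and its blockingFactor parameter are hypothetical, and the factor of 4.0 used for I/O-heavy work below is an arbitrary placeholder that you would replace after measuring your real workload.

```java
class PoolSizingSketch {

    /**
     * Hypothetical helper: scales the core count by a blocking factor.
     * Use 1.0 for CPU-bound tasks; larger values when tasks spend most
     * of their time waiting on I/O. Never returns less than 1.
     */
    static int suggestedConcurrency(int cores, double blockingFactor) {
        return Math.max(1, (int) Math.round(cores * blockingFactor));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU-bound pool size: " + suggestedConcurrency(cores, 1.0));
        // 4.0 is an assumed placeholder, not a recommendation.
        System.out.println("I/O-heavy pool size: " + suggestedConcurrency(cores, 4.0));
    }
}
```

The result would then replace the hard-coded `concurrency = 4` passed to Executors.newFixedThreadPool.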

In this approach, each value of the counter is processed by just one thread, and it doesn't matter which thread gets a "sentinel" value. Once all the sentinel values have been processed, all the threads shut down. Threads coordinate with each other through the counter and the set of "triggers", the sentinel values that they need to handle.
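
If you want to convince yourself of the "just one thread per value" property, here is a small check you could run. It is a sketch that mirrors the worker loop above without the simulated delay; the `claimed` set, the `duplicates` counter, and the 10-second timeout are additions made only for this check, and ClaimOnceSketch is a made-up name.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ClaimOnceSketch {

    /**
     * Runs the worker loop on `concurrency` threads and returns true if no
     * counter value was claimed twice and every trigger was handled.
     */
    static boolean runDemo(int concurrency) {
        AtomicInteger counter = new AtomicInteger();
        Set<Integer> triggers = new ConcurrentSkipListSet<>(Arrays.asList(60, 20, 50));
        Set<Integer> claimed = ConcurrentHashMap.newKeySet(); // values seen so far
        AtomicInteger duplicates = new AtomicInteger();       // values seen more than once

        ExecutorService workers = Executors.newFixedThreadPool(concurrency);
        for (int i = 0; i < concurrency; ++i) {
            workers.execute(() -> {
                while (!triggers.isEmpty()) {
                    // getAndIncrement hands each value to exactly one caller.
                    int value = counter.getAndIncrement();
                    if (!claimed.add(value)) {
                        duplicates.incrementAndGet();
                    }
                    triggers.remove(value);
                }
            });
        }
        workers.shutdown();
        try {
            // Assumed timeout; the loop normally finishes almost immediately.
            if (!workers.awaitTermination(10, TimeUnit.SECONDS)) {
                return false;
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
        return duplicates.get() == 0 && triggers.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println("each value claimed once: " + runDemo(4));
    }
}
```

The counter may run a little past 60 before the last trigger is removed, since other workers can claim further values while the final sentinel is still being handled; that is harmless here because those values are simply skipped.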