Java: two WAITING + one BLOCKED threads, notify() leads to a livelock, notifyAll() doesn't, why?


I was trying to implement something similar to Java's bounded BlockingQueue interface using Java synchronization "primitives" (synchronized, wait(), notify()) when I stumbled upon some behavior I don't understand.

I create a queue capable of storing 1 element, create two threads that wait to fetch a value from the queue, start them, then try to put two values into the queue in a synchronized block in the main thread. Most of the time it works, but sometimes the two threads waiting for a value start seemingly waking up each other and not letting the main thread enter the synchronized block.

Here's my (simplified) code:

import java.util.LinkedList;
import java.util.Queue;

public class LivelockDemo {
    private static final int MANY_RUNS = 10000;

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < MANY_RUNS; i++) { // to increase the probability
            final MyBoundedBlockingQueue ctr = new MyBoundedBlockingQueue(1);

            Thread t1 = createObserver(ctr, i + ":1");
            Thread t2 = createObserver(ctr, i + ":2");

            t1.start();
            t2.start();

            System.out.println(i + ":0 ready to enter synchronized block");
            synchronized (ctr) {
                System.out.println(i + ":0 entered synchronized block");
                ctr.addWhenHasSpace("hello");
                ctr.addWhenHasSpace("world");
            }

            t1.join();
            t2.join();

            System.out.println();
        }
    }

    public static class MyBoundedBlockingQueue {
        private Queue<Object> lst = new LinkedList<Object>();

        private int limit;

        private MyBoundedBlockingQueue(int limit) {
            this.limit = limit;
        }

        public synchronized void addWhenHasSpace(Object obj) throws InterruptedException {
            boolean printed = false;
            while (lst.size() >= limit) {
                printed = __heartbeat(':', printed);
                notify();
                wait();
            }
            lst.offer(obj);
            notify();
        }

        // waits until something has been set and then returns it
        public synchronized Object getWhenNotEmpty() throws InterruptedException {
            boolean printed = false;
            while (lst.isEmpty()) {
                printed = __heartbeat('.', printed); // show progress
                notify();
                wait();
            }
            Object result = lst.poll();
            notify();
            return result;
        }

        // just to show progress of waiting threads in a reasonable manner
        private static boolean __heartbeat(char c, boolean printed) {
            long now = System.currentTimeMillis();
            if (now % 1000 == 0) {
                System.out.print(c);
                printed = true;
            } else if (printed) {
                System.out.println();
                printed = false;
            }
            return printed;
        }
    }

    private static Thread createObserver(final MyBoundedBlockingQueue ctr,
            final String name) {
        return new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(name + ": saw " + ctr.getWhenNotEmpty());
                } catch (InterruptedException e) {
                    e.printStackTrace(System.err);
                }
            }
        }, name);
    }
}

Here's what I see when it "blocks":

(skipped a lot)

85:0 ready to enter synchronized block
85:0 entered synchronized block
85:2: saw hello
85:1: saw world

86:0 ready to enter synchronized block
86:0 entered synchronized block
86:2: saw hello
86:1: saw world

87:0 ready to enter synchronized block
............................................

..........................................................................

..................................................................................
(goes "forever")

However, if I change the notify() calls inside the while(...) loops of addWhenHasSpace and getWhenNotEmpty methods to notifyAll(), it "always" passes.

My question is this: why does the behavior vary between notify() and notifyAll() methods in this case, and also why is the behavior of notify() the way it is?

I would expect both methods to behave in the same way in this case (two threads WAITING, one BLOCKED), because:

  1. it seems to me that in this case notifyAll() would only wake up the other thread, same as notify();
  2. it looks like the choice of wake-up method affects how the woken thread (which becomes RUNNABLE, I guess) and the main thread (which has been BLOCKED) later compete for the lock, which is not something I would expect from the javadoc or from searching the internet on the topic.

Or maybe I'm doing something wrong altogether?


There are 2 answers

Answer by John Vint

There appears to be some kind of fairness/barging going on with intrinsic locking, probably due to some optimization. My guess is that the native code checks whether the current thread has notified the monitor it is about to wait on and allows it to win.

Replace synchronized with a ReentrantLock and it should work as you expect. The difference here is how ReentrantLock handles the waiters of a lock it has notified on.


Update:

Interesting find here. What you are seeing is a race between the main thread entering

        synchronized (ctr) {
            System.out.println(i + ":0 entered synchronized block");
            ctr.addWhenHasSpace("hello");
            ctr.addWhenHasSpace("world");
        }

while the other two threads enter their respective synchronized regions. If the main thread does not get into its sync region before at least one of the two, you will experience the livelock output you are describing.

What appears to be happening is that if both consumer threads hit the sync block first, they ping-pong with each other through notify and wait. It may be that the JVM gives threads that are waiting priority for the monitor over threads that are blocked on it.
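The WAITING versus BLOCKED distinction this discussion leans on can be observed directly with Thread.getState(). The following is only a small sketch, not part of the original code: the class name ThreadStateDemo, the method observeStates and the thread names are made up for illustration. One thread parks in wait(), a second one tries to enter a monitor that the main thread is holding, and the main thread records both states before it notifies anyone.

```java
// Hypothetical demo class; all names here are illustrative assumptions.
class ThreadStateDemo {

    // Returns {state of the thread parked in wait(), state of the thread stuck at monitor entry}.
    static Thread.State[] observeStates() throws InterruptedException {
        final Object monitor = new Object();
        final boolean[] released = {false}; // guarded by monitor

        Thread waiter = new Thread(() -> {
            synchronized (monitor) {
                while (!released[0]) {
                    try {
                        monitor.wait(); // releases the monitor while parked
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        }, "waiter");

        Thread contender = new Thread(() -> {
            synchronized (monitor) {
                // nothing to do: this thread only needs to acquire the monitor once
            }
        }, "contender");

        waiter.start();
        // Poll until the waiter has actually parked inside wait().
        while (waiter.getState() != Thread.State.WAITING) {
            Thread.sleep(1);
        }

        Thread.State waiterState;
        Thread.State contenderState;
        synchronized (monitor) {
            contender.start();
            // Poll until the contender is stuck trying to enter the monitor we hold.
            while (contender.getState() != Thread.State.BLOCKED) {
                Thread.sleep(1);
            }
            waiterState = waiter.getState();
            contenderState = contender.getState();
            released[0] = true;
            monitor.notifyAll(); // let the waiter finish once we leave the block
        }
        waiter.join();
        contender.join();
        return new Thread.State[] {waiterState, contenderState};
    }

    public static void main(String[] args) throws InterruptedException {
        Thread.State[] states = observeStates();
        System.out.println("waiter=" + states[0] + ", contender=" + states[1]);
    }
}
```

Barring a spurious wakeup, the waiter should be reported as WAITING and the contender as BLOCKED. Which of the two gets the monitor first after a notify() is not specified by the javadoc, which is why the behavior in the question can differ between JVMs.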

Answer by Solomon Slow

Without looking too deeply into your code, I can see that you are using a single condition variable to implement a queue with one producer and more than one consumer. That's a recipe for trouble: If there's only one condition variable, then when a consumer calls notify(), there's no way of knowing whether it will wake the producer or wake the other consumer.

There are two ways out of that trap: The simplest is to always use notifyAll().
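As a rough sketch of that first option, here is a one-monitor bounded queue that only ever calls notifyAll(). The class name NotifyAllQueue and the method names put and take are made up for this example. Unlike the code in the question, it does not notify inside the wait loop; that is a design choice of this sketch, since a waiting thread has changed nothing that another thread would need to hear about.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical class; a sketch of the "always use notifyAll()" approach.
class NotifyAllQueue<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final int limit;

    NotifyAllQueue(int limit) {
        this.limit = limit;
    }

    public synchronized void put(T item) throws InterruptedException {
        while (items.size() >= limit) {
            wait(); // releases the monitor until some thread calls notifyAll()
        }
        items.add(item);
        notifyAll(); // wake every waiter; each one re-checks its own loop condition
    }

    public synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        T item = items.remove();
        notifyAll(); // producers and consumers share one wait set, so wake them all
        return item;
    }

    public static void main(String[] args) throws InterruptedException {
        NotifyAllQueue<String> queue = new NotifyAllQueue<>(1);
        Runnable consumer = () -> {
            try {
                System.out.println(Thread.currentThread().getName() + ": saw " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread t1 = new Thread(consumer, "1");
        Thread t2 = new Thread(consumer, "2");
        t1.start();
        t2.start();
        queue.put("hello");
        queue.put("world");
        t1.join();
        t2.join();
    }
}
```

The cost is that every state change wakes all waiters, most of which go straight back to sleep. For a handful of threads that is usually acceptable.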

The other way is to stop using synchronized, wait(), and notify(), and instead use the facilities in java.util.concurrent.locks.

A single ReentrantLock object can give you two (or more) condition variables. Use one exclusively for the producer to notify the consumers, and use the other exclusively for the consumers to notify the producer.

Note: The names change when you switch to using ReentrantLocks: o.wait() becomes c.await(), and o.notify() becomes c.signal().
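To make the two-condition idea concrete, here is a minimal sketch. The class name TwoConditionQueue, the method names put and take, and the condition names notFull and notEmpty are chosen for this example and do not come from the question. Producers wait on notFull and signal notEmpty; consumers do the reverse, so a signal can never land on a thread of the wrong kind.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class; a sketch of one lock with two condition variables.
class TwoConditionQueue<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here
    private final Queue<T> items = new ArrayDeque<>();
    private final int limit;

    TwoConditionQueue(int limit) {
        this.limit = limit;
    }

    public void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() >= limit) {
                notFull.await(); // releases the lock while waiting
            }
            items.add(item);
            notEmpty.signal(); // wakes one consumer, never another producer
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            T item = items.remove();
            notFull.signal(); // wakes one producer, never another consumer
            return item;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TwoConditionQueue<String> queue = new TwoConditionQueue<>(1);
        Runnable consumer = () -> {
            try {
                System.out.println(Thread.currentThread().getName() + ": saw " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread t1 = new Thread(consumer, "1");
        Thread t2 = new Thread(consumer, "2");
        t1.start();
        t2.start();
        queue.put("hello");
        queue.put("world");
        t1.join();
        t2.join();
    }
}
```

If you do not need to write this yourself, java.util.concurrent.ArrayBlockingQueue already provides a bounded blocking queue.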