Parking threads in service

I'm experimenting with thread parking and decided to build a small service. Here is what it looks like:

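import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestService {
    // Logback behind SLF4J; I suspect this logger causes some of the trouble
    private static final Logger logger = LoggerFactory.getLogger(TestService.class);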

    private final CountDownLatch stopLatch;
    private final Object parkBlocker = new Object();
    private volatile boolean stopped;
    private final Thread[] workers;

    public TestService(int parallelizm) {
        stopLatch = new CountDownLatch(parallelizm);
        workers = new Thread[parallelizm];
        for (int i = 0; i < parallelizm; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (!stopped) {
                        logger.debug("Parking " + Thread.currentThread().getName());
                        LockSupport.park(parkBlocker);
                        logger.debug(Thread.currentThread().getName() + " unparked");
                    }
                } finally {
                    stopLatch.countDown();
                }
            });
        }
    }

    public void start() {
        Arrays.stream(workers).forEach(t -> {
            t.start();
            logger.debug(t.getName() + " started");
        });
    }

    public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
        boolean stoppedSuccefully = false;
        this.stopped = true;
        unparkWorkers();
        if (stopLatch.await(timeout, unit)) {
            stoppedSuccefully = true;
        }
        return stoppedSuccefully;
    }

    private void unparkWorkers() {
        Arrays.stream(workers).forEach(w -> {
            LockSupport.unpark(w);
            logger.debug("Un-park call is done on " + w.getName());
        });
    }
}

The issue I ran into is that when I test this service as follows:

public static void main(String[] args) throws InterruptedException {
    while (true) {
        TestService service = new TestService(2);
        service.start();
        if (!service.stop(10000, TimeUnit.MILLISECONDS))
            throw new RuntimeException();
    }
}

I sometimes get the following output:

14:58:55.226 [main] DEBUG com.pack.age.TestService - Thread-648 started
14:58:55.227 [Thread-648] DEBUG com.pack.age.TestService - Parking Thread-648
14:58:55.227 [main] DEBUG com.pack.age.TestService - Thread-649 started
14:58:55.227 [main] DEBUG com.pack.age.TestService - Un-park call is done on Thread-648
14:58:55.227 [Thread-648] DEBUG com.pack.age.TestService - Thread-648 unparked
14:58:55.227 [main] DEBUG com.pack.age.TestService - Un-park call is done on Thread-649
14:58:55.227 [Thread-649] DEBUG com.pack.age.TestService - Parking Thread-649
Exception in thread "main" java.lang.RuntimeException
    at com.pack.age.Test$.main(Test.scala:12)
    at com.pack.age.Test.main(Test.scala)

The thread hangs in park:

"Thread-649" #659 prio=5 os_prio=0 tid=0x00007efe4433f000 nid=0x7691 waiting on condition [0x00007efe211c8000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000720739a68> (a java.lang.Object)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at com.pack.age.TestService.lambda$new$0(TestService.java:27)
    at com.pack.age.TestService$$Lambda$1/1327763628.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:748)

I don't see any race between park and unpark in the service. Moreover, if unpark is called before park, park is guaranteed not to block (that is what the Javadoc says).

Maybe I am misusing LockSupport::park. Can you suggest a fix?

Best answer, by Eugene

This has nothing to do with the logger, though its usage brings the problem to the surface. You have a race condition, as simple as that. Before explaining it, you need to understand a few things from the LockSupport::unpark documentation:

Makes available the permit for the given thread, if it was not already available. If the thread was blocked on park then it will unblock. Otherwise, its next call to park is guaranteed not to block.

The first point is explained here. The short version is: if a thread has already been started but has not yet called park, and within that window (between the start of the thread and park) some other thread calls unpark on it, the first thread will not park at all, because the permit is already available. Maybe this little drawing will make it clearer:

(ThreadA)  start ------------------ park --------- ....

(ThreadB)  start ----- unpark -----

Notice how ThreadB calls unpark(ThreadA) in the window between ThreadA calling start and ThreadA calling park. As such, when ThreadA reaches park, it is guaranteed not to block, exactly as the documentation says.
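A minimal sketch of this first case, assuming Java 8+ and only java.util.concurrent: the main thread plays ThreadB and unparks an already started worker (ThreadA) before the worker reaches park. The class name UnparkBeforePark and the spin flag are illustrative only. The worker spins instead of waiting on a latch because a latch's await may itself park internally and consume the permit.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

public class UnparkBeforePark {

    /** Returns how long (ms) the worker's park() took, or -1 if the worker did not finish. */
    static long runDemo() throws InterruptedException {
        AtomicBoolean unparkIssued = new AtomicBoolean(false);
        long[] parkMillis = {-1};

        Thread worker = new Thread(() -> {
            // Spin without blocking calls (which might park internally)
            // until the main thread has called unpark(worker).
            while (!unparkIssued.get()) {
                Thread.yield();
            }
            long t0 = System.nanoTime();
            LockSupport.park(); // permit is already available, so this returns at once
            parkMillis[0] = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0);
        });
        worker.setDaemon(true);     // never keep the JVM alive if something goes wrong
        worker.start();             // ThreadA has been started...
        LockSupport.unpark(worker); // ...and is unparked before it reaches park()
        unparkIssued.set(true);

        worker.join(10_000);
        // isAlive() == false means the worker terminated, so its write to parkMillis is visible
        return worker.isAlive() ? -1 : parkMillis[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("park() returned after " + runDemo() + " ms");
    }
}
```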

The second point from the same documentation is:

This operation is not guaranteed to have any effect at all if the given thread has not been started.

Let's see that via a drawing:

Thread B calls unpark(ThreadA) --- Thread A starts --- Thread A calls park 

After ThreadA calls park, it will hang forever, since ThreadB never calls unpark on it again. Notice that the call to unpark was made before ThreadA had started (unlike in the previous example).
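The second case can be sketched the same way. To keep the sketch from hanging forever, it swaps the plain park() for a timed LockSupport.parkNanos; the 500 ms timeout and the class name UnparkBeforeStart are arbitrary choices. Because the Javadoc only says an unpark on an unstarted thread is "not guaranteed to have any effect", the sketch prints the measured time rather than claiming a specific outcome.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class UnparkBeforeStart {

    /** Returns how long (ms) the worker's timed park took. */
    static long runDemo(long timeoutMillis) throws InterruptedException {
        long[] parkMillis = {-1};

        Thread worker = new Thread(() -> {
            long t0 = System.nanoTime();
            // Timed park so the demo cannot hang the way a plain park() would.
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(timeoutMillis));
            parkMillis[0] = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0);
        });

        LockSupport.unpark(worker); // thread not started yet: no guaranteed effect
        worker.start();
        worker.join();              // join() makes the worker's write to parkMillis visible
        return parkMillis[0];
    }

    public static void main(String[] args) throws InterruptedException {
        // Usually close to the full timeout on HotSpot, because the early unpark is lost;
        // park methods may also return spuriously, so the exact value can vary.
        System.out.println("timed park returned after " + runDemo(500) + " ms");
    }
}
```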

And this is exactly what happens in your case:

LockSupport.unpark(w); (from unparkWorkers) is called before t.start(); (from public void start(){...}). In simpler words, your code calls unpark on both workers before they even start, so when they finally reach park they are stuck and no one is left to unpark them. The fact that you see this with a logger and not with System::out most probably has to do with the fact that println goes through a synchronized method under the hood.


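As a matter of fact, LockSupport offers exactly what is needed to prove this: park(Object blocker) records a blocker object that other threads can read back with LockSupport.getBlocker(Thread). For simplicity, the code below uses SOProblem, a copy of your TestService, with a single worker (SOProblem service = new SOProblem(1);). First, a blocker class: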

static class ParkBlocker {

    private volatile int x;

    public ParkBlocker(int x) {
        this.x = x;
    }

    public int getX() {
        return x;
    }
}
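The approach that follows relies on LockSupport.getBlocker(Thread), which returns the blocker passed to the most recent park call that has not yet returned, or null if the thread is not parked. A minimal, self-contained sketch of that mechanism, using a copy of the ParkBlocker class above with an arbitrary value of 42; GetBlockerDemo and the released flag are hypothetical names for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

public class GetBlockerDemo {

    // Same shape as the ParkBlocker class above.
    static class ParkBlocker {
        private volatile int x;
        ParkBlocker(int x) { this.x = x; }
        int getX() { return x; }
    }

    /** Returns the x value read via getBlocker while the worker is parked, or -1 if never seen. */
    static int runDemo() throws InterruptedException {
        ParkBlocker pb = new ParkBlocker(42);
        AtomicBoolean released = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            // Re-check the condition in a loop: park may also return spuriously.
            while (!released.get()) {
                LockSupport.park(pb);
            }
        });
        worker.start();

        int seen = -1;
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
        while (seen == -1 && System.nanoTime() < deadline) {
            Object blocker = LockSupport.getBlocker(worker); // momentary snapshot
            if (blocker instanceof ParkBlocker) {
                seen = ((ParkBlocker) blocker).getX();
            } else {
                Thread.yield();
            }
        }

        released.set(true);
        LockSupport.unpark(worker); // wakes the worker, or leaves a permit if it is not parked yet
        worker.join();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("x seen via getBlocker = " + runDemo());
    }
}
```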

Now hook this into the right methods. The snippets below assume a field private final ParkBlocker pb = new ParkBlocker(0); in the service. First, flag the fact that we have called unpark:

private void unparkWorkers() {
    Arrays.stream(workers).forEach(w -> {
        LockSupport.unpark(w);
        logger.debug("Un-park call is done on " + w.getName());
    });
    /*
     * add "1" to whatever there is already in pb.x, meaning
     * we have done unparking _also_
     */
    int y = pb.x;
    y = y + 1;
    pb.x = y;
}

Then reset the flag after a cycle has ended:

public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
    boolean stoppedSuccefully = false;
    stopped = true;
    unparkWorkers();
    if (stopLatch.await(timeout, unit)) {
        stoppedSuccefully = true;
        // reset the flag
        pb.x = 0;
    }
    return stoppedSuccefully;
}

Then change the worker loop in the constructor to flag that the thread has started:

    .....
    while (!stopped) {
        logger.debug("Parking " + Thread.currentThread().getName());
        // flag the fact that the thread has started: add "2"
        int y = pb.x;
        y = y + 2;
        pb.x = y;
        LockSupport.park(pb);
        logger.debug(Thread.currentThread().getName() + " unparked");
    }

Then, when a thread freezes, inspect the flag:

 public static void main(String[] args) throws InterruptedException {
    while (true) {
        SOProblem service = new SOProblem(1); // <-- notice a single worker, for simplicity
        service.start();
        if (!service.stop(10000, TimeUnit.MILLISECONDS)) {
            service.debug();
            throw new RuntimeException();
        }
    }
}

where the debug method is:

public void debug() {
    Arrays.stream(workers)
          .forEach(x -> {
              ParkBlocker pb = (ParkBlocker) LockSupport.getBlocker(x);
              if (pb != null) {
                  System.out.println("x = " + pb.getX());
              }
          });
}

When the issue reproduces, the output is x = 3, meaning you have called unpark before you called park.