Avoiding wait/notify in a utility to suspend/resume threads


I'm implementing the following Java interface to allow threads to be paused and resumed. I've a working version that uses wait()/notifyAll(), but I wondered if there was an easier way to do it (say, using some nifty widget in java.util.concurrent)?

public interface Suspender {

    /**
     * Go into pause mode: any threads which subsequently call maybePause()
     * will block. Calling pause() if already in pause mode has no effect.
     */
    void pause();

    /**
     * Leave pause mode: any threads which call maybePause() will not block,
     * and any currently paused threads will be unblocked. Calling resume()
     * if not in pause mode has no effect.
     */
    void resume();

    /**
     * If the Suspender is in pause mode, block, and only resume execution
     * when the Suspender is resumed. Otherwise, do nothing.
     */
    void maybePause();

}

There are 2 answers

1
TwoThe On

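Yes, there is, but it takes a change of architecture rather than a drop-in utility: run the work as a repeated task, and cancel or reschedule that task instead of blocking the thread.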

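import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class Suspender {

  // Shared pool with one thread per available processor.
  protected static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
  protected final Runnable task;
  protected final long period;
  protected final TimeUnit unit;
  // Handle of the current schedule; null while paused.
  private ScheduledFuture<?> t;

  public Suspender(final Runnable task, final long initialDelay, final long period, final TimeUnit unit) {
    this.task = task;
    this.period = period;
    this.unit = unit;
    t = executor.scheduleAtFixedRate(task, initialDelay, period, unit);
  }

  // Cancels further runs and interrupts a run in progress.
  // Returns true only if this call actually paused the task.
  public synchronized boolean pause() {
    if (t == null) {
      return false;
    }
    if (t.cancel(true)) {
      t = null;
      return true;
    }
    return false;
  }

  // Reschedules the task to start immediately.
  // Returns true only if this call actually resumed the task.
  public synchronized boolean resume() {
    if (t == null) {
      t = executor.scheduleAtFixedRate(task, 0, period, unit);
      return true;
    }
    return false;
  }
}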

It simply cancels and reschedules the runnable in response to your pause/resume calls.

Obviously that does not pause a thread mid-task, and this is a good thing, because if you could pause a thread during execution, it could potentially cause all kinds of issues. This starts with unreleased locks, but also includes half-open network connections, half-written files and so on.

So if a thread runs a single one-off task, do not pause it. Only if a thread runs a repeated task (hence the ScheduledExecutorService) can you skip further invocations of that task. The task itself can then decide whether it is able to stop at a given moment by checking the thread's interrupt status, which cancel(true) sets, at points in the code where a pause/cancel is reasonably possible. Note that Thread.interrupted() clears the flag when it reads it, while Thread.currentThread().isInterrupted() leaves it set.
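To illustrate the last point, here is a minimal sketch of a repeated task that checks for interruption only at safe points. The names CooperativeTaskDemo, BatchTask and runAndCancel are made up for this example, and the counter merely stands in for real work. It uses isInterrupted() rather than Thread.interrupted() so that the flag is not cleared by the check.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CooperativeTaskDemo {

    /** Hypothetical repeated task: handles items one by one, checking for interruption in between. */
    static final class BatchTask implements Runnable {
        final AtomicInteger completedItems = new AtomicInteger();

        @Override
        public void run() {
            for (int i = 0; i < 1000; i++) {
                // Safe point: no lock is held and nothing is half-written here.
                if (Thread.currentThread().isInterrupted()) {
                    return; // end this invocation cleanly
                }
                completedItems.incrementAndGet(); // stands in for one unit of real work
            }
        }
    }

    /** Runs the task briefly, cancels it, and reports whether progress stopped afterwards. */
    static boolean runAndCancel() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            BatchTask task = new BatchTask();
            ScheduledFuture<?> handle =
                    executor.scheduleAtFixedRate(task, 0, 10, TimeUnit.MILLISECONDS);
            Thread.sleep(100);
            handle.cancel(true); // "pause": no further runs, interrupt a run in progress
            Thread.sleep(50);    // give an in-flight run time to reach a safe point
            int afterCancel = task.completedItems.get();
            Thread.sleep(100);
            return afterCancel > 0 && task.completedItems.get() == afterCancel;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("progress stopped after cancel: " + runAndCancel());
    }
}
```

The task never stops in the middle of an item, so whatever an item locks or writes is always completed before the invocation ends.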

8
bowmore On

An implementation of this can easily be built on top of a java.util.concurrent.Semaphore holding 1 permit.

  • pause() takes the permit (blocking until it gets it)
  • resume() releases the permit again.
  • maybePause() takes the permit (blocking until it gets it), then immediately releases it again.

To handle multiple pause requests on the same Suspender, you could use another semaphore that acts as a permission to change the pause state, and use that in both the pause and resume methods.
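The scheme described so far could be sketched roughly as follows. This is only an illustration under a few assumptions: the class name SinglePermitSuspender, the field names and the demo helper are invented here, a boolean flag is added next to the second semaphore to make repeated pause()/resume() calls harmless, and acquireUninterruptibly() is used to keep interruption handling out of the picture.

```java
import java.util.concurrent.Semaphore;

public class SinglePermitSuspender {

    // The single permit: held by the suspender while in pause mode.
    private final Semaphore permit = new Semaphore(1);
    // Second semaphore used as a mutex around changes to the pause state.
    private final Semaphore stateChange = new Semaphore(1);
    // Only read or written while holding stateChange.
    private boolean paused;

    public void pause() {
        stateChange.acquireUninterruptibly();
        try {
            if (!paused) {
                permit.acquireUninterruptibly(); // waits briefly if a thread is passing through
                paused = true;
            }
        } finally {
            stateChange.release();
        }
    }

    public void resume() {
        stateChange.acquireUninterruptibly();
        try {
            if (paused) {
                paused = false;
                permit.release();
            }
        } finally {
            stateChange.release();
        }
    }

    public void maybePause() {
        permit.acquireUninterruptibly(); // blocks while the suspender holds the permit
        permit.release();                // hand it straight on to the next waiter
    }

    /** Demo helper: true if a worker blocks while paused and continues after resume(). */
    static boolean demo() {
        SinglePermitSuspender suspender = new SinglePermitSuspender();
        suspender.pause();
        suspender.pause(); // second call has no effect
        Thread worker = new Thread(suspender::maybePause);
        worker.start();
        try {
            worker.join(200);
            boolean blockedWhilePaused = worker.isAlive();
            suspender.resume();
            worker.join(2000);
            return blockedWhilePaused && !worker.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("blocked while paused, released on resume: " + demo());
    }
}
```

One consequence of this design is that every thread calling maybePause() passes through the same single permit, so even when not paused the callers briefly queue up behind each other.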

Edit

OK, once I got to writing it I realized it wasn't as simple as I had imagined. Nevertheless, I did manage to write an implementation using just one Semaphore:

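import java.util.concurrent.Semaphore;

// @ThreadSafe and @GuardedBy are assumed to be the JCIP annotations;
// adjust the package if you use a different annotation library.
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;

@ThreadSafe
public class SuspenderImpl implements Suspender {

    @GuardedBy("stateLock")
    private boolean paused;
    // Number of threads that registered as paused since the last resume().
    @GuardedBy("stateLock")
    private int pausedThreads;
    // Starts with no permits; every maybePause() call consumes exactly one.
    private final Semaphore token = new Semaphore(0);
    private final Object stateLock = new Object();

    @Override
    public void pause() {
        synchronized (stateLock) {
            paused = true;
        }
    }

    @Override
    public void resume() {
        synchronized (stateLock) {
            paused = false;
            // One permit for each thread registered as paused.
            token.release(pausedThreads);
            pausedThreads = 0;
        }
    }

    @Override
    public void maybePause() {
        synchronized (stateLock) {
            if (paused) {
                // Register as paused; the acquire below will block until resume().
                pausedThreads++;
            } else {
                // Not paused: add a permit so the acquire below does not block.
                token.release();
            }
        }
        try {
            token.acquire();
        } catch (InterruptedException e) {
            // Restore the interrupt status for the caller. Note that this thread
            // stays counted in pausedThreads, so resume() will release a permit
            // that nobody is waiting for.
            Thread.currentThread().interrupt();
        }
    }
}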

The basic idea differs significantly from my original one. In this implementation the maybePause() method decides whether or not to release a permit immediately, based on the paused boolean flag. If the flag is not set, a permit is added to the Semaphore and then immediately acquired, so the call does not block. If the flag is set, the number of paused threads is incremented and the acquire on the semaphore blocks.

The pause() method simply sets the flag. The resume() method sets the flag to false, releases permits equal to the number of paused threads and sets that count to zero.

All mutable state is guarded by an internal lock.
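As a usage illustration, a worker would typically call maybePause() between units of work. The sketch below is self-contained only for the sake of the demo: DemoSuspender is a condensed copy of the approach above (using acquireUninterruptibly() instead of the interrupt handling shown earlier), and SuspenderUsageDemo, demo and the iteration counter are hypothetical names.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class SuspenderUsageDemo {

    /** Condensed copy of the semaphore-based approach, so the demo compiles by itself. */
    static final class DemoSuspender {
        private final Object stateLock = new Object();
        private final Semaphore token = new Semaphore(0);
        private boolean paused;     // guarded by stateLock
        private int pausedThreads;  // guarded by stateLock

        void pause() {
            synchronized (stateLock) {
                paused = true;
            }
        }

        void resume() {
            synchronized (stateLock) {
                paused = false;
                token.release(pausedThreads);
                pausedThreads = 0;
            }
        }

        void maybePause() {
            synchronized (stateLock) {
                if (paused) {
                    pausedThreads++;
                } else {
                    token.release();
                }
            }
            token.acquireUninterruptibly();
        }
    }

    /** Returns true if the worker stops making progress while paused and continues after resume(). */
    static boolean demo() {
        DemoSuspender suspender = new DemoSuspender();
        AtomicInteger iterations = new AtomicInteger();
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                suspender.maybePause();       // pause point between units of work
                iterations.incrementAndGet(); // stands in for one unit of work
            }
        });
        worker.setDaemon(true);
        worker.start();
        try {
            Thread.sleep(50);
            suspender.pause();
            Thread.sleep(50); // the worker reaches its next pause point and blocks
            int whilePaused = iterations.get();
            Thread.sleep(100);
            boolean stayedPaused = iterations.get() == whilePaused;
            suspender.resume();
            Thread.sleep(50);
            boolean progressed = iterations.get() > whilePaused;
            worker.interrupt(); // ends the loop once the worker is running again
            worker.join(1000);
            return stayedPaused && progressed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("paused and resumed as expected: " + demo());
    }
}
```

The worker only ever stops at the maybePause() call, so the placement of that call decides where it is safe for the thread to sit idle.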