BlockingQueue does not handle interrupt just before the call

591 views Asked by At

I'm using RabbitMQ which uses LinkedBlockingQueue by default for consumer. It has a blocking nextDelivery() method which basically calls take() on queue.

But it doesn't catch the interrupt if it was interrupted just before calling that method.

if (Thread.interrupted()) {
    throw new InterruptedException();
}
// If interrupted here, call below will not throw interrupted exception
rabbit.nextDelivery();

It only works if interrupt hits while it is waiting -which is what is written in javadoc too- or if hits if block first.

Throws:
    InterruptedException - if interrupted while waiting

I actually have a case where interrupt hits just where I marked. It works if I put a sleep to beginning or in between but it's still not safe to assume it will always work.

Is there an alternative BlockingQueue implementation addresses this issue? I don't know if interrupt it consumed or not, maybe there is a race condition in static method it returns false but clears the set value somehow?

Edit: It is not about Thread.interrupted() call or not setting flags. If you comment out if block, it is same issue again. Queue method doesn't throw InterruptedException as soon as it enters, it just blocks

1

There are 1 answers

8
Gray On BEST ANSWER

But it doesn't catch the interrupt if it was interrupted just before calling that method.

So if I'm understanding, your question, then something is swallowing an InterruptedException or clearing the interrupt flag. The following code always throws InterruptedException for me.

Thread.currentThread().interrupt();
new LinkedBlockingQueue<String>().take();

It is important to realize that when the InterruptException is thrown, the interrupt flag is cleared. You should always do something like:

try {
    // other methods where this is needed are Object.wait(...), Thread.join(...)
    Thread.sleep(100);
} catch (InterruptedException ie) {
    // re-interrupt the thread
    Thread.currentThread().interrupt();
    // deal with the interrupt by returning or something
    ...
}

See: Why invoke Thread.currentThread.interrupt() when catch any InterruptException?

What often happens is that 3rd party code does not propagate the interrupt status because of bad code. Then you are often SOL since the interrupt will have been swallowed and the take() method will not throw.

Also, it is important to realize that Thread.interrupted() clears the interrupt flag. Typically you want to use Thread.currentThread().isInterrupted() to test for the status of the interrupt flag.

Is there an alternative BlockingQueue implementation addresses this issue?

I'm not sure there is an issue.