I have been using a custom BlockingQueue inside a ThreadPoolExecutor, but sometimes the task workers do not take tasks and the dispatcher thread does not put new tasks into the queue.
I wonder whether the following custom blocking queue implementation causes a deadlock. Is there anything wrong with this code?
Would it be better to add a synchronized block around the add() and take() methods?
import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.log4j.Logger;

import com.ttech.utils.alarm.Alarm;
import com.ttech.utils.alarm.AlarmInterface;
import com.ttech.utils.counter.Counter;
import com.ttech.utils.counter.SNMPAgent;

public class WorkerQueue<E> extends LinkedBlockingQueue<E> {

    private static final long serialVersionUID = 1L;

    public Integer lowThreshold;
    public Integer highThreshold;
    public Integer capacity;
    public String name;
    public String type;
    public Counter counter = null;
    public boolean writeAlarmLog;

    public static final Logger logger = Logger.getLogger(WorkerQueue.class);
    public static Alarm HighThresholdAlarm = null;
    public static Alarm CapacityAlarm = null;

    // Check the size here and clear capacity and high threshold alarms in case
    public E take() throws InterruptedException {
        E data = super.take();
        counter.setNewValue(super.size());
        if (super.size() == lowThreshold) {
            if (!this.writeAlarmLog) {
                HighThresholdAlarm.clear(name);
                CapacityAlarm.clear(name);
            } else {
                HighThresholdAlarm.clearLog(name, "Queue High Threshold");
                CapacityAlarm.clearLog(name, "Queue Capacity Overload");
            }
        }
        return data;
    }

    public E poll() {
        E data = super.poll();
        counter.setNewValue(super.size());
        if (super.size() == lowThreshold) {
            if (!this.writeAlarmLog) {
                HighThresholdAlarm.clear(name);
                CapacityAlarm.clear(name);
            } else {
                HighThresholdAlarm.clearLog(name, "Queue High Threshold");
                CapacityAlarm.clearLog(name, "Queue Capacity Overload");
            }
        }
        return data;
    }

    public int drainTo(Collection<? super E> c, int maxElements) {
        int size = super.drainTo(c, maxElements);
        counter.setNewValue(super.size());
        return size;
    }

    // During adding the data to queue check capacity and high threshold raise alarm in case
    public boolean add(E data) {
        Boolean rc = true;
        if (capacity > 0) {
            if (this.size() >= capacity) {
                logger.error("Queue " + name + " is over capacity");
                if (!this.writeAlarmLog)
                    CapacityAlarm.raise(name);
                else
                    CapacityAlarm.raiseLog(AlarmInterface.AS_CRITICAL, name, "Queue Capacity Overload");
                return false;
            }
        }
        if (!super.add(data)) {
            logger.error("Cannot add data to queue:" + name);
            rc = false;
        } else {
            counter.setNewValue(super.size());
        }
        if (highThreshold == super.size()) {
            if (!this.writeAlarmLog)
                HighThresholdAlarm.raise(name);
            else
                HighThresholdAlarm.raiseLog(AlarmInterface.AS_CRITICAL, name, "Queue High Threshold");
        }
        return rc;
    }
}
ThreadPoolExecutor does not add tasks to its work queue. It offers them and, if they are not accepted, passes them to the configured RejectedExecutionHandler. By default this is the abort policy handler, which causes a RejectedExecutionException to be thrown.
The add method in your custom queue will never be called.
If you want to track changes in the number of in-flight tasks you have, I would suggest overriding the beforeExecute or afterExecute method of the executor itself. The number of active tasks can be obtained from getActiveCount.
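As a rough illustration of the hook-based approach, here is a minimal sketch of an executor that counts running tasks in beforeExecute and afterExecute. The class name InFlightTrackingExecutor, its constructor parameters, and the inFlight() and completed() accessors are all hypothetical names made up for this example. A real version would presumably update the Counter and raise or clear the alarms where the comments indicate.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class: a fixed-size pool that tracks tasks currently running.
class InFlightTrackingExecutor extends ThreadPoolExecutor {

    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger completed = new AtomicInteger();

    InFlightTrackingExecutor(int poolSize, int queueCapacity) {
        // A plain bounded LinkedBlockingQueue; no custom queue subclass is needed.
        super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<Runnable>(queueCapacity));
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        // Runs in the worker thread just before the task starts.
        inFlight.incrementAndGet();
        // Assumption: threshold checks on getQueue().size() could go here.
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // Runs in the worker thread after the task finishes, even if it threw.
        inFlight.decrementAndGet();
        completed.incrementAndGet();
    }

    int inFlight() {
        return inFlight.get();
    }

    int completed() {
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        InFlightTrackingExecutor pool = new InFlightTrackingExecutor(2, 10);
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> { /* simulated work */ });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("completed=" + pool.completed()
                + " inFlight=" + pool.inFlight());
    }
}
```

Because the hooks run inside the executor's own worker threads, this keeps the bookkeeping out of the queue. The executor only calls offer, take, and the timed poll on its queue, so none of them need to be overridden. getActiveCount() and getQueue().size() remain available for an approximate snapshot of running and waiting tasks.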