Optimizing multithreaded queue processing code in Java


I have code that creates 10 objects of a class that implements Runnable. Each object is kept in a HashMap for later use and runs on a separate thread. Each object has a public method for adding items to its queue, and it processes the queue in an infinite loop.

I want to know whether this solution is OK, or whether something is totally wrong, useless, or missing (especially in the use of the volatile and synchronized keywords).

MultithreadingTest.class

package multithreadingtest;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Multithreading example.
 *
 * @author lkallas
 */
public class MultithreadingTest {

    private static final int NUM_OF_THREADS = 10;
    private static String name;
    private static final Map<Integer, ThreadWorker> objectMap = new HashMap<>();    //Map for storing ThreadWorker objects

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(NUM_OF_THREADS);
        //Creating threads
        for (int i = 0; i < NUM_OF_THREADS; i++) {
            name = "ThreadWorker" + String.valueOf(i);
            ThreadWorker thread = new ThreadWorker(name);
            objectMap.put(i, thread);   //Add objects to map            
            executor.execute(thread);
        }
        for (int i = 0; i < 10; i++) {
            ThreadWorker worker = objectMap.get(i);
            for (int j = 0; j < 10; j++) {
                worker.addToQueue("Test1");
            }
        }
    }
}

ThreadWorker.class

package multithreadingtest;

import java.util.LinkedList;
import java.util.Queue;

/**
 * Worker class that performs operations in another thread.
 *
 * @author lkallas
 */
public class ThreadWorker implements Runnable {

    private final String threadName;
    private volatile Queue workQueue;   //Does this have to be volatile?

    /**
     * Class constructor.
     *
     * @param threadName Name of the thread for identifying.
     *
     */
    public ThreadWorker(String threadName) {
        this.threadName = threadName;
        this.workQueue = new LinkedList();
        System.out.println(String.format("Thread %s started!", threadName));
    }

    /**
     * Adds items to the queue.
     *
     * @param object Object to be added to the queue.
     */
    public synchronized void addToQueue(String object) {
        workQueue.add(object); //Does this method have to be synchronized?
    }

    @Override
    public void run() {
        while (true) {
            if (!workQueue.isEmpty()) {
                System.out.println("Queue size: " + workQueue.size());
                String item = (String) workQueue.peek();
                //Process item
                System.out.println(threadName + " just processed " + item);
                workQueue.remove();
            }
        }
    }
}

Any help and suggestions are much appreciated!

Answer by Zim-Zam O'Pootertoot (best answer)
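  1. workQueue does not need to be volatile: the field is private, has no setter, and is assigned only once, in the constructor. Note that volatile applies to the reference, not to the contents of the queue, so it would not make the underlying LinkedList safe to use from two threads anyway.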
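  2. Make workQueue a BlockingQueue (for example a LinkedBlockingQueue). This queue is thread-safe, so you don't need to synchronize addToQueue; in the current code, synchronizing only addToQueue is not sufficient anyway, because run() reads and removes from the LinkedList without holding the same lock. In addition, you don't need to spin inside run(): call take() on the queue instead, and the thread blocks until an item is available.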
  3. You appear to be doing too much work inside MultithreadingTest. Rather than adding items to individual queues, you can have all workers share the same BlockingQueue; main then only needs to add items to that single queue, and the workers take care of load balancing themselves. Even though the BlockingQueue is shared, it still doesn't need to be volatile, because the reference to it doesn't change once a worker is initialized (make the field private final BlockingQueue<String> workQueue - a final field never needs to be volatile).
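
Points 2 and 3 could be sketched roughly as follows. This is only one possible arrangement, not a drop-in replacement: the class name SharedQueueDemo, the processAll method, the shared AtomicInteger counter and the STOP sentinel (a "poison pill" used so the workers can exit) are all assumptions added for illustration and are not part of the original code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SharedQueueDemo {

    // Hypothetical sentinel telling a worker to stop (assumption, not in the original code).
    // A distinct String instance, so it can be recognized by identity.
    private static final String STOP = new String("STOP");

    static final class ThreadWorker implements Runnable {
        private final String threadName;
        // final: assigned once in the constructor, so volatile is not needed
        private final BlockingQueue<String> workQueue;
        private final AtomicInteger processed;

        ThreadWorker(String threadName, BlockingQueue<String> workQueue, AtomicInteger processed) {
            this.threadName = threadName;
            this.workQueue = workQueue;
            this.processed = processed;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    // Blocks until an item is available; no busy-waiting.
                    String item = workQueue.take();
                    if (item == STOP) { // identity comparison is intentional
                        return;
                    }
                    System.out.println(threadName + " just processed " + item);
                    processed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and let the thread end.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Starts numWorkers workers on one shared queue and returns how many items were processed. */
    static int processAll(int numWorkers, int numItems) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(numWorkers);

        for (int i = 0; i < numWorkers; i++) {
            executor.execute(new ThreadWorker("ThreadWorker" + i, queue, processed));
        }
        // main adds everything to the single shared queue.
        for (int j = 0; j < numItems; j++) {
            queue.add("Test" + j);
        }
        // One sentinel per worker, queued after all real items (the queue is FIFO).
        for (int i = 0; i < numWorkers; i++) {
            queue.add(STOP);
        }

        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("Processed " + processAll(10, 100) + " items");
    }
}
```

Because the queue is shared, there is no need for the map of workers or for addToQueue: whichever worker is idle picks up the next item. If the workers are meant to run for the lifetime of the application, the sentinel can be dropped and executor.shutdownNow() used to interrupt them instead.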