Using BlockingQueues

I am trying to use a queue in my code. The goal is to print the total number of words across all the files, which means I somehow need to add all the results together once they are done.

Currently, I have a reader that runs through a file and returns a string containing the name of the file and the number of words in it. My main method then runs a for-loop over each argument in the args array. Every time we go through a new document to count its words, we start a new thread.

public static void main(final String[] args) {
    Thread t = null;
    if (args.length >= 1) {
        String destinationFileName = args[(args.length-1)];
        for (int l = 0; l < (args.length); l++) {
            final int q = l;
            final Thread y = t;
            Runnable r = new Runnable() {
                public void run() {
                    String res = readTextFile(args[q]);
                    System.out.println(res);
                }
            };
            t = new Thread(r);
            t.start();
        }
    } else {
        System.err.println("Not enough input files");
    }
}

So, how do I make a queue that makes the threads wait for each other, so that they don't make the mistake of adding to the result at the exact same time?

There are 2 answers

KSR

A blocking queue seems unnecessary here. Just have each thread add its results to a thread-safe list, which can be constructed like this:

final List<String> results = 
        Collections.synchronizedList(new ArrayList<String>());

Next you want to wait until all threads are done before aggregating the results. You can do this by calling join on each thread. Add each of your threads to a list called threads, then once all the threads have been started, call this:

for(Thread t : threads) {
    t.join();
}

This code will effectively wait for every thread to finish before moving on.
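Putting the two pieces together, a minimal sketch could look like the following. It assumes a hypothetical `countWords` helper that stands in for the asker's `readTextFile` and takes the text directly rather than a file name, so it runs without any files on disk. The class name `JoinWordCount` and the sample strings are made up for illustration, and the list holds integer counts instead of strings so they can be summed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class JoinWordCount {

    // Hypothetical stand-in for readTextFile: counts whitespace-separated words.
    static int countWords(String text) {
        String trimmed = text.trim();
        return trimmed.isEmpty() ? 0 : trimmed.split("\\s+").length;
    }

    static int totalWords(List<String> texts) {
        // Thread-safe list: concurrent add() calls from the workers are serialized.
        final List<Integer> results =
                Collections.synchronizedList(new ArrayList<Integer>());
        List<Thread> threads = new ArrayList<Thread>();

        for (final String text : texts) {
            Thread t = new Thread(new Runnable() {
                public void run() {
                    results.add(countWords(text));
                }
            });
            threads.add(t);
            t.start();
        }

        // Wait for every worker before reading the results.
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }

        // All workers have finished, so iterating here needs no extra locking.
        int total = 0;
        for (int n : results) {
            total += n;
        }
        return total;
    }

    public static void main(String[] args) {
        // 3 words + 2 words
        System.out.println(totalWords(Arrays.asList("one two three", "four five"))); // prints 5
    }
}
```

The sum is computed by the main thread only after every `join` has returned, so no two threads ever update the total at the same time.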

Guy Gavriely

This is a very common example of using multiple threads to process I/O operations such as reading files from disk. I assume this is for learning purposes; for a real-life workload, consider looking at MapReduce frameworks like Hadoop.

See how a similar task is done using Hadoop here.

However, a pseudo example follows:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class ConsumerProducer implements Runnable {
    private final BlockingQueue<String> map;
    private final BlockingQueue<Map<String, Integer>> reduce;

    ConsumerProducer(BlockingQueue<String> map,
            BlockingQueue<Map<String, Integer>> reduce) {
        this.map = map;
        this.reduce = reduce;
    }

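    public void run() {
        try {
            // Setup starts one thread per file, so each worker handles a
            // single file name and then exits; looping forever on take()
            // would keep the JVM alive after the work is done.
            Map<String, Integer> wordToOccurrences = this.consume(map.take());
            this.produce(wordToOccurrences);
        } catch (InterruptedException e) {
            // restore the interrupt flag so callers can see it
            Thread.currentThread().interrupt();
        }
    }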

    private void produce(Map<String, Integer> wordToOccurrences)
            throws InterruptedException {
        reduce.put(wordToOccurrences);
    }

    public Map<String, Integer> consume(String fileName) {
        // read the file and return 'word' -> number of occurrences
        return new HashMap<String, Integer>();
    }
}

class Setup {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> map = new LinkedBlockingQueue<String>();
        BlockingQueue<Map<String, Integer>> reduce = new LinkedBlockingQueue<Map<String, Integer>>();

        for (String fileName : args) {
            map.put(fileName);
            // assuming every thread processes a single file; for other options see
            // http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html
            ConsumerProducer c = new ConsumerProducer(map, reduce);
            new Thread(c).start();
        }

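        Map<String, Integer> total = new HashMap<String, Integer>();
        for (int i = 0; i < args.length; i++) {
            // take() blocks until some worker has published a result
            Map<String, Integer> wordToOccurrences = reduce.take();
            for (Map.Entry<String, Integer> e : wordToOccurrences.entrySet()) {
                Integer seen = total.get(e.getKey());
                total.put(e.getKey(), seen == null ? e.getValue() : seen + e.getValue());
            }
        }

        // print merged map of total results
        System.out.println(total);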
    }
}
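
If only the overall total is needed, as in the question, the same idea can be sketched with a thread pool and a single result queue. This is a rough illustration under stated assumptions, not a definitive implementation: `countWords` is a hypothetical helper that takes the text directly instead of reading a file, and the class name `QueueWordCount`, the `poolSize` parameter, and the sample strings are made up for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

class QueueWordCount {

    // Hypothetical stand-in for reading a file: counts whitespace-separated words.
    static int countWords(String text) {
        String trimmed = text.trim();
        return trimmed.isEmpty() ? 0 : trimmed.split("\\s+").length;
    }

    static int totalWords(List<String> texts, int poolSize) {
        final BlockingQueue<Integer> counts = new LinkedBlockingQueue<Integer>();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            for (final String text : texts) {
                pool.execute(new Runnable() {
                    public void run() {
                        // The queue is unbounded, so add() does not block or fail here.
                        counts.add(countWords(text));
                    }
                });
            }

            // Only this thread touches 'total'. take() blocks until a worker
            // has published a count, so exactly one result per task is read.
            int total = 0;
            for (int i = 0; i < texts.size(); i++) {
                total += counts.take();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while collecting results", e);
        } finally {
            // Let the pool threads exit once the submitted tasks are done.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 2 words + 3 words + 1 word
        System.out.println(totalWords(Arrays.asList("a b", "c d e", "f"), 2)); // prints 6
    }
}
```

Because the workers only publish to the queue and a single thread does the adding, no two threads ever update the total at the same time, which is the situation the question was worried about.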