Using cyclic barrier does not wait till all threads finish

1.9k views Asked by At

Here is what I am trying to do. I have a number of threads which should all wait at a common point before they proceed, so obvious solution is to use CyclicBarrier. But I want to also compute the total time taken by the threads to execute. I defined the following utility method in class ConcurrentExecutionActionTimer.

    public static long elapsedTimeUsingCyclicBarrier(Executor executor, int concurrency, final Runnable action) throws InterruptedException
        {
            final Runnable barrierAction = new Runnable() {
                @Override
                public void run() {
                    System.out.println("Condition of barrier is met.");
                }
            };

            final

 CyclicBarrier barrier = new CyclicBarrier(concurrency, barrierAction);
        final CountDownLatch done = new CountDownLatch(concurrency);

        for(int i=0; i<concurrency; i++ ){
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("Waiting at barrier.");
                        barrier.await();
                        action.run();
                        //Cyclic barrier gets reset automatically. Again wait for them to finish.
                        barrier.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    } finally {
                        done.countDown();
                    }
                }
            });
        }
        long startNanoTime = System.nanoTime();
        done.await();
        return System.nanoTime() - startNanoTime;
    }

Then I called it up like:

public static void main(String[] args) {
        //Executor is replacement for common thread idiom: (new Thread(r)).start() to e.execute(r)
        ExecutorService executor = Executors.newFixedThreadPool(10);
        Worker action = new Worker();
        int concurrency = 5;
        try {
        long elapsedTime = ConcurrentExecutionActionTimer.elapsedTimeUsingCyclicBarrier(executor, concurrency, action);
        double seconds = (double)elapsedTime / 1000000000.0;
        System.out.println("Time Taken approximately: " + seconds + "seconds.");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

Here Worker is suppose my thread that does some work. For example:

class Worker implements Runnable {
    @Override
    public void run() {
        System.out.println("Doing work.");
        for(int i=0; i<20; i++) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        System.out.println("Finished.");
    }
}

As I wanted to print the time taken I had to use CountDownLatch to make sure the control does not return back to main before all the threads are finished. Do we have any other way to make sure the same functionality?

2

There are 2 answers

1
Claudio Corsi On BEST ANSWER

You should use the same CyclicBarrier instance. The only difference is that you make the cyclic barrier count be #threads + 1. You can then use that barrier to calculate the time it took all of the threads to complete. The start time is calculated when the first barrier has been reached and the end time is calculated when the second barrier has been reached. This way you know approximately when all the threads have been started and when all of them have completed.

Therefore this:

long startNanoTime = System.nanoTime();
done.await();
return System.nanoTime() - startNanoTime;

becomes:

barrier.await()
long startNanoTime = System.nanoTime();
barrier.await();
return System.nanoTime() - startNanoTime;
0
PHUWIT CHANTAFONG On
import java.io.*;

import java.util.*; import java.util.concurrent.CyclicBarrier;

class BusLine {

private String destination;
protected static int max_seat, checkpoint;

private ArrayList<Bus> BUSA = new ArrayList<Bus>();
private ArrayList<Bus> BUSC = new ArrayList<Bus>();
private ArrayList<Group> GROUP_A = new ArrayList<Group>();
private ArrayList<Group> GROUP_C = new ArrayList<Group>();

public BusLine(int ms, int cp, String d) {
    max_seat = ms;
    checkpoint = cp;
    destination = d;
}

public String getDestination() {
    return destination;
}

public void printAirportCheckpoint() {
    System.out.println();
    System.out.printf("%s >> %d airport-bound buses have been allocated.", Thread.currentThread().getName(), BUSA.size());
}

public void printCityCheckpoint() {
    System.out.println();
    System.out.printf("%s >> %d city-bound buses have been allocated.", Thread.currentThread().getName(), BUSC.size());
    System.out.println();
    System.out.println();
}

public void BusBoundA() {

    int temp = 0;
    for (int i = 0; i < BUSA.size(); i++) {
        if (BUSA.get(i).getName().equals("A" + i)) {
            temp++;
        }
    }
    System.out.println();
    System.out.printf("%s >> ==== Airport Bound ====", Thread.currentThread().getName());
    System.out.println();
    for (int i = 0; i < temp; i++) {
        System.out.printf("%s >> %s : ", Thread.currentThread().getName(), "A" + i);
        for (int j = 0; j < GROUP_A.size(); j++) {
            if (GROUP_A.get(j).getBusname().equals("A" + i)) {
                System.out.printf("  %-20s(%2d seats)", GROUP_A.get(j).getName(), GROUP_A.get(j).getSeat());
                //System.out.printf(",");
            }
        } System.out.println();
    }
}

public void BusBoundC() {

    int temp = 0;
    for (int i = 0; i < BUSC.size(); i++) {
        if (BUSC.get(i).getName().equals("C" + i)) {
            temp++;
        }
    }
    System.out.println();
    System.out.printf("%s >> ==== City Bound ====", Thread.currentThread().getName());
    System.out.println();
    for (int i = 0; i < temp; i++) {
        System.out.printf("%s >> %s : ", Thread.currentThread().getName(), "C" + i);
        for (int j = 0; j < GROUP_C.size(); j++) {
            if (GROUP_C.get(j).getBusname().equals("C" + i)) {
                System.out.printf("  %-20s(%2d seats)", GROUP_C.get(j).getName(), GROUP_C.get(j).getSeat());
                //System.out.printf(",");
            }
        } System.out.println();
    }
}

synchronized public void allocateBus(Data d) {
    
    TicketCounter T = (TicketCounter) (Thread.currentThread());
    while (d.getSeat() != 0) {
        if ("A".equals(d.getDestination())) {
            if (BUSA.size() == 0 || BUSA.get(BUSA.size() - 1).getAvailableSeat() == 0) {
                BUSA.add(new Bus("A" + BUSA.size()));
            }
            if (d.getSeat() <= BUSA.get(BUSA.size() - 1).getAvailableSeat()) {
                System.out.printf("%s >> Transaction %2d : %-20s(%2d seats) bus %s\n", T.getName(), d.getTransaction(), d.getName(), d.getSeat(), BUSA.get(BUSA.size() - 1).getName());
                GROUP_A.add(new Group(BUSA.get(BUSA.size() - 1).getName(), d.getName(), d.getSeat()));
                BUSA.get(BUSA.size() - 1).Bookingseat(d.getSeat());
                d.finishedBooking(d.getSeat());

            } else {
                System.out.printf("%s >> Transaction %2d : %-20s(%2d seats) bus %s\n", T.getName(), d.getTransaction(), d.getName(), BUSA.get(BUSA.size() - 1).getAvailableSeat(), BUSA.get(BUSA.size() - 1).getName());
                GROUP_A.add(new Group(BUSA.get(BUSA.size() - 1).getName(), d.getName(), BUSA.get(BUSA.size() - 1).getAvailableSeat()));
                d.finishedBooking(BUSA.get(BUSA.size() - 1).getAvailableSeat());
                BUSA.get(BUSA.size() - 1).Bookingseat(BUSA.get(BUSA.size() - 1).getAvailableSeat());
            }

        } else {
            if (BUSC.size() == 0 || BUSC.get(BUSC.size() - 1).getAvailableSeat() == 0) {
                BUSC.add(new Bus("C" + BUSC.size()));
            }
            if (d.getSeat() <= BUSC.get(BUSC.size() - 1).getAvailableSeat()) {
                System.out.printf("%s >> Transaction %2d : %-20s(%2d seats) bus %s\n", T.getName(), d.getTransaction(), d.getName(), d.getSeat(), BUSC.get(BUSC.size() - 1).getName());
                GROUP_C.add(new Group(BUSC.get(BUSC.size() - 1).getName(), d.getName(), d.getSeat()));
                BUSC.get(BUSC.size() - 1).Bookingseat(d.getSeat());
                d.finishedBooking(d.getSeat());
            } else {
                System.out.printf("%s >> Transaction %2d : %-20s(%2d seats) bus %s\n", T.getName(), d.getTransaction(), d.getName(), BUSC.get(BUSC.size() - 1).getAvailableSeat(), BUSC.get(BUSC.size() - 1).getName());
                GROUP_C.add(new Group(BUSC.get(BUSC.size() - 1).getName(), d.getName(), BUSC.get(BUSC.size() - 1).getAvailableSeat()));
                d.finishedBooking(BUSC.get(BUSC.size() - 1).getAvailableSeat());
                BUSC.get(BUSC.size() - 1).Bookingseat(BUSC.get(BUSC.size() - 1).getAvailableSeat());
            }
        }
    }
}

}

class Group {

private String busname, name;
private int seat;

public Group(String n, String b, int s) {
    busname = n;
    name = b;
    seat = s;
}

public String getName()     { return name; }
public String getBusname()  { return busname; }
public int getSeat()        { return seat; }

}

class Bus {

private String Busname, group_name;
private int availableseat, seat;

public int getAvailableSeat()   { return availableseat; }
public String getName()         { return Busname; }
public void Bookingseat(int s)  { availableseat -= s; }
public String getGroupname()    { return group_name; }

public Bus(String n) {
    availableseat = BusLine.max_seat;
    Busname = n;
}

public Bus(String n, int s) {
    group_name = n;
    seat = s;
}

public Bus(String n, String gn, int s) {
    Busname = n;
    group_name = gn;
    availableseat = s;
}

}

class Data {

private String name, destination;
private int seat, transaction, count = 0;

public int getCount()               { return count; }
public int getTransaction()         { return transaction; }
public String getName()             { return name; }
public int getSeat()                { return seat; }
public String getDestination()      { return destination; }
public void finishedBooking(int s)  { seat -= s; }

public Data(int t, String n, int s, String d) {
    transaction = t;
    name = n;
    seat = s;
    destination = d;
}

}

class TicketCounter extends Thread {

ArrayList<Data> transaction;
BusLine Airport, City;
private CyclicBarrier cfinish;

public void setCyclicBarrier(CyclicBarrier f) {
    cfinish = f;
}

public TicketCounter(String n, ArrayList<Data> d, BusLine a, BusLine c) {
    super(n);
    transaction = d;
    Airport = a;
    City = c;
}

public void run() {
    for (int i = 0; i < transaction.size(); i++) {
        if (transaction.get(i).getTransaction() == BusLine.checkpoint) {
            try {
                cfinish.await();
                cfinish.await();
            } catch (Exception e) {}

        }
        if ("A".equals(transaction.get(i).getDestination())) {
            Airport.allocateBus(transaction.get(i));
        } else {
            City.allocateBus(transaction.get(i));
        }
    }
}

}

class Userinput {

private ArrayList<Data> DATA1 = new ArrayList<Data>();
private ArrayList<Data> DATA2 = new ArrayList<Data>();
private ArrayList<Data> DATA3 = new ArrayList<Data>();

public Userinput() {}

public ArrayList<Data> getDATA1() { return DATA1; }
public ArrayList<Data> getDATA2() { return DATA2; }
public ArrayList<Data> getDATA3() { return DATA3; }

public void input() {

    String infile[] = {"T1.txt", "T2.txt", "T3.txt"};
    for (int i = 0; i < 3; i++) {
        boolean opensuccess = false;
        while (!opensuccess) {
            try ( Scanner scanfile = new Scanner(new File(infile[i]));) {
                while (scanfile.hasNext()) {
                    opensuccess = true;
                    String line = scanfile.nextLine();
                    String[] buf = line.split(",");
                    int transaction = Integer.parseInt(buf[0].trim());
                    String name = buf[1].trim();
                    int seat = Integer.parseInt(buf[2].trim());
                    String destination = buf[3].trim();
                    Data d = new Data(transaction, name, seat, destination);
                    switch (i) {
                        case 0: DATA1.add(d);
                            break;
                        case 1: DATA2.add(d);
                            break;
                        case 2: DATA3.add(d);
                            break;
                    }
                }
            } catch (FileNotFoundException e) {
                System.out.println(e);
                Scanner scan = new Scanner(System.in);
                System.out.println("Enter new file name : ");
                infile[i] = scan.nextLine();
            }
        }
    }
}

}

public class Simulation { public static void main(String[] args) {

    Scanner scan = new Scanner(System.in);  
    Thread Th = Thread.currentThread();
    System.out.printf("%s >> Enter max seats = ", Th.getName());
    System.out.println();
    int max_seat = scan.nextInt();
    System.out.printf("%s >> Enter checkpoints = ", Th.getName());
    System.out.println();
    int checkpoint = scan.nextInt();
    Userinput U = new Userinput();
    BusLine Airport = new BusLine(max_seat, checkpoint, "A");
    BusLine City = new BusLine(max_seat, checkpoint, "C");
    
    CyclicBarrier CB = new CyclicBarrier(4);
    U.input();

    TicketCounter[] T = {
        new TicketCounter("T1", U.getDATA1(), Airport, City),
        new TicketCounter("T2", U.getDATA2(), Airport, City),
        new TicketCounter("T3", U.getDATA3(), Airport, City)};

    for (int i = 0; i < 3; i++) {
        T[i].setCyclicBarrier(CB);
        T[i].start();
    }
    
    try {
        CB.await();
        Airport.printAirportCheckpoint();
        City.printCityCheckpoint();
        CB.await();
    }catch (Exception e){}
    
    for (int i = 0; i < 3; i++) {
        try {
            T[i].join();
        } catch (Exception e) {
            System.err.println(e);
        }
    }
    Airport.BusBoundA();
    City.BusBoundC();
}

}