Trying to solve the producer-consumer problem in Java with multithreading

799 views Asked by At

I'm trying to solve the producer-consumer problem with threads in Java, but the code won't run in parallel/concurrently. The producer always fills the buffer completely before the consumer starts to consume, and I don't understand why. The goal is to do it using only synchronized blocks, wait() and notify().

Main :

    // Items the producer thread hands to the shared buffer, in array order.
    String[] data = {"Fisk", "Katt", "Hund", "Sau", "Fugl", "Elg", "Tiger",
                     "Kameleon", "Isbjørn", "Puma"};
    // Shared buffer; the argument is the maximum number of items it holds at once.
    ProducerConsumer pc = new ProducerConsumer(5);
    Thread[] thrds = new Thread[2];
    thrds[0] = new Thread(new MyThread1(pc, data)); // producer
    thrds[1] = new Thread(new MyThread2(pc)); // consumer
    for (Thread t : thrds) {
        t.start();
    }
    // Wait for all threads to die.
    // NOTE: join() only returns once the joined thread's run() method ends, so a
    // worker whose loop has no exit condition keeps this loop waiting.
    for (Thread t : thrds) {
        try {
            t.join();
        } catch (InterruptedException ie) {
            // Do not swallow the interruption: restore the flag and stop waiting,
            // since any further join() would throw again immediately.
            Thread.currentThread().interrupt();
            break;
        }
    }
    System.exit(0);

ProducerConsumer.java:

import java.util.LinkedList;
import java.util.Queue;

/**
 * Bounded FIFO buffer of strings shared between producer and consumer threads.
 *
 * <p>Every access to the underlying queue happens while holding the queue's own
 * monitor. {@link #produce(String)} blocks while the buffer is full and
 * {@link #consume()} blocks while it is empty.
 */
public class ProducerConsumer implements Runnable {
    /** Maximum number of items the buffer may hold at once. */
    private final int bufferSize;
    /** Backing queue; also the monitor used for locking and wait/notify. */
    private final Queue<String> buffer;

    /**
     * Creates an empty buffer.
     *
     * @param size maximum number of items held at once; {@code produce} waits
     *             whenever the buffer already holds at least this many items
     */
    public ProducerConsumer(int size) {
        bufferSize = size;
        buffer = new LinkedList<String>();
    }

    /**
     * Appends {@code item} to the buffer, waiting while the buffer is full.
     *
     * @param item the value to enqueue
     * @throws InterruptedException if the calling thread is interrupted before or
     *                              while waiting for free space
     */
    public void produce(String item) throws InterruptedException {
        synchronized (buffer) {
            // Re-check the condition after every wake-up: another thread may have
            // refilled the buffer, and wait() may return spuriously.
            while (buffer.size() >= bufferSize) {
                System.out.println("Full buffer. Waiting for consumer...");
                // Let InterruptedException propagate; swallowing it here would send
                // an interrupted thread straight back into wait().
                buffer.wait();
            }
            buffer.add(item);
            System.out.println("Producer is putting " + item + " in the buffer");
            // Producers and consumers wait on the same monitor, so wake all of
            // them; a single notify() could wake a thread that cannot proceed.
            buffer.notifyAll();
        }
    }

    /**
     * Removes the oldest item from the buffer and prints it, waiting while the
     * buffer is empty.
     *
     * @throws InterruptedException if the calling thread is interrupted before or
     *                              while waiting for an item
     */
    public void consume() throws InterruptedException {
        synchronized (buffer) {
            while (buffer.isEmpty()) {
                System.out.println("Empty buffer. Waiting for production...");
                buffer.wait();
            }
            System.out.println("Consumer is consuming " + buffer.remove() + ".");
            buffer.notifyAll();
        }
    }

    /** Intentionally empty; kept so the class still satisfies {@link Runnable}. */
    @Override
    public void run() {
    }

}

MyThread1 :

/**
 * PRODUCER - Thread.
 *
 * <p>Feeds every element of a string array, in order, into a shared
 * {@link ProducerConsumer} buffer.
 */
public class MyThread1 implements Runnable {

    /** Items to produce, in the order they are handed to the buffer. */
    private final String[] data;
    /** Shared buffer the items are put into. */
    private final ProducerConsumer pc;

    /**
     * @param pc   shared buffer to produce into
     * @param data items to produce; the array is used as-is, not copied
     */
    public MyThread1(ProducerConsumer pc, String[] data) {
        this.pc = pc;
        this.data = data;
    }

    /**
     * Produces all items, then returns. If {@code produce} reports an
     * interruption, the remaining items are skipped and the thread's interrupt
     * flag is restored so the owner of the thread can still observe it.
     */
    @Override
    public void run() {
        try {
            for (String item : data) {
                pc.produce(item);
            }
        } catch (InterruptedException ex) {
            // Stop producing instead of ignoring the request to stop.
            Thread.currentThread().interrupt();
        }
    }

}

MyThread2:

/**
 * THE CONSUMER - Thread.
 *
 * <p>Repeatedly takes items from a shared {@link ProducerConsumer} buffer,
 * pausing 2 ms after each one.
 */
public class MyThread2 implements Runnable {

    /** Shared buffer the items are taken from. */
    private final ProducerConsumer pc;

    /**
     * @param pc shared buffer to consume from
     */
    public MyThread2(ProducerConsumer pc) {
        this.pc = pc;
    }

    /**
     * Runs the consume loop until this thread is interrupted. The loop ends when
     * the interrupt flag is seen at the top of an iteration, or when
     * {@code consume()} or {@code Thread.sleep} throws
     * {@link InterruptedException}; the flag is then restored before returning.
     */
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                pc.consume();
                Thread.sleep(2);
            }
        } catch (InterruptedException e) {
            // Treat interruption as the signal to stop instead of looping forever.
            Thread.currentThread().interrupt();
        }
    }
}
2

There are 2 answers

0
Ralf H On

On recent machines, with queues as short as this one, you will never see actual multithreading effects (in this case, the producer and consumer taking turns) unless you slow both of them down a bit. You only slowed down the consumer. Instead of using a short array, put a million Integers in the queue and see what happens.

4
Viswanath Redddy On
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * Entry point that wires one {@code Producer} and one {@code Consumer} to the
 * same list and starts each on its own named thread.
 */
public class ProduserConsumerDemo {

    public static void main(String[] args) {
        // Upper bound passed to the producer.
        final int capacity = 5;
        // The single list instance both workers operate on.
        final List<Integer> shared = new CopyOnWriteArrayList<>();

        Thread producerThread = new Thread(new Producer(shared, capacity), "Producer");
        Thread consumerThread = new Thread(new Consumer(shared), "Consumer");

        // Producer first, then consumer, as in the original start order.
        producerThread.start();
        consumerThread.start();
    }
}

/**
 * Appends an increasing sequence of integers (0, 1, 2, ...) to a list, waiting
 * whenever the list already holds {@code size} elements.
 *
 * <p>All list access in this class happens while holding the list's monitor,
 * which is also used for wait/notify.
 */
class Producer implements Runnable {
    private final List<Integer> list;
    private final int size;

    /**
     * @param list list to append to; presumably shared with a consumer that
     *             synchronizes on the same instance
     * @param size number of elements at which the producer stops adding and waits
     */
    public Producer(List<Integer> list, final int size) {
        this.list = list;
        this.size = size;
    }

    /**
     * Runs the produce loop until the thread is interrupted, then restores the
     * interrupt flag and returns.
     */
    @Override
    public void run() {
        try {
            produce();
        } catch (InterruptedException e) {
            // Interruption is the way to stop this worker; keep the flag set
            // rather than dumping a stack trace.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Produces values until interrupted (or until the counter overflows to a
     * negative value, which ends the loop).
     *
     * @throws InterruptedException if interrupted while waiting or sleeping
     */
    private void produce() throws InterruptedException {
        int i = 0;
        while (i >= 0) {
            synchronized (list) {
                // ">=" rather than "==" so a list that already exceeds the limit
                // still makes the producer wait.
                while (list.size() >= size) {
                    System.out.println(
                            "List is full." + Thread.currentThread().getName() + " is waiting. Size:" + list.size());
                    list.wait();
                }
                System.out.println("Produce :" + i);
                list.add(i++);
                // Wake every waiter; they all re-check their own condition.
                list.notifyAll();
            }
            // Pace the producer outside the synchronized block: sleeping while
            // holding the monitor would block the other thread for the whole pause.
            Thread.sleep(50);
        }
    }
}

/**
 * Removes and prints elements from the front of a list, waiting whenever the
 * list is empty.
 *
 * <p>All list access in this class happens while holding the list's monitor,
 * which is also used for wait/notify.
 */
class Consumer implements Runnable {
    private final List<Integer> list;

    /**
     * @param list list to take elements from; presumably shared with a producer
     *             that synchronizes on the same instance
     */
    public Consumer(List<Integer> list) {
        this.list = list;
    }

    /**
     * Runs the consume loop until the thread is interrupted, then restores the
     * interrupt flag and returns.
     */
    @Override
    public void run() {
        try {
            consume();
        } catch (InterruptedException e) {
            // Interruption is the way to stop this worker; keep the flag set
            // rather than dumping a stack trace.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Consumes elements until interrupted.
     *
     * @throws InterruptedException if interrupted while waiting or sleeping
     */
    private void consume() throws InterruptedException {
        while (true) {
            synchronized (list) {
                // Loop guards against spurious wake-ups and against another
                // thread emptying the list first.
                while (list.isEmpty()) {
                    System.out.println(
                            "List is empty. " + Thread.currentThread().getName() + " is waiting. Size:" + list.size());
                    list.wait();
                }
                System.out.println("Consumed item:" + list.remove(0));
                // Wake every waiter; they all re-check their own condition.
                list.notifyAll();
            }
            // Pace the consumer outside the synchronized block: sleeping while
            // holding the monitor would block the other thread for the whole pause.
            Thread.sleep(50);
        }
    }
}