Not able to figure out why the following code is getting deadlocked

The following Java code reads a large file, input.txt, breaks it into chunks, reads the chunks concurrently, and finally dumps the contents to output.txt. This is an academic exercise, not a real project, so there are certainly better approaches. But I am not able to figure out why this particular code gets deadlocked.

Any idea?

package org.sid.misc;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.*;


class FileChunk {
    private final int order;
    private final byte[] data;

    public FileChunk(int order, byte[] data) {
        this.order = order;
        this.data = data;
    }

    public int getOrder() {
        return order;
    }

    public byte[] getData() {
        return data;
    }
}


class FileReaderTask implements Runnable {
    private final String inputFilePath;
    private final BlockingQueue<FileChunk> queue;
    private final int chunkSize;
    private final int order;

    public FileReaderTask(String inputFilePath, BlockingQueue<FileChunk> queue, int chunkSize, int order) {
        this.inputFilePath = inputFilePath;
        this.queue = queue;
        this.chunkSize = chunkSize;
        this.order = order;
    }

    @Override
    public void run() {
        try (FileInputStream fis = new FileInputStream(inputFilePath);
             FileChannel channel = fis.getChannel()) {
            long position = order * chunkSize;
            ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
            if (channel.read(buffer, position) > 0) {
                buffer.flip();
                byte[] data = new byte[buffer.remaining()];
                buffer.get(data);
                queue.put(new FileChunk(order, data));
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
            System.err.println("Error reading file chunk: " + e.getMessage());
        }
    }
}


class FileWriterTask implements Runnable {
    private final String outputFilePath;
    private final BlockingQueue<FileChunk> queue;

    public FileWriterTask(String outputFilePath, BlockingQueue<FileChunk> queue) {
        this.outputFilePath = outputFilePath;
        this.queue = queue;
    }

    @Override
    public void run() {
        try (FileOutputStream fos = new FileOutputStream(outputFilePath, true)) {
            while (true) {
                FileChunk chunk = queue.take();
                if (chunk.getData().length == 0) {
                    break; // Signal to stop writing
                }
                fos.write(chunk.getData());
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
            System.err.println("Error writing file chunk: " + e.getMessage());
        }
    }
}

class Test {
    private static final int CHUNK_SIZE = 1024 * 1024; // 1 MB chunks
    private static final int NUM_THREADS = Runtime.getRuntime().availableProcessors();

    public static void main(String[] args) throws InterruptedException {
        String inputFilePath = "input.txt";
        String outputFilePath = "output.txt";

        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
        BlockingQueue<FileChunk> queue = new LinkedBlockingQueue<>();

        long startTime = System.currentTimeMillis();

        // Start the writer task
        executor.submit(new FileWriterTask(outputFilePath, queue));

        // Start the reader tasks
        for (int i = 0; i < NUM_THREADS; i++) {
            executor.submit(new FileReaderTask(inputFilePath, queue, CHUNK_SIZE, i));
        }

        // Wait for all tasks to complete
        executor.shutdown();
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);

        long endTime = System.currentTimeMillis();
        System.out.println("Execution time: " + (endTime - startTime) + " ms");
    }
}

There are 2 answers

Answer by Sagar

You are using a blocking queue implementation, and as the docs note, take() will block until an element is available:

take(): Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.

The writer thread waiting for an element keeps waiting indefinitely, and hence the executor is never able to shut down.
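To see that behaviour in isolation, here is a minimal, self-contained sketch (not part of either program; the class name is made up): poll with a timeout gives up and returns null, whereas take() on an empty queue blocks the calling thread forever, which is exactly what keeps the writer task, and therefore the executor, alive.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class TakeBlocksDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // poll(timeout) gives up after one second and returns null
        String polled = queue.poll(1, TimeUnit.SECONDS);
        System.out.println("poll returned: " + polled);

        // take() on the same empty queue would block this thread forever,
        // which is what happens to the writer in the question.
        // String taken = queue.take(); // never returns
    }
}

Below is the adjusted version of your program, using a CountDownLatch so the writer knows when all readers are done.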

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class FileChunk {
    private final int order;
    private final byte[] data;

    public FileChunk(int order, byte[] data) {
        this.order = order;
        this.data = data;
    }

    public int getOrder() {
        return order;
    }

    public byte[] getData() {
        return data;
    }
}


class FileReaderTask implements Runnable {
    private final String inputFilePath;
    private final BlockingQueue<FileChunk> queue;
    private final int chunkSize;
    private final int order;
    
    private CountDownLatch doneReadingLatch;

    public FileReaderTask(String inputFilePath, BlockingQueue<FileChunk> queue, int chunkSize, int order, CountDownLatch doneReadingLatch) {
        this.inputFilePath = inputFilePath;
        this.queue = queue;
        this.chunkSize = chunkSize;
        this.order = order;
        this.doneReadingLatch = doneReadingLatch;
    }

    

    @Override
    public void run() {
        try (FileInputStream fis = new FileInputStream(inputFilePath);
             FileChannel channel = fis.getChannel()) {
            long position = order * chunkSize;
            ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
            if (channel.read(buffer, position) > 0) {
                buffer.flip();
                byte[] data = new byte[buffer.remaining()];
                buffer.get(data);
                queue.put(new FileChunk(order, data));
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
            System.err.println("Error reading file chunk: " + e.getMessage());
        }
        this.doneReadingLatch.countDown();
    }
}


class FileWriterTask implements Runnable {
    private final String outputFilePath;
    private final BlockingQueue<FileChunk> queue;
    private CountDownLatch doneReadingLatch;

    public FileWriterTask(String outputFilePath, BlockingQueue<FileChunk> queue, CountDownLatch doneReadingLatch) {
        this.outputFilePath = outputFilePath;
        this.queue = queue;
        this.doneReadingLatch = doneReadingLatch;
    }

   

    @Override
    public void run() {
        try (FileOutputStream fos = new FileOutputStream(outputFilePath, true)) {
            while (true) {
                // since only one writer task is present
                FileChunk chunk = null;
                if (doneReadingLatch.getCount() > 0) {
                    chunk = queue.take();
                } else {
                    break;
                }
                
                fos.write(chunk.getData());
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
            System.err.println("Error writing file chunk: " + e.getMessage());
        }
    }
}

class Scratch {
    private static final int CHUNK_SIZE = 1024 * 1024; // 1 MB chunks
    private static final int NUM_THREADS = Runtime.getRuntime().availableProcessors();

    public static void main(String[] args) throws InterruptedException {
        String inputFilePath = "input.txt";
        String outputFilePath = "output.txt";

        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
        BlockingQueue<FileChunk> queue = new LinkedBlockingQueue<>();

        long startTime = System.currentTimeMillis();

        CountDownLatch doneReadingLatch = new CountDownLatch(NUM_THREADS);
        // Start the writer task
        executor.submit(new FileWriterTask(outputFilePath, queue, doneReadingLatch));
        // Start the reader tasks
        for (int i = 0; i < NUM_THREADS; i++) {
            executor.submit(new FileReaderTask(inputFilePath, queue, CHUNK_SIZE, i, doneReadingLatch));
        }

        // Wait for all tasks to complete
        executor.shutdown();
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);

        long endTime = System.currentTimeMillis();
        System.out.println("Execution time: " + (endTime - startTime) + " ms");
    }
}
Answer by DuncG

Your writer never stops because you don't send it any message or indicator telling it when to end. As in @Sagar's answer, the problem is queue.take(), which blocks indefinitely once no more items are coming.

There are many ways to arrange this, but as this is only an exercise, you can make a simple change to the producer/consumer tasks and use a special queue item to mark the end of processing.

class FileChunk {
    public static final FileChunk EOF = new FileChunk(-1, null);
    ...
}

Adjust FileWriterTask.run() so that it exits when EOF is sent down the queue.

FileChunk chunk = queue.take();
// Replaces: if (chunk.getData().length == 0) {
if (chunk == FileChunk.EOF) {
    break; // Signal to stop writing
}

Now you just need to end FileWriterTask by sending EOF. (Comparing with == works because EOF is a single shared instance; its data is null, so the old length check would throw a NullPointerException on it.) The logic is simpler if you run FileWriterTask in a different executor, so that you can use the reader executor's shutdown to determine that all readers have ended before ending the writer:

// Start the writer task by itself
ExecutorService executorW = Executors.newFixedThreadPool(1);
executorW.submit(new FileWriterTask(outputFilePath, queue));

// Start the reader tasks
for (int i = 0; i < NUM_THREADS; i++) {
    executor.submit(new FileReaderTask(inputFilePath, queue, CHUNK_SIZE, i));
}

// Wait for all reader tasks to complete
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);

// No readers are left and queue.take() would block forever, so put EOF to end the writer
queue.put(FileChunk.EOF);

// Wait for the writer task
executorW.shutdown();
executorW.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);

Note: it is better to wrap each ExecutorService in try...finally (and in later JDKs you can use try-with-resources, since ExecutorService is AutoCloseable). Also, each of those submit calls returns a Future<?>, which you could use to check the outcome of the tasks.
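A minimal sketch of that arrangement (illustrative names, not the original program): the finally block guarantees the shutdown/await pair runs even if submitting or checking a task throws, and Future.get() rethrows any exception a task failed with instead of silently swallowing it.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ExecutorCleanupSketch {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService readers = Executors.newFixedThreadPool(4);
        try {
            // submit() returns a Future<?> whose get() rethrows any task exception
            Future<?> result = readers.submit(() -> System.out.println("reader task"));
            result.get();
        } finally {
            // runs even if submit() or get() throws
            readers.shutdown();
            readers.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
    }
}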