Given InputStream replace character and produce OutputStream

15.7k views Asked by At

I have a lot of massive files I need convert to CSV by replacing certain characters.

I am looking for reliable approach given InputStream return OutputStream and replace all characters c1 to c2.

Trick here is to read and write in parallel, I can't fit whole file in memory.

Do I need to run it in separate thread if I want read and write at the same time?

Thanks a lot for your advices.

3

There are 3 answers

3
CodeMonkey On

To copy data from an input stream to an output stream you write data while you're reading it either a byte (or character) or a line at a time.

Here is an example that reads in a file converting all 'x' characters to 'y'.

BufferedInputStream in = new BufferedInputStream(new FileInputStream("input.dat"));
BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream("output.dat"));
int ch;
while((ch = in.read()) != -1) {
        if (ch == 'x') ch = 'y';
        out.write(ch);
}
out.close();
in.close();

Or if can use a Reader and process a line at a time then can use this aproach:

BufferedReader reader = new BufferedReader(new FileReader("input.dat"));
PrintWriter writer = new PrintWriter(
      new BufferedOutputStream(new FileOutputStream("output.dat")));
String str;
while ((str = reader.readLine()) != null) {
    str = str.replace('x', 'y');     // replace character at a time
    str = str.replace("abc", "ABC"); // replace string sequence
    writer.println(str);
}
writer.close();
reader.close();

BufferedInputStream and BufferedReader read ahead and keep 8K of characters in a buffer for performance. Very large files can be processed while only keeping 8K of characters in memory at a time.

0
Tchopane On
FileWriter writer = new FileWriter("Report.csv");
BufferedReader reader = new BufferedReader(new InputStreamReader(YOURSOURCE, Charsets.UTF_8));
String line;
while ((line = reader.readLine()) != null) {
    line.replace('c1', 'c2');
    writer.append(line);
    writer.append('\n');
}
writer.flush();
writer.close();
0
Simon Liang On

You can find related answer here: Filter (search and replace) array of bytes in an InputStream

I took @aioobe's answer in that thread, and built the replacing input stream module in Java, which you can find it in my GitHub gist: https://gist.github.com/lhr0909/e6ac2d6dd6752871eb57c4b083799947

Putting the source code here as well:

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;

/**
 * Created by simon on 8/29/17.
 */
public class ReplacingInputStream extends FilterInputStream {

    private Queue<Integer> inQueue, outQueue;
    private final byte[] search, replacement;

    public ReplacingInputStream(InputStream in, String search, String replacement) {
        super(in);

        this.inQueue = new LinkedList<>();
        this.outQueue = new LinkedList<>();

        this.search = search.getBytes();
        this.replacement = replacement.getBytes();
    }

    private boolean isMatchFound() {
        Iterator<Integer> iterator = inQueue.iterator();

        for (byte b : search) {
            if (!iterator.hasNext() || b != iterator.next()) {
                return false;
            }
        }

        return true;
    }

    private void readAhead() throws IOException {
        // Work up some look-ahead.
        while (inQueue.size() < search.length) {
            int next = super.read();
            inQueue.offer(next);

            if (next == -1) {
                break;
            }
        }
    }

    @Override
    public int read() throws IOException {
        // Next byte already determined.

        while (outQueue.isEmpty()) {
            readAhead();

            if (isMatchFound()) {
                for (byte a : search) {
                    inQueue.remove();
                }

                for (byte b : replacement) {
                    outQueue.offer((int) b);
                }
            } else {
                outQueue.add(inQueue.remove());
            }
        }

        return outQueue.remove();
    }

    @Override
    public int read(byte b[]) throws IOException {
        return read(b, 0, b.length);
    }

    // copied straight from InputStream inplementation, just needed to to use `read()` from this class
    @Override
    public int read(byte b[], int off, int len) throws IOException {
        if (b == null) {
            throw new NullPointerException();
        } else if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }

        int c = read();
        if (c == -1) {
            return -1;
        }
        b[off] = (byte)c;

        int i = 1;
        try {
            for (; i < len ; i++) {
                c = read();
                if (c == -1) {
                    break;
                }
                b[off + i] = (byte)c;
            }
        } catch (IOException ee) {
        }
        return i;
    }
}