Reading CSV header with Dataflow


I have a CSV file, and I don't know the column names ahead of time. I need to output the data in JSON after some transformations in Google Dataflow.

What's the best way to take the header row and permeate the labels through all the rows?

For example:

a,b,c
1,2,3
4,5,6

...becomes (approximately):

{a:1, b:2, c:3}
{a:4, b:5, c:6}
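Conceptually, the per-row work is just zipping the header labels with each data row. A minimal, Dataflow-free sketch of that transformation in plain Java (class and method names here are illustrative, not from any particular library):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderZip {
    // Pair each header label with the value in the same column position.
    static Map<String, String> zipRow(String[] header, String row) {
        String[] values = row.split(",");
        Map<String, String> record = new LinkedHashMap<>();
        for (int i = 0; i < header.length; i++) {
            record.put(header[i], i < values.length ? values[i] : "");
        }
        return record;
    }

    public static void main(String[] args) {
        String[] header = "a,b,c".split(",");
        System.out.println(zipRow(header, "1,2,3")); // {a=1, b=2, c=3}
        System.out.println(zipRow(header, "4,5,6")); // {a=4, b=5, c=6}
    }
}
```

The hard part in Dataflow is not this zip, but getting the header to every worker; the answers below take different approaches to that.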
3

There are 3 answers

2
robosoul

You should implement a custom FileBasedSource (similar to TextIO.TextSource) that reads the first line and stores the header data:

    @Override
    protected void startReading(final ReadableByteChannel channel)
    throws IOException {
        lineReader = new LineReader(channel);

        if (lineReader.readNextLine()) {
            final String headerLine = lineReader.getCurrent().trim();
            header = headerLine.split(",");
            readingStarted = true;
        }
    }

and later, while reading the remaining lines, prepend the header to the current line's data:

    @Override
    protected boolean readNextRecord() throws IOException {
        if (!lineReader.readNextLine()) {
            return false;
        }

        final String line = lineReader.getCurrent();
        final String[] data = line.split(",");

        // assumes every line has the same number of fields as the header
        final StringBuilder record = new StringBuilder();
        for (int i = 0; i < header.length; i++) {
            record.append(header[i]).append(":").append(data[i]).append(", ");
        }

        currentRecord = record.toString();
        return true;
    }

I've implemented a quick (complete) solution, available on GitHub. I've also added a Dataflow unit test to demonstrate reading:

@Test
public void test_reading() throws Exception {
    final File file =
            new File(getClass().getResource("/sample.csv").toURI());
    assertThat(file.exists()).isTrue();

    final Pipeline pipeline = TestPipeline.create();

    final PCollection<String> output =
            pipeline.apply(Read.from(CsvWithHeaderFileSource.from(file.getAbsolutePath())));

    DataflowAssert
            .that(output)
            .containsInAnyOrder("a:1, b:2, c:3, ", "a:4, b:5, c:6, ");

    pipeline.run();
}

where sample.csv has the following content:

a,b,c
1,2,3
4,5,6
0
Simon Kitching

I have created a solution based on Luka's source code (see the previous answer). Luka's code on GitHub targets Dataflow 1.x and implements a FileBasedSource which extracts the first line, caches it, and then prepends it to every following line. This requires the entire file to be processed on a single node (the source is not splittable).

My variant of the FileBasedSource instead returns just the first line of a file; as described in the class javadoc, this line can then be split (as desired) and used as a side input to the logic that processes the complete file, which can then run in parallel. The code is compatible with Beam 2.x (tested on Beam 2.4.0).

See http://moi.vonos.net/cloud/beam-read-header/
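The idea — read the header once, then apply it while labelling the body lines — can be approximated outside Beam in plain Java (the names below are illustrative; the real pipeline at the link passes the header around as a Beam side input instead of a local variable):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class HeaderSideInput {
    // Treat the first line as the header, then label every remaining line.
    static List<Map<String, String>> parse(List<String> lines) {
        String[] header = lines.get(0).split(",");
        return lines.stream().skip(1).map(line -> {
            String[] values = line.split(",");
            Map<String, String> record = new LinkedHashMap<>();
            for (int i = 0; i < header.length; i++) {
                record.put(header[i], values[i]);
            }
            return record;
        }).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a,b,c", "1,2,3", "4,5,6");
        System.out.println(parse(lines)); // [{a=1, b=2, c=3}, {a=4, b=5, c=6}]
    }
}
```

In the Beam version, the `header` array comes from a singleton side input, so the body lines can be processed by many workers in parallel while each worker still sees the same header.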

0
balamuruganv

I am using Luka's reader, but it reads the entire CSV file before the rest of the chained pipeline starts. Is it possible to define a chunk size, e.g. read 10 lines, process and write them, then read the next 10 lines?

    PCollection<String> input =
            pipeline.apply(Read.from(CustomCsvReader.from(options.getInput())));

    PCollection<Map<String, String>> mapOutput =
            input.apply(MapElements.via(new SimpleFunction<String, Map<String, String>>() {
                @Override
                public Map<String, String> apply(String input) {
                    // split "a:1, b:2, c:3, " into key:value tokens
                    String[] entries = input.split(",");
                    return Stream.of(entries)
                            .map(t -> t.split(":", 2))
                            .collect(Collectors.toMap(a -> a[0], a -> a.length > 1 ? a[1] : ""));
                }
            }));

    PCollection<String> output = mapOutput.apply(ParDo.of(new CSVToXMLConverter()));
    output.apply(TextIO.write().to(options.getOutput()).withFooter(Constants.CCR_FOOTER));
    pipeline.run().waitUntilFinish();
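One caveat with that SimpleFunction: since Luka's reader emits records like "a:1, b:2, c:3, ", splitting on "," keeps the leading space on every key after the first (" b", " c") and turns the trailing ", " into a blank entry. A trimmed variant of the same split logic, as a standalone sketch outside the pipeline:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class RecordParser {
    // Same split logic as the SimpleFunction above, but trims each token and
    // drops the trailing blank produced by the "key:value, " record format.
    static Map<String, String> toMap(String record) {
        return Stream.of(record.split(","))
                .map(String::trim)
                .filter(t -> !t.isEmpty())
                .map(t -> t.split(":", 2))
                .collect(Collectors.toMap(
                        a -> a[0],
                        a -> a.length > 1 ? a[1] : "",
                        (first, second) -> second,
                        LinkedHashMap::new)); // preserve column order
    }

    public static void main(String[] args) {
        System.out.println(toMap("a:1, b:2, c:3, ")); // {a=1, b=2, c=3}
    }
}
```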