Transform a large JSONL file with unknown JSON properties into CSV using Apache Beam, Google Dataflow and Java


How can I convert a large JSONL file with unknown JSON properties into CSV using Apache Beam, Google Dataflow and Java?

Here is my scenario:

  1. A large JSONL file is in Google Cloud Storage.
  2. The JSON properties are unknown, so an Apache Beam Schema cannot be defined in the pipeline.
  3. Use Apache Beam, Google Dataflow and Java to convert the JSONL to CSV.
  4. Once the transformation is done, store the CSV in Google Cloud Storage (the same bucket where the JSONL is stored).
  5. Notify by some means, e.g. transformation_done=true, if possible (REST API or event).

Any help or guidance would be appreciated, as I am new to Apache Beam, though I am reading the Apache Beam documentation.

I have edited the question with example JSONL data:

{"Name":"Gilbert", "Session":"2013", "Score":"24", "Completed":"true"}
{"Name":"Alexa", "Session":"2013", "Score":"29", "Completed":"true"}
{"Name":"May", "Session":"2012B", "Score":"14", "Completed":"false"}
{"Name":"Deloise", "Session":"2012A", "Score":"19", "Completed":"true"} 

The JSON keys are present in the input file, but they are not known at transformation time. To explain with an example: suppose I have three clients, each with its own Google Cloud Storage bucket, and each uploads its own JSONL file with different JSON properties.

Client 1: Input JSONL file

{"city":"Mumbai", "pincode":"2012A"} 
{"city":"Delhi", "pincode":"2012N"} 

Client 2: Input JSONL file

{"Relation":"Finance", "Code":"2012A"} 
{"Relation":"Production", "Code":"20XXX"} 

Client 3: Input JSONL file

{"Name":"Gilbert", "Session":"2013", "Score":"24", "Completed":"true"}
{"Name":"Alexa", "Session":"2013", "Score":"29", "Completed":"true"}

Question: How could I write a generic Beam pipeline which transforms all three as shown below?

Client 1: Output CSV file

["city", "pincode"] 
["Mumbai","2012A"] 
["Delhi", "2012N"] 

Client 2: Output CSV file

["Relation", "Code"] 
["Finance", "2012A"] 
["Production","20XXX"] 

Client 3: Output CSV file

["Name", "Session", "Score", "true"]
["Gilbert", "2013", "24", "true"]
["Alexa", "2013", "29", "true"]

There is 1 answer

Sayan Bhattacharya

Edit: Removed the previous answer as the question has been modified with examples.

There is no ready-made generic way to achieve this result. You have to write the logic yourself, depending on your requirements and how you are handling the pipeline.

Below are some examples, but you need to verify them for your case, as I have only tried them on a small JSONL file.
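
All of the snippets below assume Beam's Java SDK and Gson are on the classpath. For reference, the imports they rely on look roughly like this (verify against your SDK version):

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.TypeDescriptors;

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;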

TextIO


Approach 1
If you can collect the header values for the output CSV up front, it becomes much easier. But getting the header beforehand is itself another challenge.

//pipeline
pipeline.apply("ReadJSONLines",
        TextIO.read().from("FILE URL"))
    .apply(ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processLines(@Element String line, OutputReceiver<String> receiver) {
            // convert each JSON line into its comma-separated values
            String values = getCsvLine(line, false);
            receiver.output(values);
        }
    }))
    .apply("WriteCSV",
        TextIO.write().to("FileName")
            .withSuffix(".csv")
            .withoutSharding()
            .withDelimiter(new char[] { '\r', '\n' })
            .withHeader(getHeader()));

private static String getHeader() {
    String header = "";
    // your logic to get the header line
    return header;
}

Probable ways to get the header line (only assumptions, which may not work in your case):

  • You can keep a text file in GCS which stores the header of a particular JSON file, and in your logic you can fetch the header by reading that file; check this SO thread about how to read files from GCS. A minimal sketch follows this list.
  • You can try to pass the header as a runtime argument, but that depends on how you are configuring and executing your pipeline; an options sketch also follows this list.
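
For the first option, here is a minimal sketch of getHeader() using Beam's FileSystems utility (org.apache.beam.sdk.io.FileSystems and org.apache.beam.sdk.io.fs.MatchResult). The path gs://my-bucket/headers/client1-header.txt is a hypothetical placeholder, and the header file is assumed to contain exactly one pre-computed CSV line:

private static String getHeader() {
    try {
        // match and open the single header file (hypothetical path)
        MatchResult.Metadata metadata =
                FileSystems.matchSingleFileSpec("gs://my-bucket/headers/client1-header.txt");
        try (BufferedReader br = new BufferedReader(
                Channels.newReader(FileSystems.open(metadata.resourceId()), "UTF-8"))) {
            // the file is assumed to hold a single CSV header line
            return br.readLine();
        }
    } catch (IOException e) {
        throw new RuntimeException("Could not read header file", e);
    }
}

Note that if this runs at pipeline-construction time, you may need to call FileSystems.setDefaultPipelineOptions(options) first so the gs:// scheme is registered. For the second option, the header could be supplied as a runtime argument through a custom options interface (the names here are illustrative):

public interface ConversionOptions extends PipelineOptions {
    @Description("Pre-computed header line for the output CSV")
    String getCsvHeader();
    void setCsvHeader(String value);
}

// then build the write step with .withHeader(options.getCsvHeader())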

Approach 2
This is a workaround I found for small JSON files (~10k lines). The example below may not work for large files.

final int[] count = { 0 };
pipeline.apply("ReadJSONLines",
        TextIO.read().from("FILE URL"))
    .apply(ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processLines(@Element String line, OutputReceiver<String> receiver) {
            // check if this is the first element processed; if yes, emit the header first.
            // Caveat: each worker gets its own copy of count, and Beam gives no
            // ordering guarantee, so the header may be duplicated or misplaced
            // when the pipeline runs on more than one worker.
            if (count[0] == 0) {
                String header = getCsvLine(line, true);
                receiver.output(header);
                count[0]++;
            }
            String values = getCsvLine(line, false);
            receiver.output(values);
        }
    }))
    .apply("WriteCSV",
        TextIO.write().to("FileName")
            .withSuffix(".csv")
            .withoutSharding());

FileIO


As mentioned by Saransh in the comments, with FileIO all you have to do is read the JSONL file line by line manually and then convert each line into comma-separated format. E.g.:

pipeline.apply(FileIO.match().filepattern("FILE PATH"))
    .apply(FileIO.readMatches())
    .apply(FlatMapElements
        .into(TypeDescriptors.strings())
        .via((FileIO.ReadableFile f) -> {
            // collect the header line plus one value line per JSONL line
            List<String> output = new ArrayList<>();
            try (BufferedReader br = new BufferedReader(Channels.newReader(f.open(), "UTF-8"))) {
                String line = br.readLine();
                while (line != null) {
                    // emit the header once, built from the keys of the first line
                    if (output.size() == 0) {
                        String header = getCsvLine(line, true);
                        output.add(header);
                    }
                    String result = getCsvLine(line, false);
                    output.add(result);
                    line = br.readLine();
                }
            } catch (IOException e) {
                throw new RuntimeException("Error while reading", e);
            }
            // note: the whole file is buffered in this list, so this suits smaller files
            return output;
        }))
    .apply("WriteCSV",
        TextIO.write().to("FileName")
            .withSuffix(".csv")
            .withoutSharding());

In the above examples I have used a getCsvLine method (created for code reusability) which takes a single line from the file and converts it into comma-separated format. To parse the JSON object I have used Gson.

/**
 * @param line     a single JSONL line
 * @param isHeader true: returns output combining the JSON keys; false:
 *                 returns output combining the JSON values
 **/
public static String getCsvLine(String line, boolean isHeader) {
    List<String> values = new ArrayList<>();
    // convert the line into a JsonObject
    JsonObject jsonObject = JsonParser.parseString(line).getAsJsonObject();
    // iterate over the JSON object and collect all keys or values;
    // note: this assumes every line has the same keys in the same order,
    // that all values are JSON primitives, and it does no CSV escaping
    for (Map.Entry<String, JsonElement> entry : jsonObject.entrySet()) {
        if (isHeader)
            values.add(entry.getKey());
        else
            values.add(entry.getValue().getAsString());
    }
    return String.join(",", values);
}
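
One caveat worth calling out: getCsvLine does no CSV escaping, so a value containing a comma, quote or newline would corrupt the output row. A minimal escaping helper, sketched along RFC 4180 quoting rules (not part of the original answer), could be applied to each key and value before they are added to the list:

private static String escapeCsvField(String field) {
    // wrap the field in quotes if it contains a comma, quote or newline,
    // doubling any embedded quotes (RFC 4180 style)
    if (field.contains(",") || field.contains("\"") || field.contains("\n")) {
        return "\"" + field.replace("\"", "\"\"") + "\"";
    }
    return field;
}

Finally, regarding step 5 of the question (signalling that the transformation is done): assuming you launch the pipeline yourself, one simple approach is to block on the result and fire the notification afterwards, e.g.:

PipelineResult result = pipeline.run();
result.waitUntilFinish(); // blocks until the Dataflow job completes
if (result.getState() == PipelineResult.State.DONE) {
    // call your REST endpoint or publish an event here,
    // e.g. transformation_done=true
}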