Google Cloud Data Fusion is not producing CSV output in GCS Bucket


I have a pipeline that recursively reads many JSON files from a Google Cloud Storage (GCS) bucket, then parses each file into a record. Each record then goes through a "Python Transform" plugin for further processing (adding new fields and values), and finally it should be saved in a different GCS bucket (the sink).

None of my attempts have produced a CSV file: adjusting the parameters of the GCS sink, adding a "Wrangler" transform before it, and adding a "CSV Formatter" transform before the "Wrangler" transform. The preview output is always correct, but when deployed, the output is not.

The file produced at my chosen path always has a name I did not choose, and it is always of type "application/octet-stream".

[Screenshot: the first attempt (full pipeline)]

[Screenshot: the second type of attempt]

[Screenshot: the third type of attempt]

This is the GCS sink properties window; nothing in it differs between the above attempts except the schema.

This is the output, every time: [Screenshot: deployed pipeline output as octet-stream instead of CSV, with a file name I did not choose]

How can I choose the file name, and what am I doing wrong that the output does not come out as a CSV in the GCS bucket?


There are 3 answers

Kadri (BEST ANSWER)

At the time of writing, after considering the comments and ideas proposed (@narendra, @Edwin, @Rally), I experimented with the different plugins, and this is how I settled on a solution:

I used the Spark Sink plugin together with the FileDelete plugin, which can be placed after a sink.


The code for the Spark Sink is simple:

// Imports (CDAP 6.x package names); the Spark Sink editor may already provide them.
import io.cdap.cdap.etl.api.batch.SparkExecutionPluginContext
import org.apache.spark.sql.DataFrame

def sink(df: DataFrame, context: SparkExecutionPluginContext): Unit = {
  // "${fillerVar}" is a Data Fusion macro, resolved from runtime arguments (unused below).
  val fillerVar = "${fillerVar}"
  val fullpath = "gs://somebucket/output/leader_board/"
  // Coalesce to a single partition so Spark writes one CSV part file.
  df.coalesce(1)
    .write.format("csv")
    .option("header", "true")
    .option("encoding", "UTF-8")
    .mode("append")
    .save(fullpath)
}

The output includes not only a CSV file, but also an empty "_SUCCESS" file. This is deleted using the FileDelete plugin.

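An alternative that might avoid the FileDelete step altogether (a sketch, assuming the standard Hadoop output committer is in play; the path is a placeholder): disable the committer's success-marker flag before writing, so the "_SUCCESS" file is never created.

def sink(df: DataFrame, context: SparkExecutionPluginContext): Unit = {
  // Suppress the empty _SUCCESS marker instead of deleting it afterwards.
  df.sparkSession.sparkContext.hadoopConfiguration
    .set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

  df.coalesce(1)
    .write.format("csv")
    .option("header", "true")
    .save("gs://somebucket/output/leader_board/")
}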

I recognize that (right now) I cannot find a simple way to change the output file name (whether one file or multiple files merged) through the plugins. And since I don't know Scala/Java well enough, I couldn't figure it out that way either.
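For readers who do know Scala, here is roughly how it could look (a sketch I have not verified in Data Fusion; it assumes it runs inside the sink function after the save, that the gs:// scheme is served by the GCS connector as on the Dataproc clusters Data Fusion provisions, and "leader_board.csv" is a placeholder name):

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Rename the single part file that coalesce(1) produced; paths are placeholders.
val dir = new Path("gs://somebucket/output/leader_board/")
val fs = FileSystem.get(new URI("gs://somebucket"),
  df.sparkSession.sparkContext.hadoopConfiguration)
val part = fs.globStatus(new Path(dir, "part-*"))(0).getPath // the lone part file
fs.rename(part, new Path(dir, "leader_board.csv"))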

For my purposes, I'm using Google Data Fusion to produce output for Google Data Studio. Data Studio can take as a data source not just individual files: you can point it at a GCS bucket path and it will read all the files therein. Therefore it doesn't bother me anymore that I can't control the file name ("part-00000-[random]").

Rally H

I replicated this as well, and I also cannot choose the name and the type of the file I want. Since the sink offers no content-type option, the file is output with the default name part-r-00000 and the content type application/octet-stream.

"If the Content-Type is not specified by the uploader and cannot be determined, it is set to application/octet-stream." (from the Cloud Storage documentation)
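In the meantime, the content type can be patched after the fact with the Cloud Storage client library (a sketch; the bucket and object names are placeholders):

import com.google.cloud.storage.{BlobId, StorageOptions}

// Patch the content type of an already-written object; names are placeholders.
val storage = StorageOptions.getDefaultInstance.getService
val blob = storage.get(BlobId.of("somebucket", "output/leader_board/part-r-00000"))
storage.update(blob.toBuilder.setContentType("text/csv").build())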

I have created a feature request for this, whose progress you can also track:

https://issuetracker.google.com/171366470

I agree with @narendra's suggested workaround to add the filenames via Spark Scala code.

Edwin Elia

Currently the GCS sink plugin does not support naming the files it writes, since the output of a sink can be split into multiple parts. We can add a feature request for a GCS action, run after the sink, that concatenates the parts into a single file and lets you specify its name.
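In the meantime, such a concatenation can be done outside Data Fusion with the Cloud Storage compose operation, which merges up to 32 objects per call (a sketch; the bucket, part names, and target name are placeholders):

import com.google.cloud.storage.{BlobInfo, Storage, StorageOptions}

// Merge part files into one named CSV via GCS compose (max 32 sources per call).
val storage = StorageOptions.getDefaultInstance.getService
val request = Storage.ComposeRequest.newBuilder()
  .addSource("output/leader_board/part-r-00000", "output/leader_board/part-r-00001")
  .setTarget(BlobInfo.newBuilder("somebucket", "output/leader_board.csv")
    .setContentType("text/csv")
    .build())
  .build()
storage.compose(request)

Note that if each part was written with a header row, composing them repeats the header; writing the parts without headers avoids that.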