Flink sink to Parquet file with AvroParquetWriter is not writing data to file


I am trying to write a Parquet file as a sink using AvroParquetWriter. The file is created, but it has 0 length (no data is written). Am I doing something wrong? I couldn't figure out what the problem is.

import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter}
import org.apache.parquet.hadoop.metadata.CompressionCodecName

import scala.io.Source
import org.apache.flink.streaming.api.scala._

object Tester extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  def now = System.currentTimeMillis()
  val path = new Path(s"/tmp/test-$now.parquet")
  val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
  val schema: Schema = new Schema.Parser().parse(schemaString)
  val compressionCodecName = CompressionCodecName.SNAPPY
  val config = ParquetWriterConfig()
  val genericReocrd: GenericRecord = new GenericData.Record(schema)
  genericReocrd.put("name", "test_b")
  genericReocrd.put("code", "NoError")
  genericReocrd.put("ts", 100L)
  val stream = env.fromElements(genericReocrd)
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(schema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()

  writer.write(genericReocrd)
  stream.addSink{r =>
    writer.write(r)
  }
  env.execute()
}

1 Answer

Answered by Till Rohrmann:

The problem is that you don't close the ParquetWriter. This is necessary to flush pending elements to disk. You could solve the problem by defining your own RichSinkFunction where you close the ParquetWriter in the close method:

import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter}
import org.apache.parquet.hadoop.metadata.CompressionCodecName

class ParquetWriterSink(val path: String, val schema: String, val compressionCodecName: CompressionCodecName, val config: ParquetWriterConfig) extends RichSinkFunction[GenericRecord] {
  var parquetWriter: ParquetWriter[GenericRecord] = null

  // Create the writer once per parallel sink instance when the task starts.
  override def open(parameters: Configuration): Unit = {
    parquetWriter = AvroParquetWriter.builder[GenericRecord](new Path(path))
      .withSchema(new Schema.Parser().parse(schema))
      .withCompressionCodec(compressionCodecName)
      .withPageSize(config.pageSize)
      .withRowGroupSize(config.blockSize)
      .withDictionaryEncoding(config.enableDictionary)
      .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
      .withValidation(config.validating)
      .build()
  }

  // Closing the writer flushes buffered row groups and writes the Parquet footer.
  override def close(): Unit = {
    parquetWriter.close()
  }

  override def invoke(value: GenericRecord, context: SinkFunction.Context[_]): Unit = {
    parquetWriter.write(value)
  }
}
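
For completeness, here is a minimal sketch of how the job from the question could be wired up with this sink. The path, schema file, and ParquetWriterConfig mirror the question; the exact wiring is illustrative and not part of the answer itself. Note that the sink takes the schema as a String and parses it in open(), so only the string needs to be shipped to the task managers.

import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.streaming.api.scala._
import org.apache.parquet.hadoop.metadata.CompressionCodecName

import scala.io.Source

object Tester extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
  val schema: Schema = new Schema.Parser().parse(schemaString)

  val record: GenericRecord = new GenericData.Record(schema)
  record.put("name", "test_b")
  record.put("code", "NoError")
  record.put("ts", 100L)

  val stream = env.fromElements(record)

  // The sink creates the ParquetWriter in open() and closes it in close(),
  // so buffered row groups are flushed to disk when the job finishes.
  stream.addSink(new ParquetWriterSink(
    s"/tmp/test-${System.currentTimeMillis()}.parquet",
    schemaString,
    CompressionCodecName.SNAPPY,
    ParquetWriterConfig()))

  env.execute()
}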