I have an issue when i'm trying to set an AvroParquetWriter in RollingSink, sink path and writer path seems to be in conflict
- flink version : 1.1.3
- parquet-avro version : 1.8.1
error :
[...]
12/14/2016 11:19:34 Source: Custom Source -> Sink: Unnamed(8/8) switched to CANCELED
INFO JobManager - Status of job af0880ede809e0d699eb69eb385ca204 (Flink Streaming Job) changed to FAILED.
java.lang.RuntimeException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:161)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already exists: /home/user/data/file
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:264)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:257)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:386)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:447)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:426)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:223)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:266)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:217)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:183)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:153)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:119)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:92)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:66)
at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:54)
at fr.test.SpecificParquetWriter.open(SpecificParquetWriter.java:28) // line in code => writer = new AvroParquetWriter(new Path("/home/user/data/file"), schema, compressionCodecName, blockSize, pageSize);
at org.apache.flink.streaming.connectors.fs.RollingSink.openNewPartFile(RollingSink.java:451)
at org.apache.flink.streaming.connectors.fs.RollingSink.invoke(RollingSink.java:371)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
... 7 more
INFO JobClientActor - 12/14/2016 11:19:34 Job execution switched to status FAILED.
12/14/2016 11:19:34 Job execution switched to status FAILED.
INFO JobClientActor - Terminate JobClientActor.
[...]
main :
RollingSink sink = new RollingSink<String>("/home/user/data");
sink.setBucketer(new DateTimeBucketer("yyyy/MM/dd"));
sink.setWriter(new SpecificParquetWriter());
stream.addSink(sink);
SpecificParquetWriter :
public class SpecificParquetWriter<V> extends StreamWriterBase<V> {
private transient AvroParquetWriter writer;
private CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
private int blockSize = ParquetWriter.DEFAULT_BLOCK_SIZE;
private int pageSize = ParquetWriter.DEFAULT_PAGE_SIZE;
public static final String USER_SCHEMA = "{"
+ "\"type\":\"record\","
+ "\"name\":\"myrecord\","
+ "\"fields\":["
+ " { \"name\":\"str1\", \"type\":\"string\" },"
+ " { \"name\":\"str2\", \"type\":\"string\" },"
+ " { \"name\":\"int1\", \"type\":\"int\" }"
+ "]}";
public SpecificParquetWriter(){
}
@Override
// workaround
public void open(FileSystem fs, Path path) throws IOException {
super.open(fs, path);
Schema schema = new Schema.Parser().parse(USER_SCHEMA);
writer = new AvroParquetWriter(new Path("/home/user/data/file"), schema, compressionCodecName, blockSize, pageSize);
}
@Override
public void write(Object element) throws IOException {
if(writer != null)
writer.write(element);
}
@Override
public Writer duplicate() {
return new SpecificParquetWriter();
}
}
I don't know if i'm doing it on the right way...
Is there a simple way to do this ?
This is problem with the base class that is Writer in case of RollingSink or StreamBaseWriter in case of Bucketing Sink as they only accept the Writers which can process OutputStream rather than saving them own their own.
writer= new AvroKeyValueWriter<K, V>(keySchema, valueSchema, compressionCodec, streamObject);
whereas AvroParquetWriter or ParquetWriter Accepts filePath
writer = AvroParquetWriter.<V>builder(new Path("filePath")) .withCompressionCodec(CompressionCodecName.SNAPPY) .withSchema(schema).build();
I went in deep to understand the ParquetWriter and realized that the stuff we are trying to do , does not make sense as Flink Being an event processing system like storm can't write a single record to a parquet whereas spark streaming can because it works on MicroBatch Principle.
Using Storm with Trident we can still write parquet files, but with FLink we can't until flink introduces something like MicroBatches.
So, for this type of usecase, Spark Streaming is a better choice.
Or go for batch processing if want to use Flink.