memory leak when use VectorSchemaSlice slice

21 views Asked by At

In my Flink process function, I receive serialized VectorSchemaRoot data which needs to be deserialized for further processing. As I need to operate on it row by row, I utilized the slice function. However, this approach can lead to an increase in direct memory usage which in turn reduces the heap memory space in Flink. Ultimately, this can result in an Out Of Memory (OOM) exception as the heap memory space becomes insufficient. However, if I serialize the sliced VectorSchemaRoot and pass it as a parameter to the downstream function, which then deserializes it again, there will be no more OOM issues.

  public void processElement(I value, ProcessFunction<I, Object>.Context context, Collector<Object> collector) throws Exception {
        if (this.writerHelper == null) {
            initWriterHelper();
        }
       reader = new ArrowStreamReader(new ByteArrayInputStream((byte[]) value), ArrowUtil.rootAllocator);
        try {
            while (reader.loadNextBatch()) {
                VectorSchemaRoot vsr = reader.getVectorSchemaRoot();
                int rowCount = vsr.getRowCount();
                for (int i = 0; i < rowCount; i++) {
                    //split to row
                    VectorSchemaRoot row = vsr.slice(i, 1);
                    // this.writerHelper.write(row); This approach can result in a memory leak, whereas the following method will not.

                    ByteArrayOutputStream out = new ByteArrayOutputStream();
                    ArrowStreamWriter writer =
                            new ArrowStreamWriter(row, null, Channels.newChannel(out));
                    writer.start();
                    writer.writeBatch();
                    this.writerHelper.write(out.toByteArray());
                    row.clear();
                    row.close();
                }
                vsr.clear();
                vsr.close();
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            reader.close();
        }
    }

I hope to use slice to split VectorSchemaRoot into row units and avoid memory leaks. I already tried to use try-with-resource to ensure VectorSchemaRoot will be closed, and the rootAllocator is a global static object.

0

There are 0 answers