C# - Using protobuf to write directly into a zip file

762 views Asked by At

I am working on a simulator program that writes small data (<1kB) frequently (> 10 times a second) in a file. I used a Queue to store these data and when this queue had 300 members or more, it'll write data to a file stored in a zip file and clear the queue.

The problem is after pausing or stopping simulation (in both cases flush will be called) when i extract the zip file and check output files (more specificly: Offsets.sdo) only data from the last step of writing (in my code, calling flush) is inside the file. Also during the simulation Zipfile that i create (with outFile field) has size of 0.

My file writer code is :

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
using System.IO.Packaging;
using System.Net.Mime;
using ProtoBuf;

namespace SEWS.History
{
    class HistoryWriterEngine : HistoryEngine ,HistoryWriter
    {
        public static readonly string OUTPUT_DATA_FILE_NAME = "SimulationOutdata.sod";
        public static readonly string OFFSETS_FILE_NAME = "Offsets.sdo";

        #region buffer
        private static readonly int MAXIMUM_BUFFER_SIZE = 300; // half a minute of simulation with 0.1s steps
        private Queue<SEWS.SimulationEngine.SimulationOutputData> buffer = new Queue<SEWS.SimulationEngine.SimulationOutputData>();
        private Dictionary<string, PackagePart> packageParts = new Dictionary<string, PackagePart>();
        #endregion

        #region IO
        private ZipPackage outFile;
        public override void initFile(string address)
        {
            if (outFile != null)
            {
                flush();
                closeFile();
            }
            workingFile = address;

            outFile = (ZipPackage)ZipPackage.Open(workingFile, FileMode.Create);

            getNewStream(OUTPUT_DATA_FILE_NAME);
            getNewStream(OFFSETS_FILE_NAME, MediaTypeNames.Text.Plain);
        }

        public override void closeFile()
        {
            flush();
            outFile.Flush();
            outFile.Close();
            outFile = null;
        }

        public Stream getNewStream(string fileName, string type = MediaTypeNames.Application.Octet)
        {
            PackagePart packagePart;
            packagePart = outFile.CreatePart(
                PackUriHelper.CreatePartUri(new Uri(fileName, UriKind.Relative)),
                type,
                CompressionOption.SuperFast);

            packageParts.Add(fileName, packagePart);
            return packagePart.GetStream();
        }

        public Stream getStream(string fileName)
        {
            if (packageParts.ContainsKey(fileName))
            {
                return (packageParts[fileName] as PackagePart).GetStream();
            }
            else
            {
                throw new NullReferenceException("No such file as " + fileName + " found.");
            }
        }
        #endregion

        #region HistoryWriterImpl
        public void writeOutputData(SEWS.SimulationEngine.SimulationOutputData data)
        {
            buffer.Enqueue(data);
            if (buffer.Count >= MAXIMUM_BUFFER_SIZE)
            {
                flush();
            }
        }
        System.Diagnostics.Stopwatch s = new System.Diagnostics.Stopwatch();
        public void flush()
        {
            Stream outStream = getStream(OUTPUT_DATA_FILE_NAME);
            Stream offsetsStream = getStream(OFFSETS_FILE_NAME);
            StreamWriter offsetsWriter = new StreamWriter(offsetsStream);

            SEWS.SimulationEngine.SimulationOutputData currentData;
            Console.WriteLine("Writing " + buffer.Count + " records");
            s.Restart();
            while (buffer.Count > 0)
            {
                currentData = buffer.Dequeue();
                Serializer.SerializeWithLengthPrefix(outStream, currentData, PrefixStyle.Base128, 1);
                offsetsWriter.Write(
                    new StringBuilder()
                    .Append(currentData.CurrentStep.ToString())
                    .Append(' ')
                    .Append(currentData.CurrentTime.TimeSinceStart.ToString())
                    .Append(' ')
                    .Append(outStream.Position)
                    .Append("\r\n")
                    .ToString()
                );
            }
            s.Stop();
            Console.WriteLine("Took " + s.ElapsedMilliseconds + " ms.");

            outStream.Flush();
            offsetsWriter.Flush();
            offsetsStream.Flush();
            outStream.Close();
            offsetsWriter.Close();
            offsetsStream.Close();

            outFile.Flush();
        }
        #endregion
    }
}

Output is:

Writing 300 records
Took 138 ms.
Writing 300 records
Took 18 ms.
Writing 300 records
Took 14 ms.
Writing 300 records
Took 14 ms.
Writing 300 records
Took 14 ms.
Writing 41 records
Took 5 ms.

First 5 writes are automatic and the last one (41 records) is regarding to a pause in simulation.

1

There are 1 answers

0
Henk Holterman On BEST ANSWER

Your code repeatedly calls flush() and the intention inside flush() seems to be to reuse a file through getStream() .

But at the tail end of flush() there is a call to outStream.Close(); and from that it follows that getStream() must always be creating a new file. In Overwrite mode apparently.