Streaming compressed IDataReader using protobuf


We need to greatly reduce the bandwidth our back-end services use when pulling data from and pushing data to SQL Server. The TDS stream used by SqlClient is fairly bloated. For years, people have requested a compression option when pulling from SQL Server, but Microsoft has not added one.

I wanted to see if anyone had any thoughts on the best way to handle this. Here is what I've tried so far:

  1. I modified https://github.com/MindFlavor/TDSBridge to add compression and decompression right at the socket layer. Because the payload is SSL-encrypted, it didn't make much difference.

  2. Next I took the IDataReader-to-protobuf library found at https://github.com/dotarj/protobuf-net-data and the TCP framework found at https://github.com/jchristn/WatsonTcp in an attempt to create a client/server proxy of sorts: stream the IDataReader over the wire by converting it to protobuf, compressing that stream, and doing the opposite on the other end.

I got a proof of concept working, and actually saw between 84% and 98% reduction in raw bytes over the network versus plain TDS streams. The downside is that WatsonTcp wants you to pass in the content length when assigning the stream, and there is no way to know that until the whole protobuf stream has been created. We sometimes transfer hundreds of gigabytes in one swoop, so that won't work.
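For illustration, the compression pipeline in that proof of concept boils down to something like the following. This is a minimal sketch assuming protobuf-net-data's DataSerializer.Serialize/Deserialize API; the command, connection, and targetTable names are placeholders, not the actual proxy code.

using System.Data.SqlClient;
using System.IO;
using System.IO.Compression;
using ProtoBuf.Data;            // protobuf-net-data

static class CompressedProtoStream
{
    // Sender: IDataReader -> protobuf -> gzip -> any writable Stream.
    // Rows are streamed as they are read; the full payload is never buffered.
    public static void Serialize(SqlCommand cmd, Stream destination)
    {
        using (var rdr = cmd.ExecuteReader())
        using (var gz = new GZipStream(destination, CompressionLevel.Fastest, leaveOpen: true))
        {
            DataSerializer.Serialize(gz, rdr);
        }
    }

    // Receiver: Stream -> gunzip -> protobuf -> IDataReader -> SqlBulkCopy.
    public static void BulkLoad(Stream source, SqlConnection con, string targetTable)
    {
        using (var gz = new GZipStream(source, CompressionMode.Decompress, leaveOpen: true))
        using (var rdr = DataSerializer.Deserialize(gz))
        using (var bulk = new SqlBulkCopy(con) { DestinationTableName = targetTable })
        {
            bulk.WriteToServer(rdr);
        }
    }
}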

I didn't see how protobuf-net-data could stream over gRPC, and even if it could, I fear the record-by-record granularity of an IAsyncEnumerable may slow down a large transfer.

No doubt I could sit down and write a fully custom compressed-protobuf-over-raw-TCP streaming implementation, with a client-facing surface area close to SqlCommand's; I just know this is notoriously difficult to get right.
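For example, a length-prefixed framing scheme sidesteps the need to know the total content length up front: each compressed batch goes out as its own frame, and a zero-length frame marks the end. This is a minimal sketch of my own, not WatsonTcp's API; the end-of-stream marker and the little-endian length prefix are assumptions.

using System;
using System.IO;

static class Framing
{
    // Writes one frame: [4-byte little-endian length][payload].
    public static void WriteFrame(Stream net, byte[] payload)
    {
        net.Write(BitConverter.GetBytes(payload.Length), 0, 4);
        net.Write(payload, 0, payload.Length);
    }

    // Signals that no more frames will follow.
    public static void WriteEndOfStream(Stream net)
    {
        net.Write(BitConverter.GetBytes(0), 0, 4);
    }

    // Reads one frame, or returns null when the zero-length end marker arrives.
    public static byte[] ReadFrame(Stream net)
    {
        var header = ReadExactly(net, 4);
        int size = BitConverter.ToInt32(header, 0);
        if (size == 0)
            return null;
        return ReadExactly(net, size);
    }

    private static byte[] ReadExactly(Stream s, int count)
    {
        var buffer = new byte[count];
        int read = 0;
        while (read < count)
        {
            int n = s.Read(buffer, read, count - read);
            if (n == 0)
                throw new EndOfStreamException();
            read += n;
        }
        return buffer;
    }
}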

Any time-saving ideas? If not, maybe I'll make an open-source project out of it.


There are 2 answers

David Browne - Microsoft

Here's a pattern you can use to take a large query and transmit it as a series of batches, where each batch is a compressed, binary-serialized DataTable. After transmission and deserialization, each DataTable can be passed directly to SqlBulkCopy. The same pattern can work with other formats, but would require an additional converter before passing to SqlBulkCopy.

using System.Data.SqlClient;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;

namespace BatchingDataReader
{
    // Wraps an IDataReader and presents it as a sequence of "result sets" of at most
    // batchSize rows each, so DataTable.Load can pull one batch at a time.
    class BatchingDataReader : IDataReader
    {
        private int batchSize;
        private IDataReader rdr;
        private int rowsRead;
        private bool atEnd;
        private int batchesRead;

        public BatchingDataReader(IDataReader rdr, int batchSize)
        {
            this.batchSize = batchSize;
            this.rdr = rdr;
        }

        public object this[int i] => rdr[i];

        public object this[string name] => rdr[name];

        public int Depth => rdr.Depth;

        public bool IsClosed => rdr.IsClosed;

        public int RecordsAffected => rdr.RecordsAffected;

        public int FieldCount => rdr.FieldCount;

        public void Close()
        {
            // Only close the underlying reader once it has been fully consumed,
            // so loading a single batch doesn't tear down the whole stream.
            if (!atEnd)
                return;
            rdr.Close();
        }

        public void Dispose()
        {
            if (!atEnd)
                return;

            rdr.Dispose();
        }

        public bool GetBoolean(int i)
        {
            return rdr.GetBoolean(i);
        }

        public byte GetByte(int i)
        {
            return rdr.GetByte(i);
        }

        public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length)
        {
            return rdr.GetBytes(i, fieldOffset, buffer, bufferoffset, length);
        }

        public char GetChar(int i)
        {
            return rdr.GetChar(i);
        }

        public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length)
        {
            return rdr.GetChars(i, fieldoffset, buffer, bufferoffset, length);
        }

        public IDataReader GetData(int i)
        {
            return rdr.GetData(i);
        }

        public string GetDataTypeName(int i)
        {
            return rdr.GetDataTypeName(i);
        }

        public DateTime GetDateTime(int i)
        {
            return rdr.GetDateTime(i);
        }

        public decimal GetDecimal(int i)
        {
            return rdr.GetDecimal(i);
        }

        public double GetDouble(int i)
        {
            return rdr.GetDouble(i);
        }

        public Type GetFieldType(int i)
        {
            return rdr.GetFieldType(i);
        }

        public float GetFloat(int i)
        {
            return rdr.GetFloat(i);
        }

        public Guid GetGuid(int i)
        {
            return rdr.GetGuid(i);
        }

        public short GetInt16(int i)
        {
            return rdr.GetInt16(i);
        }

        public int GetInt32(int i)
        {
            return rdr.GetInt32(i);
        }

        public long GetInt64(int i)
        {
            return rdr.GetInt64(i);
        }

        public string GetName(int i)
        {
            return rdr.GetName(i);
        }

        public int GetOrdinal(string name)
        {
            return rdr.GetOrdinal(name);
        }

        public DataTable GetSchemaTable()
        {
            return rdr.GetSchemaTable();
        }

        public string GetString(int i)
        {
            return rdr.GetString(i);
        }

        public object GetValue(int i)
        {
            return rdr.GetValue(i);
        }

        public int GetValues(object[] values)
        {
            return rdr.GetValues(values);
        }

        public bool IsDBNull(int i)
        {
            return rdr.IsDBNull(i);
        }

        public bool NextResult()
        {
            // While the underlying reader still has rows, report another "result set"
            // so the caller keeps loading batches.
            if (!atEnd)
            {
                batchesRead += 1;
                rowsRead = 0;
                return true;
            }

            if (IsClosed)
                return false;

            return rdr.NextResult();
        }

        public bool Read()
        {
            // End the current batch after batchSize rows; otherwise advance the
            // underlying reader and remember whether it has been exhausted.
            if (rowsRead >= batchSize)
                return false;
            rowsRead += 1;

            atEnd = !rdr.Read();
            return !atEnd;
        }

        // Yields one DataTable per batch until the underlying reader is exhausted.
        public static IEnumerable<DataTable> Read(SqlDataReader r, int batchSize)
        {
            var rdr = new BatchingDataReader(r, batchSize);
            do
            {
                var dt = new DataTable();
                dt.TableName = "table";
                dt.Load(rdr);
                yield return dt;
            } while (rdr.NextResult());
        }
    }
    class Program
    {

        static void Main(string[] args)
        {
            var constr = "server=localhost;database=master;integrated security=true";
            var outfile = "c:\\temp\\out.bin";

            if (File.Exists(outfile))
                File.Delete(outfile);

            using (var con = new SqlConnection(constr))
            {
                //322,162,200  TDS raw
                //235,355,311  binary uncompressed out.bin
                // 52,755,181  binary GZ Fastest
                // 43,061,121  binary GZ optimal
                // 65,282,624  XML GZ fastest
                // 41,892,056  binary GZ optimal 100,000 row batches

                con.Open();

                var bin = new BinaryFormatter();

                var cmd = new SqlCommand("select top (1000000) * from sys.messages m, sys.objects o", con);
                using (SqlDataReader rdr = cmd.ExecuteReader())
                using (var destFile = File.OpenWrite(outfile))
                using (var zipStream = new System.IO.Compression.GZipStream(destFile, System.IO.Compression.CompressionLevel.Optimal))
                {
                    foreach (var dt in BatchingDataReader.Read(rdr, 10000))
                    {
                        Console.WriteLine(dt.Rows.Count);

                        dt.RemotingFormat = SerializationFormat.Binary;
                        bin.Serialize(zipStream, dt);
                    }
                }
            }
        }
    }

}
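On the receiving side, something like this reloads the batches and feeds them to SqlBulkCopy. This is a minimal sketch assuming a file written exactly as above; infile, constr, and targetTable are placeholders, and end-of-stream is detected by catching the SerializationException BinaryFormatter throws when the data runs out (a batch count or length prefix would be cleaner).

using System.Data;
using System.Data.SqlClient;
using System.IO;
using System.IO.Compression;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;

class Receiver
{
    static void BulkLoad(string infile, string constr, string targetTable)
    {
        var bin = new BinaryFormatter();

        using (var con = new SqlConnection(constr))
        using (var srcFile = File.OpenRead(infile))
        using (var zipStream = new GZipStream(srcFile, CompressionMode.Decompress))
        {
            con.Open();

            using (var bulk = new SqlBulkCopy(con) { DestinationTableName = targetTable })
            {
                while (true)
                {
                    DataTable dt;
                    try
                    {
                        // Each call reads the next binary-serialized batch from the stream.
                        dt = (DataTable)bin.Deserialize(zipStream);
                    }
                    catch (SerializationException)
                    {
                        break; // no more batches in the stream
                    }

                    bulk.WriteToServer(dt); // each batch goes straight to SqlBulkCopy
                }
            }
        }
    }
}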
gordy

You can use this technique to get SQL Server to format the results as gzipped CSV (adjust the lines-per-result in the GROUP BY; 1000 is about where the gzip overhead diminishes):

with csv as (
    select n = row_number() over (order by (select null)),
        line = convert(nvarchar(max), concat(
            message_id, ',', language_id, ',', severity, ',',
            is_event_logged, ',', '"' + replace([text], '"', '""') + '"'))
    from sys.messages)

select compress(string_agg(line, char(13)) within group (order by n))
from csv group by n / 1000

This should help if you're facing an actual egress bottleneck at your SQL Server. It would be interesting to implement this as a TDSBridge that rewrites the query and then transforms the results back into what the client was expecting.
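On the client, something like the sketch below inflates each row back into CSV text. It assumes COMPRESS produced GZip over the UTF-16 bytes of the nvarchar CSV (which is what SQL Server's COMPRESS does for nvarchar input); csvQuery and output are placeholders for the query above and wherever the text should go.

using System.Data.SqlClient;
using System.IO;
using System.IO.Compression;
using System.Text;

static class CompressedCsvClient
{
    // Reads each row's varbinary blob, inflates it, and writes the CSV text out.
    public static void WriteCsv(SqlConnection con, string csvQuery, TextWriter output)
    {
        using (var cmd = new SqlCommand(csvQuery, con))
        using (var rdr = cmd.ExecuteReader())
        {
            while (rdr.Read())
            {
                var blob = (byte[])rdr[0]; // one COMPRESS()'d chunk of ~1000 CSV lines
                using (var ms = new MemoryStream(blob))
                using (var gz = new GZipStream(ms, CompressionMode.Decompress))
                using (var sr = new StreamReader(gz, Encoding.Unicode)) // nvarchar compresses as UTF-16
                {
                    output.WriteLine(sr.ReadToEnd());
                }
            }
        }
    }
}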