Convert from IObservable<byte> to Stream?


I was looking to convert from IObservable<byte> to Stream and came up with this code that uses System.IO.Pipelines.Pipe.

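using System;
using System.IO;
using System.IO.Pipelines;
using System.Linq;
using System.Reactive.Linq;

public static class ObservableStreamExtensions
{
    public static Stream ToStream(this IObservable<byte> observable, int bufferSize = 4096)
    {
        var pipe = new Pipe();

        var reader = pipe.Reader;
        var writer = pipe.Writer;

        observable
            // Group single bytes into IList<byte> chunks of up to bufferSize elements.
            .Buffer(bufferSize)
            // ToArray copies each chunk again; WriteAsync then copies it into the pipe and flushes.
            .Select(buffer => Observable.FromAsync(async () => await writer.WriteAsync(buffer.ToArray())))
            // Concat runs the writes one at a time, in order.
            .Concat()
            .Subscribe(_ => { }, onCompleted: () => { writer.Complete(); }, onError: exception => writer.Complete(exception));

        return reader.AsStream();
    }
}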

I've tried writing the resulting Stream to disk, and the performance is terrible (under 10 MB/s). I'm starting to think that something is wrong with this approach. I hope someone can shed light on this and suggest a better way to expose an IObservable<byte> as a Stream.
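For reference, here is a minimal sketch of how the throughput could be measured, so that the disk can be ruled in or out as the bottleneck. The names `ThroughputProbe` and `MeasureCopyAsync` are hypothetical helpers made up for this illustration, not part of any library, and the 80 KB copy buffer is an arbitrary assumption.

```csharp
using System;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;

// Hypothetical helper (not from any library): times a stream-to-stream copy.
public static class ThroughputProbe
{
    // Copies everything from source to destination and returns the rate in MiB/s.
    public static async Task<double> MeasureCopyAsync(Stream source, Stream destination)
    {
        var buffer = new byte[81920]; // assumed copy buffer size
        long total = 0;
        int read;

        var stopwatch = Stopwatch.StartNew();
        while ((read = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            await destination.WriteAsync(buffer, 0, read);
            total += read;
        }
        stopwatch.Stop();

        return total / (1024.0 * 1024.0) / stopwatch.Elapsed.TotalSeconds;
    }
}
```

Calling `await ThroughputProbe.MeasureCopyAsync(observable.ToStream(), Stream.Null)` and then repeating the call with a `FileStream` as the destination should give two numbers to compare. If both are similarly low, the cost is presumably upstream of the disk, in the per-byte observable and the buffering, rather than in the file write.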
