I want to convert an IObservable<byte> to a Stream and came up with this code, which uses System.IO.Pipelines.Pipe:
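    // Requires System.Linq (ToArray) and System.Reactive.Linq (Buffer, Select, Concat, FromAsync).
    public static Stream ToStream(this IObservable<byte> observable, int bufferSize = 4096)
    {
        var pipe = new System.IO.Pipelines.Pipe();
        var reader = pipe.Reader;
        var writer = pipe.Writer;

        observable
            // Group the single bytes into chunks of up to bufferSize bytes.
            .Buffer(bufferSize)
            // Wrap each chunk in a deferred write to the pipe.
            .Select(buffer => Observable.FromAsync(async () => await writer.WriteAsync(buffer.ToArray())))
            // Run the writes one at a time, in order.
            .Concat()
            // Complete the writer when the source completes or fails.
            .Subscribe(_ => { }, onCompleted: () => { writer.Complete(); }, onError: exception => writer.Complete(exception));

        return reader.AsStream();
    }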
When I write the resulting Stream to disk, the performance is terrible (under 10 MB/s), so I'm starting to think something is wrong. I hope someone can shed light on this and suggest a better way to expose an observable of bytes as a Stream.
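For anyone who wants to reproduce the problem, here is a minimal sketch of how such a throughput measurement might look. It is not my exact test: the synthetic source built with Observable.Range, the 16 MB size, the temporary output file, and the ThroughputCheck class name are all placeholders chosen for illustration. It assumes the System.Reactive and System.IO.Pipelines NuGet packages are referenced.

```csharp
using System;
using System.Diagnostics;
using System.IO;
using System.IO.Pipelines;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical harness; the class name and all constants are illustrative only.
public static class ThroughputCheck
{
    // Same method as in the question, repeated so the sketch compiles on its own.
    public static Stream ToStream(this IObservable<byte> observable, int bufferSize = 4096)
    {
        var pipe = new Pipe();
        var writer = pipe.Writer;

        observable
            .Buffer(bufferSize)
            .Select(buffer => Observable.FromAsync(async () => await writer.WriteAsync(buffer.ToArray())))
            .Concat()
            .Subscribe(_ => { }, onCompleted: () => writer.Complete(), onError: exception => writer.Complete(exception));

        return pipe.Reader.AsStream();
    }

    public static async Task Main()
    {
        // Placeholder payload: 16 MB of synthetic bytes instead of real data.
        const int totalBytes = 16 * 1024 * 1024;
        IObservable<byte> source = Observable.Range(0, totalBytes).Select(i => (byte)i);

        // Placeholder destination: a temporary file that is deleted afterwards.
        string path = Path.GetTempFileName();

        // The timing covers generating the bytes as well as writing them to disk.
        var stopwatch = Stopwatch.StartNew();
        using (var input = source.ToStream())
        using (var output = File.Create(path))
        {
            await input.CopyToAsync(output);
        }
        stopwatch.Stop();

        double megabytes = totalBytes / (1024.0 * 1024.0);
        Console.WriteLine($"{megabytes / stopwatch.Elapsed.TotalSeconds:F1} MB/s");

        File.Delete(path);
    }
}
```

Because the stopwatch starts before ToStream is called, the reported figure includes the cost of pushing every byte through the observable pipeline one element at a time, not only the disk write.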