I've been thinking about how to create a bridge between IObservable<byte> and Stream, but I'm completely lost.
The use case is a function that requires you to provide a Stream when your data lives in an IObservable<byte>.
Since Stream is pull-based and IObservable is push-based, I don't even know if it's feasible to do such a thing without calling byteSequence.ToEnumerable().
The problem is that, in order to create a Stream out of it, I'd need to do
    new MemoryStream(byteSequence.ToEnumerable().ToArray());
This works, but it has to hold ALL the bytes in memory at once, and that is a deal breaker for many use cases.
UPDATE
Inspired by Marc Gravell's comment, I tried my luck and came up with this code, which uses Pipes (System.IO.Pipelines). Apparently, it works! I don't know if it's completely OK, but I'll leave it here for reference.
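    // Needs: System, System.IO, System.IO.Pipelines, System.Linq, System.Reactive.Linq
    public static Stream ToStream(this IObservable<byte> observable, int bufferSize = 4096)
    {
        var pipe = new System.IO.Pipelines.Pipe();
        var reader = pipe.Reader;
        var writer = pipe.Writer;

        observable
            // Group the bytes into chunks of up to bufferSize.
            .Buffer(bufferSize)
            // Wrap each chunk in a deferred write to the pipe.
            .Select(buffer => Observable.FromAsync(async () => await writer.WriteAsync(buffer.ToArray())))
            // Run the writes one at a time, in order.
            .Concat()
            // Completing the writer (with or without an error) ends the stream for the reader.
            .Subscribe(_ => { }, onCompleted: () => { writer.Complete(); }, onError: exception => writer.Complete(exception));

        return reader.AsStream();
    }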
Feel free to improve it.
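For reference, here is a minimal sketch of how the extension might be consumed. It assumes the System.Reactive and System.IO.Pipelines packages are referenced and a compiler that supports top-level statements. The class name ObservableStreamExtensions and the sample input are made up for illustration; the method body is the one from above.

```csharp
using System;
using System.IO;
using System.IO.Pipelines;
using System.Linq;
using System.Reactive.Linq;
using System.Text;

// Sample input (made up for illustration): the UTF-8 bytes of a short string,
// exposed as an IObservable<byte>.
IObservable<byte> source = Encoding.UTF8.GetBytes("hello from Rx").ToObservable();

// Hand the observable to any API that expects a Stream; here a StreamReader
// pulls from the stream and decodes the bytes back into text.
using (Stream stream = source.ToStream())
using (var text = new StreamReader(stream, Encoding.UTF8))
{
    Console.WriteLine(text.ReadToEnd());
}

// Hypothetical container class; extension methods must live in a static class.
public static class ObservableStreamExtensions
{
    public static Stream ToStream(this IObservable<byte> observable, int bufferSize = 4096)
    {
        var pipe = new Pipe();
        var writer = pipe.Writer;

        observable
            .Buffer(bufferSize)   // chunk the bytes
            .Select(buffer => Observable.FromAsync(async () => await writer.WriteAsync(buffer.ToArray())))
            .Concat()             // write the chunks sequentially, in order
            .Subscribe(_ => { }, onError: ex => writer.Complete(ex), onCompleted: () => writer.Complete());

        return pipe.Reader.AsStream();
    }
}
```

The consumer only sees an ordinary Stream, so the same call should work with anything that reads from one, such as CopyTo or a deserializer.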