How can I read a reqwest::Response object's bytes_stream() with an implementer of arrow_array::RecordBatchReader?


I am attempting to use the reqwest crate to stream binary data in Arrow IPC format from a REST API endpoint that is out of my control. The reqwest::Response object has a bytes_stream() method that returns a type implementing the trait Stream<Item = Result<Bytes>>. I'm hoping it's possible to read this as a stream of RecordBatch values in some way, such as with arrow_ipc::reader::StreamReader or some other implementer of the arrow_array::RecordBatchReader trait. What's the best way to do this?

Answer by Chayim Friedman (best answer):

The arrow crate does not currently support async (although there is an open issue for that). The alternative arrow2 crate does, and luckily it also defines a conversion layer between its types and arrow's types. So this is one option. The code is quite involved because arrow2's async support requires an AsyncRead, but what we have is a Stream that yields Bytes chunks:

use std::io;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

use arrow2::io::ipc::read::stream_async::{read_stream_metadata_async, AsyncStreamReader};
use bytes::{Buf, Bytes}; // `Buf` must be in scope for `Bytes::advance`.
use futures::stream::Fuse;
use futures::{AsyncRead, Stream, StreamExt};
use reqwest::Client;

/// Adapts a `Stream` of `Bytes` chunks into an `AsyncRead`.
struct StreamAsAsyncRead<St> {
    stream: Fuse<St>,
    /// Unread remainder of the most recently received chunk.
    last: Option<Bytes>,
}

impl<St: Stream> StreamAsAsyncRead<St> {
    fn new(stream: St) -> Self {
        Self {
            // `fuse()` makes it safe to keep polling after the stream has ended.
            stream: stream.fuse(),
            last: None,
        }
    }
}

impl<St: Stream<Item = reqwest::Result<Bytes>> + Unpin> AsyncRead for StreamAsAsyncRead<St> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let this = &mut *self;
        let data = match &mut this.last {
            // Bytes are still left over from the previous chunk: serve those first.
            Some(data) if !data.is_empty() => data,
            // Otherwise pull the next chunk from the stream.
            last => {
                let Some(next_data) = ready!(this.stream.poll_next_unpin(cx)) else {
                    // The stream is exhausted: report end of input.
                    return Poll::Ready(Ok(0));
                };
                let next_data = next_data.map_err(io::Error::other)?;
                last.insert(next_data)
            }
        };

        // Copy as much as fits into `buf` and keep the rest for the next call.
        let fill_len = std::cmp::min(buf.len(), data.len());
        buf[..fill_len].copy_from_slice(&data[..fill_len]);
        data.advance(fill_len);
        Poll::Ready(Ok(fill_len))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new();
    let response = client.get("<address>").send().await?;
    let mut reader = StreamAsAsyncRead::new(response.bytes_stream());
    // An IPC stream starts with a schema message, so read the metadata from
    // the stream itself rather than constructing it by hand.
    let metadata = read_stream_metadata_async(&mut reader).await?;
    let mut stream = AsyncStreamReader::new(reader, metadata);
    while let Some(chunk) = stream.next().await {
        let chunk = chunk?;
        for array in chunk.into_arrays() {
            // Convert the arrow2 array into an arrow `ArrayRef`.
            let array = arrow::array::ArrayRef::from(array);
            // Do something with `array`.
        }
    }

    Ok(())
}

Cargo.toml:

[dependencies]
arrow = "50.0.0"
reqwest = { version = "0.11.24", features = ["stream"] }
tokio = { version = "1.36.0", features = ["full"] }
bytes = "1.5.0"
futures = "0.3.30"
arrow-format = "0.8.1"
arrow2 = { version = "0.18.0", features = [
    "io_ipc",
    "io_ipc_read_async",
    "arrow",
] }
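
To make the leftover-chunk bookkeeping in StreamAsAsyncRead easier to follow, here is a minimal synchronous sketch of the same idea using only the standard library. ChunkReader and read_all_small_buf are hypothetical names introduced for illustration and are not part of reqwest, arrow or arrow2. The sketch turns an iterator of byte chunks into a std::io::Read, whereas the real adapter does the equivalent with Stream and AsyncRead. One difference, chosen for this sketch only: it skips empty chunks, so it never returns 0 before the input has actually ended.

```rust
use std::io::{self, Read};

/// Hypothetical std-only analogue of `StreamAsAsyncRead`: adapts an iterator
/// of byte chunks into a `Read`, remembering the unread tail of the current chunk.
struct ChunkReader<I> {
    chunks: I,
    current: Vec<u8>, // most recently received chunk
    pos: usize,       // how many bytes of `current` were already handed out
}

impl<I: Iterator<Item = io::Result<Vec<u8>>>> ChunkReader<I> {
    fn new(chunks: I) -> Self {
        Self { chunks, current: Vec::new(), pos: 0 }
    }
}

impl<I: Iterator<Item = io::Result<Vec<u8>>>> Read for ChunkReader<I> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Refill until we hold unread bytes. Empty chunks are skipped so that
        // `Ok(0)` is only returned at the real end of input.
        while self.pos == self.current.len() {
            match self.chunks.next() {
                Some(chunk) => {
                    self.current = chunk?;
                    self.pos = 0;
                }
                None => return Ok(0),
            }
        }
        // Copy as much as fits; the remainder stays for the next call.
        let n = buf.len().min(self.current.len() - self.pos);
        buf[..n].copy_from_slice(&self.current[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

/// Drains the reader through a deliberately tiny buffer, so that chunks
/// larger than the buffer exercise the carry-over path.
fn read_all_small_buf(chunks: Vec<Vec<u8>>) -> io::Result<Vec<u8>> {
    let mut reader = ChunkReader::new(chunks.into_iter().map(Ok));
    let mut out = Vec::new();
    let mut buf = [0u8; 3];
    loop {
        let n = reader.read(&mut buf)?;
        if n == 0 {
            break;
        }
        out.extend_from_slice(&buf[..n]);
    }
    Ok(out)
}
```

Feeding it chunks whose boundaries do not line up with the 3-byte buffer should still reproduce the concatenated input, which is the property the IPC reader relies on.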

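Another, simpler option is to give up async support and just use reqwest's blocking interface (this requires reqwest's "blocking" feature). reqwest::blocking::Response implements std::io::Read, so it can be passed directly to arrow's StreamReader: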

use arrow::ipc::reader::StreamReader;
use reqwest::blocking::Client;

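fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new();
    let response = client.get("<address>").send()?;
    // `None` means no column projection: every column is read.
    let reader = StreamReader::try_new(response, None)?;
    // `StreamReader` iterates over `Result<RecordBatch, ArrowError>`.
    for batch in reader {
        let batch = batch?;
        // Do something with `batch`.
    }
    Ok(())
}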