Indexing in datafusion

Context: I am using datafusion to build a data validator for CSV file input.

Requirement: I want to include the row number where an error occurred in the output report. In pandas, I can add a row index that can be used for this purpose. Is there a way to achieve a similar result in datafusion?

There are 2 answers

Ian Graham (BEST ANSWER)

There doesn't appear to be an easy way to do this within datafusion after opening the CSV file. But you could instead open the CSV file directly with arrow, produce a new RecordBatch that incorporates an index column, and then feed this to datafusion through a MemTable. Here's an example assuming we are only processing one batch ...

use datafusion::prelude::*;
use datafusion::datasource::MemTable;
use arrow::util::pretty::print_batches;
use arrow::record_batch::RecordBatch;
use arrow::array::{UInt32Array, Int64Array};
use arrow::datatypes::{Schema, Field, DataType};
use arrow::csv;

use std::fs::File;
use std::sync::Arc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {

    // schema of the raw CSV file (no index column yet)
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
    ]);
    
    // open the CSV and read the first batch with arrow's CSV reader
    let file = File::open("tests/example.csv")?;
    let mut csv = csv::Reader::new(file, Arc::new(schema), true, None, 1024, None, None);
    let batch = csv.next().unwrap()?;

    // build a 0..n row-number column and copy the data columns for the new batch
    let length = batch.num_rows() as u32;
    let idx_array = UInt32Array::from((0..length).collect::<Vec<u32>>());
    let a_array = Int64Array::from(batch.column(0).as_any().downcast_ref::<Int64Array>().unwrap().values().to_vec());
    let b_array = Int64Array::from(batch.column(1).as_any().downcast_ref::<Int64Array>().unwrap().values().to_vec());
    // new schema with the idx column prepended
    let new_schema = Schema::new(vec![
        Field::new("idx", DataType::UInt32, true),
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
    ]);

    // assemble the new batch and expose it to datafusion via a MemTable
    let new_batch = RecordBatch::try_new(Arc::new(new_schema),
        vec![Arc::new(idx_array), Arc::new(a_array), Arc::new(b_array)])?;
    let mem_table = MemTable::try_new(new_batch.schema(), vec![vec![new_batch]])?;
    
    let mut ctx = ExecutionContext::new();

    // create the dataframe
    let df = ctx.read_table(Arc::new(mem_table))?;

    let results = df.collect().await?;

    print_batches(&results).unwrap();

    // do whatever you need to do
    
    Ok(())
}
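
For the original requirement (reporting the row number where a validation error occurred), the idx column can then be carried into the error report. Here's a minimal sketch, assuming a hypothetical rule that b must not exceed 5, reusing the DataFrame API on the df created above:

    // hypothetical validation rule: report rows where `b` exceeds 5
    // `idx` carries the original row number into the error report
    let errors = df
        .filter(col("b").gt(lit(5i64)))?
        .select_columns(&["idx", "b"])?;
    print_batches(&errors.collect().await?).unwrap();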

My example.csv looks like this ...

a,b
1,2
1,3
4,2
2,6
3,7

And the output should be ...

+-----+---+---+
| idx | a | b |
+-----+---+---+
| 0   | 1 | 2 |
| 1   | 1 | 3 |
| 2   | 4 | 2 |
| 3   | 2 | 6 |
| 4   | 3 | 7 |
+-----+---+---+

Though if you're really just in search of a crate with pandas-like functionality in Python, I'd urge you to check out polars.

ritchie46

Given Ian Graham's advice to check out polars, I thought I'd give an example of how this could be achieved in polars as well:

use polars::prelude::*;
use std::io::Cursor;

fn main() -> Result<()> {

    // use an in memory repr for the csv
    let csv = Cursor::new(
        "a,b
1,2
1,3
4,2
2,6
3,7
",
    );

    // parse the csv into a DataFrame
    let mut df = CsvReader::new(csv).finish()?;

    // create the index column based on the dataframes height
    // note that we use the `NoNull` wrapper to create from `T` instead of `Option<T>`
    let mut idx: NoNull<UInt32Chunked> = (0..df.height() as u32).collect();
    idx.rename("idx");

    // add the index column to the DataFrame
    df.insert_at_idx(0, idx.into_inner().into_series())?;


    // print output
    dbg!(df);

    Ok(())
}

Outputs:

+-----+-----+-----+
| idx | a   | b   |
| --- | --- | --- |
| u32 | i64 | i64 |
+=====+=====+=====+
| 0   | 1   | 2   |
+-----+-----+-----+
| 1   | 1   | 3   |
+-----+-----+-----+
| 2   | 4   | 2   |
+-----+-----+-----+
| 3   | 2   | 6   |
+-----+-----+-----+
| 4   | 3   | 7   |
+-----+-----+-----+
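
If you then want the offending row numbers for the error report, you can filter the frame with the idx column still attached. A minimal sketch, assuming the hypothetical rule that b must not exceed 5 (method names as in the polars version used above):

    // hypothetical validation rule: report rows where `b` exceeds 5
    // the `idx` column of the filtered frame gives the offending row numbers
    let mask = df.column("b")?.i64()?.gt(5);
    let errors = df.filter(&mask)?;
    dbg!(errors);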