I'm attempting to write a Delta table without employing Spark, and I've chosen to use the delta-rs library. I've encountered an issue when trying to generate a Delta table using a Parquet file.
Here is the error message I get:
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Generic("Updating table schema not yet implemented")'
Creating the table itself succeeds, and I have tried to make the Delta table's schema mirror the Parquet file's schema as closely as possible. However, appending the data read from the Parquet file still fails with the error above.
Here is my code:
use std::fs::File;
use arrow::record_batch::RecordBatch;
use deltalake::{SchemaField, DeltaOps, DeltaTableBuilder};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use deltalake::operations::create::CreateBuilder;
use deltalake::{SchemaDataType, DeltaTable};
#[tokio::main(flavor = "current_thread")]
async fn main(){
let file = File::open("./data/userdata1.parquet").unwrap();
let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
println!("Converted arrow schema is: {}", builder.schema());
let mut vec: Vec<SchemaField> = Vec::new();
for item in builder.schema().fields(){
let data_type = get_value_based_on_string(item.data_type().to_string().to_lowercase());
println!("{}: {}", item.name(), data_type);
let schema_field = SchemaField::new(
item.name().to_string(),
SchemaDataType::primitive(data_type),
true,
Default::default()
);
vec.push(schema_field);
//println!("{}", item);
}
let mut reader = builder.build().unwrap();
let record_batch: arrow::record_batch::RecordBatch = reader.next().unwrap().unwrap();
println!("Read {} records.", record_batch.num_rows());
create_delta_table(vec).await;
append_to_table("./delta".to_string(), record_batch).await;
}
/// Creates a Delta table at `./delta` with the given columns and returns it.
///
/// No save mode is set on the builder, so the library default applies.
///
/// # Panics
///
/// Panics if the create operation returns an error.
async fn create_delta_table(columns: Vec<SchemaField>) -> DeltaTable {
    CreateBuilder::new()
        .with_location("./delta")
        .with_columns(columns)
        .await
        .unwrap()
}
/// Writes `batch` to the Delta table located at `path` and returns the table
/// produced by the write operation.
///
/// # Panics
///
/// Panics if the table handle cannot be built or if the write fails.
async fn append_to_table(path: String, batch: RecordBatch) -> DeltaTable {
    let handle = DeltaTableBuilder::from_uri(path).build().unwrap();
    // `batch` is already owned here, so it is moved straight into the write.
    DeltaOps::from(handle).write(vec![batch]).await.unwrap()
}
/// Maps the textual name of an Arrow data type to a Delta primitive type name.
///
/// Matching is case-insensitive and prefix-based, so both `UInt8` and `uint8`
/// resolve to `"byte"`, and parameterised names such as
/// `timestamp(nanosecond, none)` resolve through their leading keyword.
/// Unsigned integers map to the signed type of the same width. Any name that
/// matches no known prefix falls back to `"string"`.
///
/// The received `input` is echoed to stdout before mapping.
fn get_value_based_on_string(input: String) -> String {
    // Ordered prefix table: the first match wins, so the specific `float16` /
    // `float32` entries must stay ahead of the generic `float` (double) entry.
    const PREFIX_TO_DELTA: &[(&str, &str)] = &[
        ("timestamp", "timestamp"),
        ("utf8", "string"),
        ("boolean", "boolean"),
        ("int8", "byte"),
        ("uint8", "byte"),
        ("int16", "short"),
        ("uint16", "short"),
        ("int32", "integer"),
        ("uint32", "integer"),
        ("int64", "long"),
        ("uint64", "long"),
        ("float16", "float"),
        ("float32", "float"),
        ("float", "double"),
        ("date", "date"),
        ("binary", "binary"),
    ];

    println!("{}", input);

    // Normalise once so the lookup works whether or not the caller lowercased
    // the name; the old mixed-case `UInt*` checks never matched lowercased input.
    let name = input.to_lowercase();

    PREFIX_TO_DELTA
        .iter()
        .find(|(prefix, _)| name.starts_with(*prefix))
        .map_or("string", |(_, delta)| *delta)
        .to_string()
}