Spark write to Iceberg table without repartition


I am developing a process that writes to several Iceberg tables, each with different partitioning. Previously, before writing data to Iceberg with Spark, we had to repartition the data and sort it by the partition columns first. Now I can't do that. I found that setting the Iceberg table property write.distribution-mode = 'hash' should help, but it did not work when I tried it.

Does anyone know why the property does not work, or is there any way to make Spark repartition automatically when writing to Iceberg?


There is 1 answer

DrJay On BEST ANSWER

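write.distribution-mode is the Iceberg table property that controls how rows are distributed across tasks for parallel writes. Its values are none (no shuffle), hash (hash-distribute by partition key) and range (range-distribute by partition key or sort key).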

https://iceberg.apache.org/docs/latest/configuration/
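Since write.distribution-mode is a table property, it is normally set on the table itself rather than on the DataFrame writer. Below is a minimal sketch of building that statement; the helper name distribution_mode_ddl and the table name my_catalog.db.events are made up for illustration, and it assumes your catalog accepts ALTER TABLE ... SET TBLPROPERTIES.

```python
# Hypothetical helper: builds the DDL that sets write.distribution-mode.
# The three accepted values come from the Iceberg configuration docs.
VALID_MODES = {"none", "hash", "range"}


def distribution_mode_ddl(table_name: str, mode: str = "hash") -> str:
    """Return an ALTER TABLE statement that sets write.distribution-mode."""
    if mode not in VALID_MODES:
        raise ValueError(f"mode must be one of {sorted(VALID_MODES)}, got {mode!r}")
    return (
        f"ALTER TABLE {table_name} "
        f"SET TBLPROPERTIES ('write.distribution-mode' = '{mode}')"
    )


# Usage with an existing SparkSession (table name is a placeholder):
# spark.sql(distribution_mode_ddl("my_catalog.db.events"))
```

Whether Spark then shuffles the data for you on write depends on the Spark and Iceberg versions in use, so it is worth checking the resulting file layout after a test write.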

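df.repartition redistributes a DataFrame across Spark partitions, either to a given number of partitions or by the values of the given columns.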

https://sparkbyexamples.com/pyspark/pyspark-repartition-usage/
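If you have to do the repartition and sort yourself, as described in the question, the two steps can be wrapped in one small function. This is only a sketch: prepare_for_partitioned_write is a made-up name, and it assumes the table is partitioned directly by the listed columns (identity partitioning), not by a transform such as bucket or days.

```python
def prepare_for_partitioned_write(df, partition_columns):
    """Cluster and sort a DataFrame by the target table's partition columns.

    df is expected to be a pyspark.sql.DataFrame; partition_columns is a
    list of column names (hypothetical input, supplied per table).
    """
    if not partition_columns:
        # Unpartitioned table: there is nothing to cluster by
        return df
    return (
        # Send all rows with the same partition values to the same task
        df.repartition(*partition_columns)
        # Keep rows for the same partition contiguous within each task
        .sortWithinPartitions(*partition_columns)
    )


# Usage (column name is a placeholder):
# df = prepare_for_partitioned_write(df, ["partition_column_1"])
```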

So, I’m hoping that you can use this example.

In Python

from pyspark.sql import SparkSession

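def write_to_iceberg(df, table_path, partition_column):
    # Repartition by the table's partition column, then sort within each
    # Spark partition so rows for the same Iceberg partition are contiguous
    df = df.repartition(partition_column).sortWithinPartitions(partition_column)

    # Append the DataFrame to the Iceberg table at table_path.
    # Note: write.distribution-mode is a table property; the per-write option
    # is assumed here to be "distribution-mode" (check your Iceberg version)
    df.write.format("iceberg") \
        .mode("append") \
        .option("distribution-mode", "hash") \
        .save(table_path)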

# Create session
spark = SparkSession.builder.appName("IcebergExample").getOrCreate()

# Example DataFrame with one column per target table's partition key
df = spark.createDataFrame(
    [("A", "X", 1), ("B", "Y", 2), ("A", "X", 3)],
    ["partition_column_1", "partition_column_2", "value"],
)

# Define list of table paths and partition columns
table_configs = [
    {"table_path": "path_to_table_1", "partition_column": "partition_column_1"},
    {"table_path": "path_to_table_2", "partition_column": "partition_column_2"},
    # Add more table configs as needed
]

# Iterate through the table configurations. Write DataFrame to each table
for config in table_configs:
    write_to_iceberg(df, config["table_path"], config["partition_column"])

# Stop the Spark session
spark.stop()