I'm using Dataproc Metastore, Dataproc Batch and PySpark. Although I'm using GCP, I believe this is a general Apache Iceberg question.
I ran my Spark job, created an Iceberg trips table with snapshot expiration set to 1 hour (history.expire.max-snapshot-age-ms=3600000), and wrote the content of a CSV file into the table.
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, LongType,
                               FloatType, DoubleType, StringType)

# lake_bucket is the GCS path used as the warehouse directory (defined elsewhere in the job).
conf = (
    SparkConf()
    .setAppName('read_from_iceberg')
    .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
    .set('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkSessionCatalog')
    .set('spark.sql.catalog.spark_catalog.type', 'hive')
    .set('spark.sql.catalog.dev', 'org.apache.iceberg.spark.SparkCatalog')
    .set('spark.sql.catalog.dev.type', 'hive')
    .set('spark.sql.warehouse.dir', lake_bucket)
)
spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()

schema = StructType([
    StructField("vendor_id", LongType(), True),
    StructField("trip_id", LongType(), True),
    StructField("trip_distance", FloatType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("store_and_fwd_flag", StringType(), True)
])
# Create the database if it doesn't exist.
spark.sql("CREATE DATABASE IF NOT EXISTS dev.lakehouse")

# Create (or replace) the empty table, with snapshot expiration set to 1 hour.
df = spark.createDataFrame([], schema)
(df.writeTo("dev.lakehouse.trips")
 .partitionedBy("vendor_id")
 .tableProperty('format-version', '2')
 .tableProperty("history.expire.max-snapshot-age-ms", "3600000")
 .createOrReplace())

# Load the CSV input and append it to the table.
df3 = spark.read.option("delimiter", ",").schema(schema).option("header", True).csv(
    "gs://my-super-bucket/csv-input/bulk/*")
df3.write.mode('append').format("iceberg").insertInto("dev.lakehouse.trips")
I repeated the batch execution several times, and the writes have added up to 450 million rows:
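The row count was checked with a simple count query, e.g.:
spark.sql("SELECT count(1) FROM dev.lakehouse.trips").show()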
+---------+
| count(1)|
+---------+
|450000000|
+---------+
Now I would like to see the table history.
spark.sql("SELECT * FROM dev.lakehouse.trips4.history").show()
And the result is the following:
+--------------------+-------------------+-------------------+-------------------+
| made_current_at| snapshot_id| parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2023-11-08 09:05:...|3365635318905728444| null| true|
|2023-11-08 09:07:...|8818173850344394660|3365635318905728444| true|
|2023-11-08 09:18:...|7080281147456503211|8818173850344394660| true|
|2023-11-08 09:26:...|1124704647664806615|7080281147456503211| true|
|2023-11-08 09:43:...|1410379929547885531|1124704647664806615| true|
|2023-11-08 09:44:...|2828602979849095888|1410379929547885531| true|
|2023-11-08 11:59:...|3836167930220261494|2828602979849095888| true|
|2023-11-08 12:09:...|7872321137982208330|3836167930220261494| true|
+--------------------+-------------------+-------------------+-------------------+
Although the expiration is set to one hour, I still see all the older snapshots that were supposed to be removed.
I know that I can always use
spark.sql("CALL dev.system.expire_snapshots('dev.lakehouse.trips4', TIMESTAMP '2023-11-08 11:00:00.000', 1)")
and this will remove snapshots older than the given timestamp, but shouldn't this happen automatically?
Expired Iceberg table snapshots are not deleted automatically. Snapshot expiration is a table maintenance operation: the docs recommend regularly expiring snapshots to remove data files that are no longer needed and to keep table metadata small, but nothing runs that operation for you.
The config history.expire.max-snapshot-age-ms is only a default: it is the maximum snapshot age used when snapshot expiration actually runs without an explicit cutoff. Setting it does not start any background process that expires snapshots on its own.
The code
spark.sql("CALL dev.system.expire_snapshots('dev.lakehouse.trips4', TIMESTAMP '2023-11-08 11:00:00.000', 1)")
you show is the Spark SQL extension (the expire_snapshots procedure) for expiring snapshots, and calling it, or the equivalent ExpireSnapshots Spark action, is the intended way to do this, typically from a scheduled maintenance job.
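If you don't want to hard-code a cutoff timestamp, the procedure can also be called with named arguments and no explicit older_than / retain_last; the underlying expire operation should then fall back to the table's own history.expire.max-snapshot-age-ms and history.expire.min-snapshots-to-keep properties. A minimal sketch, assuming the dev catalog and the dev.lakehouse.trips table from the question, run at the end of each batch job:
# Maintenance step (sketch): with no explicit older_than / retain_last, the expire
# operation should use the table properties as defaults (1 hour here, keep at least 1 snapshot).
spark.sql("CALL dev.system.expire_snapshots(table => 'lakehouse.trips')").show()

# Check what remains afterwards.
spark.sql("SELECT committed_at, snapshot_id, operation FROM dev.lakehouse.trips.snapshots").show()
Keep in mind that at least retain_last snapshots (1 by default) are always kept, so the current snapshot never disappears, and that expiring snapshots also deletes data and manifest files that are no longer reachable, so time travel to the expired versions is lost.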