I am pulling messages out of Kafka and writing them to parquet files. The messages in the Kafka topic contain these keys: `offset`, `key`, `value`, and `timestamp`. As far as I can tell, only `value` can be nullable; everything else should always contain some value. In my Hudi application, I partition by the `timestamp` by extracting the `year`, `month`, `day`, and `hour` parts of the `timestamp` key.
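For context, this is roughly how the partition columns are derived (a simplified sketch, not my exact job; the broker and topic values are placeholders):

```scala
import org.apache.spark.sql.functions.{col, date_format}

// Read the Kafka topic and derive the partition columns from the
// built-in message timestamp. `spark` is the active SparkSession.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<brokers>") // placeholder
  .option("subscribe", "<topic>")                 // placeholder
  .load()
  .withColumn("year",  date_format(col("timestamp"), "yyyy"))
  .withColumn("month", date_format(col("timestamp"), "MM"))
  .withColumn("day",   date_format(col("timestamp"), "dd"))
  .withColumn("hour",  date_format(col("timestamp"), "HH"))
```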
The application writes some data (I can see parquet files written to my specified directory) and then it crashes with the following error:
23/10/24 20:35:07 ERROR HoodieStreamingSink: Micro batch id=0 threw following exception:
org.apache.hudi.exception.HoodieException: Could not sync using the meta sync class org.apache.hudi.hive.HiveSyncTool
at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:61) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$metaSync$2(HoodieSparkSqlWriter.scala:888) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) ~[scala-library-2.12.15.jar:?]
at org.apache.hudi.HoodieSparkSqlWriter$.metaSync(HoodieSparkSqlWriter.scala:886) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:984) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:381) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$2(HoodieStreamingSink.scala:122) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
at scala.util.Try$.apply(Try.scala:213) ~[scala-library-2.12.15.jar:?]
at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$1(HoodieStreamingSink.scala:120) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
at org.apache.hudi.HoodieStreamingSink.retry(HoodieStreamingSink.scala:223) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
at org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:119) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:665) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) ~[spark-catalyst_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) ~[spark-catalyst_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:663) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:663) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208) ~[spark-sql_2.12-3.3.2-amzn-0.1.jar:3.3.2-amzn-0.1]
Caused by: org.apache.hudi.exception.HoodieException: Got runtime exception when hive syncing my_table
at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:165) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:59) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
... 40 more
Caused by: org.apache.hudi.hive.HoodieHiveSyncException: Failed to sync partitions for table my_table
at org.apache.hudi.hive.HiveSyncTool.syncAllPartitions(HiveSyncTool.java:403) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:272) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
at org.apache.hudi.hive.HiveSyncTool.doSync(HiveSyncTool.java:174) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:162) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:59) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
... 40 more
Caused by: java.lang.IllegalArgumentException: Number of table partition keys must match number of partition values
at com.google.common.base.Preconditions.checkArgument(Preconditions.java:92) ~[guava-14.0.1.jar:?]
at com.amazonaws.glue.catalog.metastore.GlueMetastoreClientDelegate.validateInputForBatchCreatePartitions(GlueMetastoreClientDelegate.java:832) ~[aws-glue-datacatalog-spark-client-3.9.1.jar:?]
at com.amazonaws.glue.catalog.metastore.GlueMetastoreClientDelegate.batchCreatePartitions(GlueMetastoreClientDelegate.java:766) ~[aws-glue-datacatalog-spark-client-3.9.1.jar:?]
at com.amazonaws.glue.catalog.metastore.GlueMetastoreClientDelegate.addPartitions(GlueMetastoreClientDelegate.java:748) ~[aws-glue-datacatalog-spark-client-3.9.1.jar:?]
at com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient.add_partitions(AWSCatalogMetastoreClient.java:339) ~[aws-glue-datacatalog-spark-client-3.9.1.jar:?]
at org.apache.hudi.hive.ddl.HMSDDLExecutor.addPartitionsToTable(HMSDDLExecutor.java:221) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
at org.apache.hudi.hive.HoodieHiveSyncClient.addPartitionsToTable(HoodieHiveSyncClient.java:109) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
at org.apache.hudi.hive.HiveSyncTool.syncPartitions(HiveSyncTool.java:445) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
at org.apache.hudi.hive.HiveSyncTool.syncAllPartitions(HiveSyncTool.java:399) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:272) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
at org.apache.hudi.hive.HiveSyncTool.doSync(HiveSyncTool.java:174) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:162) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:59) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
... 40 more
Since I can see that some parquet files were created with the expected partitions, does this mean that some of my Kafka messages are missing the `timestamp` key?
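For reference, a quick batch read of the topic along these lines (a sketch; broker and topic values are placeholders) should show whether any records actually have a null timestamp:

```scala
import org.apache.spark.sql.functions.col

// Batch-read the whole topic and count records with a null timestamp.
val raw = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<brokers>") // placeholder
  .option("subscribe", "<topic>")                 // placeholder
  .option("startingOffsets", "earliest")
  .load()

println(raw.filter(col("timestamp").isNull).count())
```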