I'm using Apache Hudi to write a non-partitioned table to AWS S3 and sync it to Hive. Here are the DataSourceWriteOptions
being used:
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.hive.NonPartitionedExtractor

val hudiOptions: Map[String, String] = Map[String, String](
  DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "MERGE_ON_READ",
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "PERSON_ID",
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "UPDATED_DATE",
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "",
  DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[NonPartitionedExtractor].getName,
  DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true",
  DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator"
)
The table is written successfully when it is partitioned, but I get an error when I try to write a non-partitioned table. Here's the error output snippet:
Caused by: java.lang.NullPointerException
at org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getTableMetaClientForBasePath(HoodieInputFormatUtils.java:283)
at org.apache.hudi.hadoop.InputPathHandler.parseInputPaths(InputPathHandler.java:100)
at org.apache.hudi.hadoop.InputPathHandler.<init>(InputPathHandler.java:60)
at org.apache.hudi.hadoop.HoodieParquetInputFormat.listStatus(HoodieParquetInputFormat.java:81)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:288)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at org.apache.spark.rdd.RDD.getNumPartitions(RDD.scala:289)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:83)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:82)
at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.cancel(QueryStageExec.scala:152)
at org.apache.spark.sql.execution.adaptive.MaterializeExecutable.cancel(AdaptiveExecutable.scala:357)
at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.fail(AdaptiveExecutor.scala:280)
... 41 more
Here's the code for HoodieInputFormatUtils.getTableMetaClientForBasePath():
/**
 * Extract HoodieTableMetaClient from a partition path(not base path).
 * @param fs
 * @param dataPath
 * @return
 * @throws IOException
 */
public static HoodieTableMetaClient getTableMetaClientForBasePath(FileSystem fs, Path dataPath) throws IOException {
  int levels = HoodieHiveUtils.DEFAULT_LEVELS_TO_BASEPATH;
  if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) {
    HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath);
    metadata.readFromFS();
    levels = metadata.getPartitionDepth();
  }
  Path baseDir = HoodieHiveUtils.getNthParent(dataPath, levels);
  LOG.info("Reading hoodie metadata from path " + baseDir.toString());
  return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
}
Line 283 is the LOG.info() call, which is where the NullPointerException is thrown, so baseDir is presumably null at that point. It looks like the config values provided for partitioning are wrong. This code is being run on AWS EMR:
Release label: emr-5.30.1
Hadoop distribution: Amazon 2.8.5
Applications: Hive 2.3.6, Spark 2.4.5
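One way baseDir could end up null at the LOG.info() line is sketched below. This is only an illustration, not the Hudi implementation: it assumes that getNthParent simply calls getParent() the given number of times, that the default number of levels is 3, and it uses java.nio.file.Path as a stand-in for Hadoop's Path (both return null when asked for the parent of the root). The object name, the helper and the /warehouse/... paths are hypothetical.

```scala
import java.nio.file.{Path, Paths}

object NthParentSketch {
  // Assumption: stands in for HoodieHiveUtils.DEFAULT_LEVELS_TO_BASEPATH.
  val DefaultLevelsToBasePath = 3

  // Walk `levels` parents up from `path`. Option(...) turns a null parent
  // (returned once the walk goes past the root) into None instead of
  // letting it surface later as a NullPointerException.
  def nthParent(path: Path, levels: Int): Option[Path] =
    (1 to levels).foldLeft(Option(path)) { (current, _) =>
      current.flatMap(p => Option(p.getParent))
    }

  def main(args: Array[String]): Unit = {
    // Partitioned layout: the data path sits three levels below the base path.
    println(nthParent(Paths.get("/warehouse/person/2020/06/01"), DefaultLevelsToBasePath))
    // Non-partitioned layout: the data path is the base path itself, so
    // walking up the default number of levels runs past the root.
    println(nthParent(Paths.get("/warehouse/person"), DefaultLevelsToBasePath))
  }
}
```

If no partition metadata is found at the data path, the default depth is used, and a base path that is fewer levels deep than that default would produce exactly this kind of null.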
I suspect PARTITIONPATH_FIELD_OPT_KEY and HIVE_PARTITION_FIELDS_OPT_KEY should be left undefined. To validate your config, I suggest checking https://doc.hcs.huawei.com/usermanual/mrs/mrs_01_24035.html
hoodie.datasource.write.partitionpath.field and hoodie.datasource.hive_sync.partition_fields are supposed to be blank
hoodie.datasource.write.keygenerator.class -> org.apache.hudi.keygen.NonpartitionedKeyGenerator
hoodie.datasource.hive_sync.partition_extractor_class -> org.apache.hudi.hive.NonPartitionedExtractor
I was facing a Hive sync issue on PySpark with Hudi 0.9.0, and the documentation above helped.
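As a minimal sketch, the four settings listed above can be written as a plain Scala map using the string keys directly. The object name NonPartitionedHudiOptions is hypothetical, and the map covers only the partition-related options, so the record key, precombine field, table type and Hive sync connection settings would still have to be added.

```scala
object NonPartitionedHudiOptions {
  // Partition-related settings for a non-partitioned table, as listed above.
  val options: Map[String, String] = Map(
    // Both partition field settings are left blank.
    "hoodie.datasource.write.partitionpath.field" -> "",
    "hoodie.datasource.hive_sync.partition_fields" -> "",
    // Key generator and Hive partition extractor for non-partitioned tables.
    "hoodie.datasource.write.keygenerator.class" ->
      "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
    "hoodie.datasource.hive_sync.partition_extractor_class" ->
      "org.apache.hudi.hive.NonPartitionedExtractor"
  )
}
```

A map like this can be merged with the rest of the write options and passed to the Spark DataFrameWriter through .options(...).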