Unable to write non-partitioned table using Apache Hudi


I'm using Apache Hudi to write a non-partitioned table to AWS S3 and sync it to Hive. Here are the DataSourceWriteOptions being used.

val hudiOptions: Map[String, String] = Map[String, String](
      DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "MERGE_ON_READ",
      DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "PERSON_ID",
      DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "",
      DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "UPDATED_DATE",
      DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "",
      DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[NonPartitionedExtractor].getName,
      DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true",
      DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator"
    )
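
For context, here's a rough sketch of how a write with these options might look; the DataFrame, table name, Hive table, and S3 base path below are placeholders, and the Hive sync options are my assumption about where the sync is switched on:

import org.apache.spark.sql.SaveMode
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig

// df is the DataFrame being written; it contains the PERSON_ID and UPDATED_DATE columns
df.write
  .format("org.apache.hudi")
  .options(hudiOptions)
  .option(HoodieWriteConfig.TABLE_NAME, "person")                    // placeholder table name
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")  // assumed: Hive sync enabled on the write path
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "person")       // placeholder Hive table name
  .mode(SaveMode.Append)
  .save("s3://my-bucket/hudi/person")                                // placeholder S3 base path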

The table is written successfully when it is partitioned, but writing a non-partitioned table gives an error. Here's a snippet of the error output:

Caused by: java.lang.NullPointerException
        at org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getTableMetaClientForBasePath(HoodieInputFormatUtils.java:283)
        at org.apache.hudi.hadoop.InputPathHandler.parseInputPaths(InputPathHandler.java:100)
        at org.apache.hudi.hadoop.InputPathHandler.<init>(InputPathHandler.java:60)
        at org.apache.hudi.hadoop.HoodieParquetInputFormat.listStatus(HoodieParquetInputFormat.java:81)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:288)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at org.apache.spark.rdd.RDD.getNumPartitions(RDD.scala:289)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:83)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:82)
        at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.cancel(QueryStageExec.scala:152)
        at org.apache.spark.sql.execution.adaptive.MaterializeExecutable.cancel(AdaptiveExecutable.scala:357)
        at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.fail(AdaptiveExecutor.scala:280)
        ... 41 more

Here's the code for HoodieInputFormatUtils.getTableMetaClientForBasePath()

/**
   * Extract HoodieTableMetaClient from a partition path(not base path).
   * @param fs
   * @param dataPath
   * @return
   * @throws IOException
   */
  public static HoodieTableMetaClient getTableMetaClientForBasePath(FileSystem fs, Path dataPath) throws IOException {
    int levels = HoodieHiveUtils.DEFAULT_LEVELS_TO_BASEPATH;
    if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) {
      HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath);
      metadata.readFromFS();
      levels = metadata.getPartitionDepth();
    }
    Path baseDir = HoodieHiveUtils.getNthParent(dataPath, levels);
    LOG.info("Reading hoodie metadata from path " + baseDir.toString());
    return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
  }

Line 283 is the LOG.info() call that throws the NullPointerException; since LOG is a static logger, the null value is presumably baseDir. So it looks like the config values provided for partitioning are not being applied correctly. This code is being run on AWS EMR:

Release label: emr-5.30.1
Hadoop distribution: Amazon 2.8.5
Applications: Hive 2.3.6, Spark 2.4.5
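
To illustrate how baseDir could come back null: Hadoop's Path.getParent() returns null once you walk past the filesystem root, so if the computed level count exceeds the depth of dataPath, the base-path lookup ends up with null. This is only a guess at the mechanism here, shown with a simplified stand-in for HoodieHiveUtils.getNthParent and placeholder paths:

import org.apache.hadoop.fs.Path

// Simplified stand-in for HoodieHiveUtils.getNthParent: walk `levels` parents up.
// Path.getParent() returns null at the root, so walking too far yields null.
def nthParent(p: Path, levels: Int): Path =
  (1 to levels).foldLeft(p) { (cur, _) => if (cur == null) null else cur.getParent }

val dataPath = new Path("s3://my-bucket/person")  // placeholder non-partitioned data path
println(nthParent(dataPath, 1))  // s3://my-bucket/
println(nthParent(dataPath, 3))  // null -- and baseDir.toString() on a null Path would NPE, as seen at line 283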

1 Answer

Answered by Vyshnav:

I'm not sure PARTITIONPATH_FIELD_OPT_KEY and HIVE_PARTITION_FIELDS_OPT_KEY should be left undefined. To validate your config, I suggest checking https://doc.hcs.huawei.com/usermanual/mrs/mrs_01_24035.html

hoodie.datasource.write.partitionpath.field and hoodie.datasource.hive_sync.partition_fields are supposed to be blank

hoodie.datasource.write.keygenerator.class -> org.apache.hudi.keygen.NonpartitionedKeyGenerator

hoodie.datasource.hive_sync.partition_extractor_class -> org.apache.hudi.hive.NonPartitionedExtractor

I was facing a Hive sync issue on PySpark with Hudi 0.9.0, and the documentation above helped.
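
Putting those properties together, a sketch of what the full options map for a non-partitioned table could look like, using the raw property keys from the documentation above (the record key and precombine field are taken from the question; I haven't verified this exact map against your setup):

val nonPartitionedOptions: Map[String, String] = Map(
  "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
  "hoodie.datasource.write.recordkey.field" -> "PERSON_ID",
  "hoodie.datasource.write.precombine.field" -> "UPDATED_DATE",
  "hoodie.datasource.write.partitionpath.field" -> "",   // blank for a non-partitioned table
  "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
  "hoodie.datasource.hive_sync.partition_fields" -> "",  // blank for a non-partitioned table
  "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.NonPartitionedExtractor"
)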