saveAsTable in Spark 1.4 is not working as expected


I want to save a DataFrame as a table, using the following commands:

>>> access_df = sqlContext.read.load("hdfs://", format="parquet")
>>> df_writer = pyspark.sql.DataFrameWriter(access_df)
>>> df_writer.saveAsTable('test_access', format='parquet', mode='overwrite')   

But when I run the last line, I get the following stack trace:

15/06/24 13:21:38 INFO HiveMetaStore: 0: get_table : db=default tbl=test_access
15/06/24 13:21:38 INFO audit: ugi=nanounanue    ip=unknown-ip-addr      cmd=get_table : db=default tbl=test_access
15/06/24 13:21:38 INFO HiveMetaStore: 0: get_table : db=default tbl=test_access
15/06/24 13:21:38 INFO audit: ugi=nanounanue    ip=unknown-ip-addr      cmd=get_table : db=default tbl=test_access
15/06/24 13:21:38 INFO HiveMetaStore: 0: get_database: default
15/06/24 13:21:38 INFO audit: ugi=nanounanue    ip=unknown-ip-addr      cmd=get_database: default
15/06/24 13:21:38 INFO HiveMetaStore: 0: get_table : db=default tbl=test_access
15/06/24 13:21:38 INFO audit: ugi=nanounanue    ip=unknown-ip-addr      cmd=get_table : db=default tbl=test_access
15/06/24 13:21:38 INFO MemoryStore: ensureFreeSpace(231024) called with curMem=343523, maxMem=278302556
15/06/24 13:21:38 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 225.6 KB, free 264.9 MB)
15/06/24 13:21:38 INFO MemoryStore: ensureFreeSpace(19848) called with curMem=574547, maxMem=278302556
15/06/24 13:21:38 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 19.4 KB, free 264.8 MB)
15/06/24 13:21:38 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:44271 (size: 19.4 KB, free: 265.3 MB)
15/06/24 13:21:38 INFO SparkContext: Created broadcast 2 from saveAsTable at
15/06/24 13:21:38 ERROR FileOutputCommitter: Mkdirs failed to create file:/user/hive/warehouse/test_access/_temporary/0
15/06/24 13:21:39 INFO ParquetRelation2$$anonfun$buildScan$1$$anon$1$$anon$2: Using Task Side Metadata Split Strategy
15/06/24 13:21:39 INFO SparkContext: Starting job: saveAsTable at
15/06/24 13:21:39 INFO DAGScheduler: Got job 1 (saveAsTable at with 2 output partitions (allowLocal=false)
15/06/24 13:21:39 INFO DAGScheduler: Final stage: ResultStage 1(saveAsTable at
15/06/24 13:21:39 INFO DAGScheduler: Parents of final stage: List()
15/06/24 13:21:39 INFO DAGScheduler: Missing parents: List()
15/06/24 13:21:39 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[3] at ), which has no missing parents
15/06/24 13:21:39 INFO MemoryStore: ensureFreeSpace(68616) called with curMem=594395, maxMem=278302556
15/06/24 13:21:39 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 67.0 KB, free 264.8 MB)
15/06/24 13:21:39 INFO MemoryStore: ensureFreeSpace(24003) called with curMem=663011, maxMem=278302556
15/06/24 13:21:39 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 23.4 KB, free 264.8 MB)
15/06/24 13:21:39 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:44271 (size: 23.4 KB, free: 265.3 MB)
15/06/24 13:21:39 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:874
15/06/24 13:21:39 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[3] at )
15/06/24 13:21:39 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
15/06/24 13:21:39 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, ANY, 1777 bytes)
15/06/24 13:21:39 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, ANY, 1778 bytes)
15/06/24 13:21:39 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
15/06/24 13:21:39 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)
15/06/24 13:21:39 INFO ParquetRelation2$$anonfun$buildScan$1$$anon$1: Input split: ParquetInputSplit{part: hdfs:// start: 0 end: 259022 length: 259022 hosts: [] requestedSchema: message root {
  optional binary client_ident (UTF8);
  optional binary content_size (UTF8);
  optional binary date_time (UTF8);
  optional binary endpoint (UTF8);
  optional binary ip_address (UTF8);
  optional binary method (UTF8);
  optional binary protocol (UTF8);
  optional binary referer (UTF8);
  optional binary response_code (UTF8);
  optional binary response_time (UTF8);
  optional binary user_agent (UTF8);
  optional binary user_id (UTF8);
 readSupportMetadata: {org.apache.spark.sql.parquet.row.metadata={"type":"struct","fields":[{"name":"client_ident","type":"string","nullable":true,"metadata":{}},{"name":"content_size","type":"string","nullable":true,"metadata":{}},{"name":"date_time","type":"string","nullable":true,"metadata":{}},{"name":"endpoint","type":"string","nullable":true,"metadata":{}},{"name":"ip_addres
{}},{"name":"user_agent","type":"string","nullable":true,"metadata":{}},{"name":"user_id","type":"string","nullable":true,"metadata":{}}]}, org.apache.spark.sql.parquet.row.requested_schema={"type":"struct","fields":[{"name":"client_ident","type":"string","nullable":true,"metadata":{}},{"name":"content_size","type":"string","nullable":true,"metadata":{}},{"name":"date_time","type"
15/06/24 13:21:39 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
15/06/24 13:21:39 INFO ParquetRelation2$$anonfun$buildScan$1$$anon$1: Input split: ParquetInputSplit{part: hdfs:// start: 0 end: 315140 length: 315140 hosts: [] requestedSchema: message root {
  optional binary client_ident (UTF8);
  optional binary content_size (UTF8);
  optional binary date_time (UTF8);
  optional binary endpoint (UTF8);
  optional binary ip_address (UTF8);
  optional binary method (UTF8);
  optional binary protocol (UTF8);
  optional binary referer (UTF8);
  optional binary response_code (UTF8);
  optional binary response_time (UTF8);
  optional binary user_agent (UTF8);
  optional binary user_id (UTF8);
 readSupportMetadata: {org.apache.spark.sql.parquet.row.metadata={"type":"struct","fields":[{"name":"client_ident","type":"string","nullable":true,"metadata":{}},{"name":"content_size","type":"string","nullable":true,"metadata":{}},{"name":"date_time","type":"string","nullable":true,"metadata":{}},{"name":"endpoint","type":"string","nullable":true,"metadata":{}},{"name":"ip_addres
{}},{"name":"user_agent","type":"string","nullable":true,"metadata":{}},{"name":"user_id","type":"string","nullable":true,"metadata":{}}]}, org.apache.spark.sql.parquet.row.requested_schema={"type":"struct","fields":[{"name":"client_ident","type":"string","nullable":true,"metadata":{}},{"name":"content_size","type":"string","nullable":true,"metadata":{}},{"name":"date_time","type"
15/06/24 13:21:39 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
15/06/24 13:21:39 INFO InternalParquetRecordReader: RecordReader initialized will read a total of 47428 records.
15/06/24 13:21:39 INFO CodecConfig: Compression: GZIP
15/06/24 13:21:39 INFO ParquetOutputFormat: Parquet block size to 134217728
15/06/24 13:21:39 INFO ParquetOutputFormat: Parquet page size to 1048576
15/06/24 13:21:39 INFO ParquetOutputFormat: Parquet dictionary page size to 1048576
15/06/24 13:21:39 INFO ParquetOutputFormat: Dictionary is on
15/06/24 13:21:39 INFO ParquetOutputFormat: Validation is off
15/06/24 13:21:39 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0
15/06/24 13:21:39 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 3) Mkdirs failed to create file:/user/hive/warehouse/test_access/_temporary/0/_temporary/attempt_201506241321_0001_m_000001_0 (exists=false, cwd=file:/home/nanounanue)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(
        at org.apache.hadoop.fs.ChecksumFileSystem.create(
        at org.apache.hadoop.fs.FileSystem.create(
        at org.apache.hadoop.fs.FileSystem.create(
        at org.apache.hadoop.fs.FileSystem.create(
        at parquet.hadoop.ParquetFileWriter.<init>(
        at parquet.hadoop.ParquetOutputFormat.getRecordWriter(
        at parquet.hadoop.ParquetOutputFormat.getRecordWriter(
        at org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:111)

The user nanounanue has write permission in that directory:

[hdfs@ip-10-0-0-209 ec2-user]$ hadoop fs -ls -R /user/hive/ | grep warehouse
drwxrwxrwt   - hive hive          0 2015-06-23 21:16 /user/hive/warehouse

What is missing?


There are 3 answers

Zia Kiyani On

I also encountered this issue when I moved from Spark 1.2 to Spark 1.3. It is actually a permissions issue. Try using Apache Spark instead of Cloudera's Spark, as this solved my problem.

Leet-Falcon On

This seems like a bug related to the creation of new directories under the Hive metastore directory
(in your case /user/hive/warehouse).
As a workaround, try changing the default permissions on your metastore directory, granting your user rwx permissions recursively.
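
Before changing permissions, it can help to find out which directory actually blocks the write. The following is a minimal sketch, assuming the target path lives on the local filesystem of the machine running the job (as the file: prefix in the question's log suggests). The helper names first_existing_ancestor and can_create_under are hypothetical, not part of Spark or Hadoop.

```python
import os


def first_existing_ancestor(path):
    """Walk up from `path` until a filesystem entry that exists is found."""
    current = os.path.abspath(path)
    while not os.path.exists(current):
        parent = os.path.dirname(current)
        if parent == current:  # reached the filesystem root
            break
        current = parent
    return current


def can_create_under(path):
    """Return True if the current user could create `path` and its parents.

    Creating missing directories needs write and execute permission on the
    nearest ancestor that already exists.
    """
    ancestor = first_existing_ancestor(path)
    return os.path.isdir(ancestor) and os.access(ancestor, os.W_OK | os.X_OK)


if __name__ == "__main__":
    # Path taken from the "Mkdirs failed" line in the question's log.
    target = "/user/hive/warehouse/test_access/_temporary/0"
    print(first_existing_ancestor(target), can_create_under(target))
```

If the nearest existing ancestor turns out to be / or another directory owned by root, that is the directory whose permissions (or whose choice as a warehouse location) needs attention.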

poseidon On

Based on your log: file:/user/hive/warehouse/test_access/_temporary/0/_temporary/attempt_201506241321_0001_m_000001_0 (exists=false, cwd=file:/home/nanounanue)

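Spark is trying to create files under the path /user/hive/warehouse/test_access/. The file: prefix means this path is on the local filesystem, not on HDFS, so the hadoop fs -ls listing in the question describes a different directory.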
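
To see which filesystem a logged path refers to, its URI scheme can be inspected. This is a small sketch using only the Python standard library; filesystem_of is a hypothetical helper and "namenode" is a placeholder host, not a value from the question.

```python
from urllib.parse import urlparse


def filesystem_of(uri):
    """Return the URI scheme of a path, or None when it has no scheme.

    A path without a scheme is resolved against whatever default
    filesystem the Hadoop configuration names.
    """
    scheme = urlparse(uri).scheme
    return scheme or None


# Path from the failing "Mkdirs failed" log line.
print(filesystem_of("file:/user/hive/warehouse/test_access/_temporary/0"))  # file
# A path that would live on HDFS ("namenode" is a placeholder).
print(filesystem_of("hdfs://namenode/user/hive/warehouse"))  # hdfs
# A bare path carries no scheme of its own.
print(filesystem_of("/user/hive/warehouse"))  # None
```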

With Spark's default settings, which use Derby as the Hive metastore, the warehouse location defaults to the local path /user/hive/warehouse/, where your process does not have permission to create directories.
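
One possible way around the default warehouse location is to pass an explicit path option when saving. The sketch below is not a verified fix for this setup. It assumes df is a pyspark.sql.DataFrame from Spark 1.4 or later, and that saveAsTable forwards path as a data source option so the files are written at that location instead of under the warehouse directory. save_table_at is a hypothetical helper name.

```python
def save_table_at(df, table_name, location):
    """Save `df` as a Parquet table whose files live at `location`.

    Assumption: `df.write` is a pyspark.sql.DataFrameWriter and `location`
    is a full URI (for example one starting with hdfs://) that the user
    running the job can write to.
    """
    df.write.saveAsTable(
        table_name,
        format="parquet",
        mode="overwrite",
        path=location,  # extra keyword arguments are passed as data source options
    )
```

It would be called as save_table_at(access_df, 'test_access', location), with location pointing at a directory on HDFS that your user owns.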