How to write the dataframe to S3 after filter


I am trying to write the DataFrame to S3 in CSV format after filtering, using the Scala code below in the AWS Glue script editor.

Current status:

  • The job does not show any error after running, but nothing is written to S3.

  • The logs show the "start" print, but the "End" print never appears.

  • There is no particular error message indicating the problem.

  • Execution appears to stop at temp.count.

Environment: I have admin rights to all of S3.

import com.amazonaws.services.glue.GlueContext
import <others>

object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    
    val datasource0 = glueContext.getCatalogSource(database = "db", tableName = "table", redshiftTmpDir = "", transformationContext = "datasource0").getDynamicFrame()
    val applymapping1 = datasource0.applyMapping(mapping=........)

    val temp = applymapping1.toDF.filter(some filtering rules)
    print("start")
    if (temp.count() <= 0) {
    temp.write.format("csv").option("sep", ",").save("s3://directory/error.csv")
  }
    print("End")
     

There is 1 answer

Answered by Achyut Vyas (best answer)

You're writing the DataFrame to S3 inside an if condition (the condition is meant to check whether the DataFrame has one or more rows), but the condition is inverted: it is only true when the DataFrame has zero rows, so the write never runs. Change it to temp.count() > 0.

Additionally: Spark always saves its output as "part-" files under the target path, so change the S3 path to a directory such as s3://directory/, and add .mode("overwrite").

So your DataFrame write should be:

temp.write.format("csv").option("sep", ",").mode("overwrite").save("s3://directory")
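
Putting both fixes together, a minimal sketch of the corrected end of the job could look like this (temp and the s3://directory/ path are just the placeholders from the question):

    print("start")
    // Write only when the filtered DataFrame actually has rows
    if (temp.count() > 0) {
      temp.write
        .format("csv")
        .option("sep", ",")
        .mode("overwrite")           // replace output from any previous run
        .save("s3://directory/")     // Spark creates part-* files under this prefix
    }
    print("End")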