Delete HdfsTarget before running a SparkSubmitTask


Community, I'd like to delete the HdfsTarget folder before running a SparkSubmitTask. What is the best practice? So far I have tried the two options marked (1) and (2) in the code below, without success:

  1. The dependent/required job is not executed if the HdfsTarget already exists
  2. The tasks are executed in parallel if CleanUp is called with yield
import luigi
import luigi.format
import luigi.contrib.hdfs
from luigi.contrib.spark import SparkSubmitTask


class CleanUp(luigi.Task):
    path = luigi.Parameter()

    def run(self):
        # Remove the HdfsTarget if it exists. CleanUp defines no output(),
        # so Luigi never considers it complete and it runs every time.
        target = luigi.contrib.hdfs.HdfsTarget(self.path, format=luigi.format.Gzip)
        if target.exists():
            target.remove(skip_trash=True)


class MySparkTask(SparkSubmitTask):
    output_path = luigi.Parameter()

    driver_memory = '8g'
    executor_memory = '3g'
    num_executors = 5

    app = 'my-app.jar'
    entry_class = 'com.company.MyJob'

    def app_options(self):
        return ['/input', self.output_path]

    def requires(self):
        # (1) Requiring CleanUp here is not enough: once the HdfsTarget
        # returned by output() exists, this task counts as complete and
        # neither CleanUp nor the Spark job is executed again.
        return CleanUp(self.output_path)

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(self.output_path, format=luigi.format.Gzip)


class RunAll(luigi.Task):
    '''Dummy task that triggers execution of the other tasks.'''

    result_dir = '/output'

    def requires(self):
        # (2) Also yielding CleanUp here, e.g.
        #     yield CleanUp(self.result_dir)
        # schedules it alongside MySparkTask, so the two tasks may run
        # in parallel instead of CleanUp running first.
        return MySparkTask(self.result_dir)
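
For completeness, here is a third variant I am considering (a minimal, untested sketch; the class name MySparkTaskAlwaysRun is mine, not from Luigi): override complete() so the task always re-runs, and remove the stale target at the start of run() before delegating to SparkSubmitTask.run(). Is this idiomatic, or is there a better practice?

class MySparkTaskAlwaysRun(MySparkTask):
    # Hypothetical variant: never report completion, so the task is
    # re-run even when its HdfsTarget already exists.
    def complete(self):
        return False

    def run(self):
        # Clean up the old output just before submitting the Spark job,
        # which guarantees the removal happens first.
        target = self.output()
        if target.exists():
            target.remove(skip_trash=True)
        # SparkSubmitTask.run() performs the actual spark-submit call.
        super(MySparkTaskAlwaysRun, self).run()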