Community, I'd like to delete the HdfsTarget folder before running a SparkSubmitTask. What is the best practice? So far I have tried the two options marked (1) and (2) in the attached code, without success:

- (1) The dependent/required job doesn't get executed if the HdfsTarget already exists.
- (2) The tasks are executed in parallel if called with yield.
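For reference, here is roughly what the two placements correspond to (a sketch of the pattern, not the exact code I ran; the class and attribute names are the ones from the full code below):

# (1) placed inside MySparkTask: CleanUp as a static requirement of the Spark task
class MySparkTask(SparkSubmitTask):
    ...
    def requires(self):
        return CleanUp(self.out_path)

# (2) placed inside RunAll: both tasks yielded as requirements, so the
#     scheduler is free to run them in parallel
class RunAll(luigi.Task):
    ...
    def requires(self):
        yield CleanUp(self.result_dir)
        yield MySparkTask(self.result_dir)

The full code with both placements marked (1) and (2):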
import luigi
import luigi.format
import luigi.contrib.hdfs
from luigi.contrib.spark import SparkSubmitTask

class CleanUp(luigi.Task):
    '''Removes the HdfsTarget at `path` (if it exists) so it can be rewritten.'''
    path = luigi.Parameter()

    def run(self):
        self.target = luigi.contrib.hdfs.HdfsTarget(self.path, format=luigi.format.Gzip)
        if self.target.exists():
            self.target.remove(skip_trash=True)

class MySparkTask(SparkSubmitTask):
    # Not named `output` so the parameter doesn't shadow the output() method below
    out_path = luigi.Parameter()

    driver_memory = '8g'
    executor_memory = '3g'
    num_executors = 5

    app = 'my-app.jar'
    entry_class = 'com.company.MyJob'

    def app_options(self):
        return ['/input', self.out_path]

    def requires(self):
        # (1): placeholder for the first attempt described above
        pass

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(self.out_path, format=luigi.format.Gzip)

class RunAll(luigi.Task):
    '''Dummy task that triggers execution of the other tasks'''
    result_dir = '/output'

    def requires(self):
        # (2): placeholder for the second attempt (yield) described above
        return MySparkTask(self.result_dir)
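
In case it matters, I trigger everything through RunAll with the standard Luigi command line (the module name here is just an example of how the file is saved):

luigi --module my_jobs RunAll --local-scheduler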