I am trying to create a Luigi pipeline with the following structure:
Task A
Takes as input a list of sentences, applies a function to each sentence in the list, and returns the modified list of sentences.
Task B
Takes as input the modified list output by Task A, so Task A must have run before this task can start. It applies a function to each element in the list and returns the modified list as output.
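Stripped of Luigi, the data flow described above is two list transformations chained together, where the second step consumes the first step's output. A minimal sketch, in which `function_a`, `function_b` and `apply_to_each` are hypothetical placeholders (not the question's actual FunctionA/FunctionB):

```python
from typing import Callable, List


def apply_to_each(func: Callable[[str], str], sentences: List[str]) -> List[str]:
    # Apply func to every sentence and return a new list; the input list is not mutated.
    return [func(s) for s in sentences]


def function_a(sentence: str) -> str:
    # Hypothetical stand-in for FunctionA: lowercase the sentence.
    return sentence.lower()


def function_b(sentence: str) -> str:
    # Hypothetical stand-in for FunctionB: append a full stop.
    return sentence + "."


def pipeline(sentences: List[str]) -> List[str]:
    # "Task B" can only run once "Task A" has produced its list, because it consumes that list.
    output_a = apply_to_each(function_a, sentences)
    return apply_to_each(function_b, output_a)


if __name__ == "__main__":
    print(pipeline(["Good customer support network not great"]))
```

Luigi's job in the real pipeline is to enforce that ordering and to record each step's result so completed steps are not rerun.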
Below is some code that I thought would work, but for some reason Task B fails, and I am not sure why.
```python
import luigi


class TaskA(luigi.Task):
    sentence = luigi.ListParameter()

    def output(self):
        # Return the generated list as output
        return luigi.ListParameter(default=[])

    def run(self):
        # Simulating the generation of a list
        output_list = self.FunctionA(self.sentence)
        self.output().dump(output_list)

    def FunctionA(self, sentences):
        # Code goes here....
        ...


class TaskB(luigi.Task):
    sentence = luigi.ListParameter()

    def requires(self):
        return TaskA()

    def output(self):
        # Return the modified list as output
        return luigi.ListParameter(default=[])

    def run(self):
        # Retrieve the output of TaskA
        input_list_from_taskA = self.input().load()
        # Modify the list further by applying FunctionB to each element
        modified_list_for_taskB = self.FunctionB(input_list_from_taskA)
        # Return the modified list as output
        self.output().dump(modified_list_for_taskB)

    def FunctionB(self, sentences):
        # Code goes here...
        ...


if __name__ == '__main__':
    sentences_to_process = [
        "No conjunctions, but I am in love",
        "The network is good but the food is better",
        "Good customer support network not great"
    ]
    luigi.build([TaskB(sentence=s) for s in sentences_to_process], local_scheduler=True)
```
I've tried the above code: Task A runs successfully, but Task B fails.
I also don't want the outputs to be LocalTargets; instead, I want them to remain as lists.