import os
import luigi
import pandas as pd
import requests as req
from bs4 import BeautifulSoup
class DownloadData(luigi.Task):
def run(self):
site = req.get("http://www.gutenberg.org/browse/scores/top").text
with self.output().open("w") as f:
f.write(site)
def output(self):
return luigi.LocalTarget("raw_data.txt")
def complete(self):
return os.path.exists(self.output().path)
class PrePData(luigi.Task):
def requires(self):
return DownloadData()
def run(self):
data = self.requires()
bs4ed_data = []
if data.contains("<!DOCTYPE html>"):
bs4ed_data.append()(data,"html.parser")
else:
print("can not found any problem in this data")
return bs4ed_data
def output(self):
return luigi.local_target("data.txt")
def complete(self):
return os.path.exists(self.output().path)
def on_success(self):
print("data preprocessing completed successfully")
def on_failure(self):
print("data preprocessing failed")
class RunAllTasks(luigi.WrapperTask):
def requires(self):
return [DownloadData(),PrePData()]
ı run this python file with this command in my terminal
python -m luigi --module PipeLineofETL-A RunAllTasks --local-scheduler --workers 4
and error
python -m luigi --module PipeLineofETL-A RunAllTasks --local-scheduler --workers 4
DEBUG: Checking if RunAllTasks() is complete
WARNING: Will not run RunAllTasks() or any dependencies due to error in complete() method:
Traceback (most recent call last):
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 429, in check_complete
is_complete = check_complete_cached(task, completion_cache)
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 414, in check_complete_cached
is_complete = task.complete()
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/task.py", line 845, in complete
return all(r.complete() for r in flatten(self.requires()))
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/task.py", line 845, in <genexpr>
return all(r.complete() for r in flatten(self.requires()))
File "/home/tuna/Belgeler/GitLab/extractdata/ChatGPT's Basic tasks/PipeLineofETL-A.py", line 40, in complete
return os.path.exists(self.output().path)
File "/home/tuna/Belgeler/GitLab/extractdata/ChatGPT's Basic tasks/PipeLineofETL-A.py", line 37, in output
return luigi.local_target("data.txt")
TypeError: 'module' object is not callable
INFO: Informed scheduler that task RunAllTasks__99914b932b has status UNKNOWN
INFO: Done scheduling tasks
INFO: Running Worker with 4 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=1404147006, workers=4, host=tunapc, username=tuna, pid=9077) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 failed scheduling:
- 1 RunAllTasks()
Did not run any tasks
This progress looks :( because there were tasks whose scheduling failed
===== Luigi Execution Summary =====
import os
import luigi
import pandas as pd
import requests as req
from bs4 import BeautifulSoup
class DownloadData(luigi.Task):
def run(self):
site = req.get("http://www.gutenberg.org/browse/scores/top").text
with self.output().open("w") as f:
f.write(site)
def output(self):
return luigi.LocalTarget("raw_data.txt")
def complete(self):
return os.path.exists(self.output().path)
class PrePData(luigi.Task):
def requires(self):
return DownloadData()
def run(self):
data = self.requires()
bs4ed_data = []
if data.contains("<!DOCTYPE html>"):
bs4ed_data.append()(data,"html.parser")
else:
print("can not found any problem in this data")
return bs4ed_data
class RunAllTasks(luigi.WrapperTask):
def requires(self):
return [DownloadData(),PrePData()]
ı write same command in terminal and ı get this error
DEBUG: Checking if RunAllTasks() is complete
/home/tuna/.local/lib/python3.10/site-packages/luigi/task.py:845: UserWarning: Task PrePData() without outputs has no custom complete() method
return all(r.complete() for r in flatten(self.requires()))
DEBUG: Checking if DownloadData() is complete
DEBUG: Checking if PrePData() is complete
/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py:414: UserWarning: Task PrePData() without outputs has no custom complete() method
is_complete = task.complete()
INFO: Informed scheduler that task RunAllTasks__99914b932b has status PENDING
INFO: Informed scheduler that task PrePData__99914b932b has status PENDING
INFO: Informed scheduler that task DownloadData__99914b932b has status DONE
INFO: Done scheduling tasks
INFO: Running Worker with 4 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: PrePData__99914b932b is currently run by worker Worker(salt=3997262702, workers=4, host=tunapc, username=tuna, pid=10617)
INFO: [pid 10624] Worker Worker(salt=3997262702, workers=4, host=tunapc, username=tuna, pid=10617) running PrePData()
ERROR: [pid 10624] Worker Worker(salt=3997262702, workers=4, host=tunapc, username=tuna, pid=10617) failed PrePData()
Traceback (most recent call last):
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 198, in run
new_deps = self._run_get_new_deps()
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 138, in _run_get_new_deps
task_gen = self.task.run()
File "/home/tuna/Belgeler/GitLab/extractdata/ChatGPT's Basic tasks/PipeLineofETL-A.py", line 28, in run
if data.contains("<!DOCTYPE html>"):
AttributeError: 'DownloadData' object has no attribute 'contains'
INFO: Informed scheduler that task PrePData__99914b932b has status FAILED
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: There are 2 pending tasks last scheduled by this worker
INFO: Worker Worker(salt=3997262702, workers=4, host=tunapc, username=tuna, pid=10617) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 3 tasks of which:
* 1 complete ones were encountered:
- 1 DownloadData()
* 1 failed:
- 1 PrePData()
* 1 were left pending, among these:
* 1 had failed dependencies:
- 1 RunAllTasks()
This progress looks :( because there were failed tasks
===== Luigi Execution Summary =====
when ı added output() method to DownloadData in requires function, ı get this error
DEBUG: Checking if RunAllTasks() is complete
/home/tuna/.local/lib/python3.10/site-packages/luigi/task.py:845: UserWarning: Task PrePData() without outputs has no custom complete() method
return all(r.complete() for r in flatten(self.requires()))
DEBUG: Checking if DownloadData() is complete
DEBUG: Checking if PrePData() is complete
/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py:414: UserWarning: Task PrePData() without outputs has no custom complete() method
is_complete = task.complete()
INFO: Informed scheduler that task RunAllTasks__99914b932b has status PENDING
ERROR: Luigi unexpected framework error while scheduling RunAllTasks()
Traceback (most recent call last):
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 794, in add
for next in self._add(item, is_complete):
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 892, in _add
self._validate_dependency(d)
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 917, in _validate_dependency
raise Exception('requires() can not return Target objects. Wrap it in an ExternalTask class')
Exception: requires() can not return Target objects. Wrap it in an ExternalTask class
INFO: Worker Worker(salt=6506578324, workers=4, host=tunapc, username=tuna, pid=10710) was stopped. Shutting down Keep-Alive thread
ERROR: Uncaught exception in luigi
Traceback (most recent call last):
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/retcodes.py", line 75, in run_with_retcodes
worker = luigi.interface._run(argv).worker
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/interface.py", line 213, in _run
return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/interface.py", line 171, in _schedule_and_run
success &= worker.add(t, env_params.parallel_scheduling, env_params.parallel_scheduling_processes)
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 794, in add
for next in self._add(item, is_complete):
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 892, in _add
self._validate_dependency(d)
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 917, in _validate_dependency
raise Exception('requires() can not return Target objects. Wrap it in an ExternalTask class')
Exception: requires() can not return Target objects. Wrap it in an ExternalTask class
You are getting the error in the first block because
luigi.local_targetis a module whileluigi.LocalTargetis the class you were looking for.The second error is because you most likely don't want to be using
self.requiresdirectly inPrePData.run, but instead want to useself.input()(take a look at https://luigi.readthedocs.io/en/stable/tasks.html#task-run).self.input()will return the outputs of the required task, which in this case isDownloadData.Finally, there are a couple optimizations you can make to your code:
LocalTargetis specified as an output, it's mere existence signifies that the task is complete. This is actually the default implementation ofTask.complete, so you don't need to reimplement it yourself.RunAllTasks. Luigi will automatically discover required tasks and construct the requirements tree before resolving the entire tree. Therefore, you only need to specify the top-level tasks, which in this case is just thePrePDatatask.