The goal is to catch the event where a Luigi task has failed dependencies and do some cleanup.
I have boilerplate code in which I raise an exception while a tmp file is being created. As a result, the execution summary reports "1 had failed dependencies". Since ProcessData defines an on_failure function, I expected data.txt and data_tmp.txt to be removed. However, on_failure is not triggered and data.txt remains in the folder. How can I handle the "had failed dependencies" event properly?
Code:
import luigi
import os
import subprocess


class GenerateData(luigi.Task):
    output_path = luigi.Parameter(default='data.txt')

    def output(self):
        return luigi.LocalTarget(self.output_path)

    def run(self):
        with self.output().open('w') as f:
            for i in range(10):
                f.write("{}\n".format(i))


class GenerateTmpData(luigi.Task):
    output_path = luigi.Parameter(default='data_tmp.txt')

    def output(self):
        return luigi.LocalTarget(self.output_path)

    def run(self):
        # Simulated failure: the code below the raise is never reached.
        raise BaseException("Simulated error in GenerateTmpData")
        with self.output().open('w') as f:
            for i in range(10):
                f.write("{}\n".format(i))


class ProcessData(luigi.Task):
    input_path = luigi.Parameter(default='data.txt')
    input_tmp_path = luigi.Parameter(default='data_tmp.txt')
    output_path = luigi.Parameter(default='processed_data.txt')

    def requires(self):
        yield GenerateTmpData(output_path=self.input_tmp_path)
        yield GenerateData(output_path=self.input_path)

    def output(self):
        return luigi.LocalTarget(self.output_path)

    def run(self):
        # self.input()[1] is the output of GenerateData (data.txt).
        with self.input()[1].open('r') as infile, self.output().open('w') as outfile:
            for line in infile:
                number = int(line.strip())
                processed_number = number * 2
                outfile.write("{}\n".format(processed_number))

    def on_failure(self, exception):
        # Expected cleanup: remove both input files if they exist.
        for item in self.input():
            if os.path.exists(item.path):
                os.remove(item.path)
        super(ProcessData, self).on_failure(exception)


def run_luigi_task():
    command = (
        'python -m luigi '
        '--module test ProcessData --local-scheduler '
    )
    subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, executable='/bin/bash')


if __name__ == '__main__':
    run_luigi_task()
Run as:
python test.py
Output:
DEBUG: Checking if ProcessData(input_path=data.txt, input_tmp_path=data_tmp.txt, output_path=processed_data.txt) is complete
DEBUG: Checking if GenerateTmpData(output_path=data_tmp.txt) is complete
DEBUG: Checking if GenerateData(output_path=data.txt) is complete
INFO: Informed scheduler that task ProcessData_data_txt_data_tmp_txt_processed_data_t_ec6640af99 has status PENDING
INFO: Informed scheduler that task GenerateData_data_txt_1287d398ef has status PENDING
INFO: Informed scheduler that task GenerateTmpData_data_tmp_txt_941f24e4a7 has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 763] Worker Worker(salt=208102894, workers=1, host=8b0c89a2a184, username=root, pid=763) running GenerateData(output_path=data.txt)
INFO: [pid 763] Worker Worker(salt=208102894, workers=1, host=8b0c89a2a184, username=root, pid=763) done GenerateData(output_path=data.txt)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task GenerateData_data_txt_1287d398ef has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 763] Worker Worker(salt=208102894, workers=1, host=8b0c89a2a184, username=root, pid=763) running GenerateTmpData(output_path=data_tmp.txt)
ERROR: [pid 763] Worker Worker(salt=208102894, workers=1, host=8b0c89a2a184, username=root, pid=763) failed GenerateTmpData(output_path=data_tmp.txt)
Traceback (most recent call last):
  File "/app/env/lib/python3.5/site-packages/luigi/worker.py", line 199, in run
    new_deps = self._run_get_new_deps()
  File "/app/env/lib/python3.5/site-packages/luigi/worker.py", line 141, in _run_get_new_deps
    task_gen = self.task.run()
  File "test.py", line 25, in run
    raise BaseException("Simulated error in GenerateTmpData")
BaseException: Simulated error in GenerateTmpData
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task GenerateTmpData_data_tmp_txt_941f24e4a7 has status FAILED
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: There are 2 pending tasks last scheduled by this worker
INFO: Worker Worker(salt=208102894, workers=1, host=8b0c89a2a184, username=root, pid=763) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 3 tasks of which:
* 1 ran successfully:
    - 1 GenerateData(output_path=data.txt)
* 1 failed:
    - 1 GenerateTmpData(output_path=data_tmp.txt)
* 1 were left pending, among these:
    * 1 had failed dependencies:
        - 1 ProcessData(input_path=data.txt, input_tmp_path=data_tmp.txt, output_path=processed_data.txt)
This progress looks :( because there were failed tasks
===== Luigi Execution Summary =====
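A possible reading of the log above: ProcessData was left pending and never ran, so its run() never raised, and its on_failure had nothing to react to. Only GenerateTmpData actually failed. The sketch below is a toy model of that behaviour written with the standard library only. ToyTask, run_pipeline, remove_if_exists and demo are hypothetical names made up for illustration and are not part of Luigi. It shows one way the cleanup could be moved into a hook that fires when a dependency fails, instead of relying on the downstream task's on_failure.

```python
import os
import tempfile


class ToyTask:
    """Hypothetical stand-in for a task; not part of Luigi."""

    def __init__(self, name, output_path, deps=(), fail=False):
        self.name = name
        self.output_path = output_path
        self.deps = list(deps)
        self.fail = fail
        self.on_failure_called = False

    def run(self):
        if self.fail:
            raise RuntimeError("Simulated error in " + self.name)
        with open(self.output_path, "w") as f:
            f.write("done\n")

    def on_failure(self, exception):
        # Only reached when this task's own run() raised.
        self.on_failure_called = True


def remove_if_exists(path):
    """Delete path if present; return True when a file was removed."""
    if os.path.exists(path):
        os.remove(path)
        return True
    return False


def run_pipeline(task, on_dependency_failure):
    """Run all dependencies, then the task itself; return True on success.

    Assumption mirrored from the log: a task whose dependency failed is
    never run, so its own on_failure is never called. Cleanup is delegated
    to the on_dependency_failure callback instead.
    """
    results = [run_pipeline(dep, on_dependency_failure) for dep in task.deps]
    if not all(results):
        on_dependency_failure(task)
        return False
    try:
        task.run()
    except Exception as exc:
        task.on_failure(exc)
        return False
    return True


def demo():
    """Rebuild the three-task layout from the question in a temp folder."""
    workdir = tempfile.mkdtemp()
    data = ToyTask("GenerateData", os.path.join(workdir, "data.txt"))
    tmp = ToyTask("GenerateTmpData", os.path.join(workdir, "data_tmp.txt"), fail=True)
    process = ToyTask(
        "ProcessData",
        os.path.join(workdir, "processed_data.txt"),
        deps=[tmp, data],
    )

    def cleanup(task):
        # Remove whatever the dependencies managed to write.
        for dep in task.deps:
            remove_if_exists(dep.output_path)

    ok = run_pipeline(process, cleanup)
    return ok, process, tmp, data
```

In Luigi itself, a comparable hook might be a callback registered on the upstream task with the event_handler decorator for luigi.Event.FAILURE, which receives the failed task and the exception. Whether that fits depends on which task should own the cleanup, so treat this as a starting point rather than a definitive fix.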