Automating workflows using the Luigi batch workflow system
Luigi is a batch workflow system written in Python and developed at Spotify. Luigi is one of only a few batch workflow systems that supports running both normal command line jobs and Hadoop jobs in the same workflow.
Installing Luigi
- First, follow this guide to get a custom python installation set up: How to install your own Python modules or specific Python version
- Install tornado and docopt:
  pip install tornado
  pip install docopt
- Install luigi:
  pip install luigi
- Re-load our ~/.bash_profile file, to make sure the right version of all binaries is used:
  source ~/.bash_profile
- Done!
Defining a workflow
Luigi workflows consist of tasks that are connected into a dependency graph, which specifies which tasks have to be run before another task can be run.
A task is thus defined by the action it executes, and by its dependencies on other tasks. Technically, a task is implemented as a Python class that subclasses luigi.Task. The task's action is implemented in the task's run() method.
The dependencies, on the other hand, can be specified in at least two different ways. The default, and initially simpler, way is to hard-code the upstream dependency of every task inside the task itself. Technically, this is done in the requires() method of the task class.
This can also be done in a more dynamic and flexible way, by passing the upstream task object as a parameter to the class when it is instantiated. This is beneficial if you want to re-use tasks in other workflows, or elsewhere within your current workflow.
Defining dependencies statically inside task classes
Defining dependencies in this way is already covered in the luigi documentation, so we will not go into it at length here. In short, you implement the requires() method and let it return an instantiated object of the upstream class, something like:
class SomeTask(luigi.Task):

    ...

    def requires(self):
        # This will return an instantiated task object
        # of the class "SomeUpstreamTask"
        return SomeUpstreamTask()

    ...
Defining dependencies dynamically from a script
Dependencies between tasks can also be defined in a more dynamic way, from outside the task classes themselves, by instantiating all the tasks in an external script and “injecting” each upstream task object into its downstream task object upon creation. We will look closer at how this is done in the script further below, but first let’s have a look at how it looks inside a task class:
class SomeTask(luigi.Task):

    upstream_task = luigi.Parameter()

    ...

    def requires(self):
        # This will return the upstream task object
        return self.upstream_task

    ...
... or, with a more complete code example:
import luigi

class ExampleTask(luigi.Task):

    upstream_task = luigi.Parameter()

    def requires(self):
        # This will return the upstream task object
        return self.upstream_task

    def output(self):
        # A simple file naming scheme where the input file name is used,
        # and just combined with an additional extension.
        return luigi.LocalTarget(self.input().path + ".newextension")

    def run(self):
        # Loop over the input file, do something to each line, and
        # write to the output file.
        with self.input().open() as infile, self.output().open('w') as outfile:
            for line in infile:
                outfile.write(self.do_something(line))

    def do_something(self, line):
        pass # TODO: implement function
In the code above, the return statement in requires() returns whatever has been set in the class variable “upstream_task”. We will see below how to use this parameter to “inject” the task’s upstream task object.
An example (very simplified) workflow could look like this:
# Some required luigi imports
import luigi
import luigi.scheduler
import luigi.worker

# Here we are importing our own tasks, provided they are
# all saved in a file or other module container, named
# "components":
from components import *

# We store all the tasks in a dictionary, for easy retrieval later ...
tasks = {}
tasks["some_task_a"] = SomeTaskA()
tasks["some_task_b"] = SomeTaskB(upstream_task=tasks["some_task_a"])
tasks["some_task_c"] = SomeTaskC(upstream_task=tasks["some_task_b"])

# We decide to run the last one (task C) of our tasks, so that the whole
# workflow is "exercised" ...
the_task_to_run = tasks["some_task_c"]

# Build (and run) the workflow (note that luigi.build() takes a
# list of tasks)
luigi.build([the_task_to_run], workers=16, local_scheduler=True)
Wrapping workflows in luigi tasks
A powerful way to work with the kind of dynamic workflows displayed above is to wrap the dynamic workflow definition in a containing task, so that you only need to execute that one task in order to execute the whole workflow!
This also makes it possible to re-use Luigi's powerful automatic command line API, rather than coding your own command line parsing.
Wrapping the above workflow in a workflow container task could look like this, in code:
import luigi
import luigi.scheduler
import luigi.worker

from components import *

class MyWorkflow(luigi.Task):
    '''
    An example luigi workflow task, which runs a whole workflow of
    other luigi tasks when executed.
    '''

    # Here we define the whole workflow in the requires function
    def requires(self):
        task_a = TaskA()
        task_b = TaskB(upstream_task=task_a)
        task_c = TaskC(upstream_task=task_b)
        return task_c

    # Define a simple marker file to show that the workflow has completed
    def output(self):
        return luigi.LocalTarget(self.input().path + '.workflow_complete')

    # Just write some text to the marker file
    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('Workflow complete!')

# If this file is executed as a script, let luigi take care of it:
if __name__ == '__main__':
    luigi.run()
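Assuming the wrapper task above is saved in a file such as my_workflow.py (the file name is invented here), the whole workflow can then be started from the command line via the interface that luigi.run() generates:

```shell
# Run the wrapper task (and thus the whole workflow) with the local scheduler:
python my_workflow.py MyWorkflow --local-scheduler
```

Leaving out the --local-scheduler flag makes Luigi use the central scheduler daemon instead, which is what you typically want in production.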