Automating workflows using the luigi batch workflow system

Luigi is a batch workflow system written in Python and developed at Spotify. It is one of the few batch workflow systems that supports running both normal command line jobs and Hadoop jobs in the same workflow.

Installing Luigi

To install Luigi, you need a custom Python installation, so that you can easily install any Python packages you want. For this guide you also need to install two Python packages: tornado and docopt (you can do without them, but installing them is highly recommended). The steps below will guide you through this process:
  1. First, follow this guide to get a custom python installation set up:
    How to install your own Python modules or specific Python version
  2. Install tornado and docopt:
    pip install tornado
    pip install docopt
  3. Install luigi
    pip install luigi
  4. Re-load your ~/.bash_profile file, to make sure the right versions of all binaries are used:
    source ~/.bash_profile
  5. Done!

Defining a workflow

Luigi workflows consist of tasks that are connected into a dependency graph, which specifies which tasks have to run before another task can run.

A task, then, is defined by the action it will execute and by its dependencies on other tasks. Technically, a task is implemented as a Python class that subclasses luigi.Task. The task's action is implemented in the task's run() method.

The dependencies, on the other hand, can be specified in at least two different ways. The default, and initially simpler, way is to hard-code the upstream dependency of every task inside the task itself. Technically this is done in the requires() method of the task class.

This can also be done in a more dynamic and flexible way, by passing the upstream task object as a parameter to the downstream task when it is instantiated. This is beneficial if you want to re-use tasks in other workflows, or elsewhere within your current workflow.
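The injection idea itself is independent of Luigi, so it can be illustrated with a small luigi-free sketch (the class names here are made up for illustration, not part of Luigi):

```python
# A minimal sketch of dependency injection between tasks, in plain
# Python (no luigi needed): the downstream task receives its upstream
# task as a constructor parameter instead of hard-coding it.

class UpstreamTask:
    pass

class DownstreamTask:
    def __init__(self, upstream_task):
        self.upstream_task = upstream_task

    def requires(self):
        # Return whatever upstream task was injected at creation time
        return self.upstream_task

upstream = UpstreamTask()
downstream = DownstreamTask(upstream_task=upstream)
assert downstream.requires() is upstream
```

Because the dependency is supplied from outside, the same DownstreamTask class can be wired to a different upstream task in another workflow without changing its code.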

Defining dependencies statically inside task classes

Defining dependencies in this way is already covered in the Luigi documentation, so we will not go to lengths about it here. Basically, you implement the requires() method and let it return an instantiated object of the upstream class, something like:

class SomeTask(luigi.Task):
    def requires(self):
        # This will return an instantiated task object
        # of the class "SomeUpstreamTask"
        return SomeUpstreamTask()

Defining dependencies dynamically from a script

Dependencies between tasks can also be defined in a more dynamic way, from outside of the task classes themselves, by instantiating all the tasks and “injecting” each upstream task object into its downstream task object upon creation, inside an external script. We will look closer at how this looks in the script further below, but first let's have a look at how it looks inside a task class:

class SomeTask(luigi.Task):
    upstream_task = luigi.Parameter()

    def requires(self):
        # This will return the upstream task object
        return self.upstream_task

... or, with a more complete code example:

import luigi

class ExampleTask(luigi.Task):
    upstream_task = luigi.Parameter()

    def requires(self):
        # This will return the upstream task object
        return self.upstream_task

    def output(self):
        # A simple file naming scheme where the input file name is used,
        # and just combined with an additional extension.
        return luigi.LocalTarget(self.input().path + ".newextension")

    def run(self):
        # Loop over the input file, do something to each line, and
        # write to the output file.
        with self.input().open() as infile, self.output().open('w') as outfile:
            for line in infile:
                outfile.write(self.do_something(line))

    def do_something(self, line):
        return line  # TODO: implement the actual per-line processing

We see in the code above how the requires() method returns whatever has been set in the class parameter “upstream_task”. We will see below how we use this parameter to “inject” the task's upstream task object.

An example (very simplified) workflow could look like this:

# Some required luigi imports
import luigi
import luigi.scheduler
import luigi.worker

# Here we are importing our own tasks, provided they are
# all saved in a file or other module container, named
# "components":
from components import *

# We store all the tasks in a dictionary, for easy retrieval later ...
tasks = {}
tasks["some_task_a"] = SomeTaskA()
tasks["some_task_b"] = SomeTaskB(upstream_task=tasks["some_task_a"])
tasks["some_task_c"] = SomeTaskC(upstream_task=tasks["some_task_b"])

# We decide to run the last one (task C) of our tasks, so that the whole
# workflow is "exercised" ...
the_task_to_run = tasks["some_task_c"]

# Build (and run) the workflow
luigi.build([the_task_to_run], workers=16, local_scheduler=True)
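To see why running only the last task is enough to exercise the whole workflow, here is a luigi-free sketch of the same wiring (SomeTaskA/B/C here are hypothetical plain-Python stand-ins, not real Luigi tasks): the whole chain can be recovered from the last task by following requires(), which is essentially what the scheduler does.

```python
# Stand-in classes mimicking the wiring in the script above.

class SomeTaskA:
    def requires(self):
        # Root of the workflow: no upstream task
        return None

class _Downstream:
    def __init__(self, upstream_task):
        self.upstream_task = upstream_task

    def requires(self):
        return self.upstream_task

class SomeTaskB(_Downstream):
    pass

class SomeTaskC(_Downstream):
    pass

tasks = {}
tasks["some_task_a"] = SomeTaskA()
tasks["some_task_b"] = SomeTaskB(upstream_task=tasks["some_task_a"])
tasks["some_task_c"] = SomeTaskC(upstream_task=tasks["some_task_b"])

# Walk the dependency chain from task C back to its root:
chain = []
task = tasks["some_task_c"]
while task is not None:
    chain.append(type(task).__name__)
    task = task.requires()

print(chain)  # → ['SomeTaskC', 'SomeTaskB', 'SomeTaskA']
```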

Wrapping workflows in luigi tasks

A powerful way to work with the dynamic sort of workflows displayed above is to wrap the dynamic workflow definition in a containing task, so that you only need to execute that task in order to execute the whole workflow!

This also makes it possible to re-use Luigi's powerful automatic command line API, rather than coding your own command line parsing.

Wrapping the above workflow in a workflow container task could look like this, in code:

import luigi
import luigi.scheduler
import luigi.worker
from components import *

class MyWorkflow(luigi.Task):
    '''
    An example luigi workflow task, which runs a whole workflow of other
    luigi tasks when executed.
    '''
    # Here we define the whole workflow in the requires method
    def requires(self):
        task_a = TaskA()
        task_b = TaskB(upstream_task=task_a)
        task_c = TaskC(upstream_task=task_b)
        return task_c

    # Define a simple marker file to show that the workflow has completed
    def output(self):
        return luigi.LocalTarget(self.input().path + '.workflow_complete')

    # Just write some text to the marker file
    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('Workflow complete!')

# If this file is executed as a script, let luigi take care of it
# (so the workflow can be started from the command line, e.g.:
#   python myworkflow.py MyWorkflow --local-scheduler ):
if __name__ == '__main__':
    luigi.run()

External links