## Parallel Jobs in Luigi

### Parallel Execution Options

Distributing jobs over multiple cores/machines is a commonly encountered need when working with even moderately large datasets. In fact, the whole big data ecosystem sprouted around this use case.

If we don’t need the power of distributing a job over multiple machines and can instead get away with simply using multiple cores on one machine, then we can use something like Python’s threading and multiprocessing modules. Both of these modules require a fair amount of boilerplate code to do even simple things. As an alternative, below I show examples in two tools that are relatively new to me: Luigi and GNU Parallel.

A simple use case might be that we have hundreds of log files and we need to map and filter out only the lines that meet a certain criterion. If you need to do something more versatile than maps and filters, or you want a degree of error recovery for subsets of the job that might go bad, GNU Parallel is not the best option; Hadoop or Spark are the tools to use instead.

### Example

For fun, let’s code up a simple example with each of these methods. As a toy problem, I made a small script that creates multiple files of JSON records, each containing two fields of random numbers.
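A minimal sketch of that generator, where the file names and line counts are my assumptions (the field names index and total come from the filtering criterion below):

```python
# make_data.py -- sketch of a generator for the toy dataset.
# File names and line counts are assumptions; the fields match
# the filtering criterion used throughout the post.
import json
import random

N_FILES = 4
LINES_PER_FILE = 1000000  # 4 million lines in total

for i in range(N_FILES):
    with open('data/random_{}.json'.format(i), 'w') as f:
        for _ in range(LINES_PER_FILE):
            record = {'index': random.random(), 'total': random.random()}
            f.write(json.dumps(record) + '\n')
```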

I want to find all rows where the sum of index and total is greater than 1.99. The following bit of Python code (json_mapper.py) takes input on stdin and prints the JSON lines that pass our criterion to stdout.
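A sketch of what json_mapper.py looks like, with the filter pulled into a map_line helper (which comes up again in the jq section below):

```python
# json_mapper.py -- read JSON lines on stdin, print the lines where
# index + total > 1.99 to stdout.
import json
import sys


def map_line(line):
    """Return the line if it passes the filter, otherwise None."""
    record = json.loads(line)
    if record['index'] + record['total'] > 1.99:
        return line.strip()
    return None


if __name__ == '__main__':
    for line in sys.stdin:
        result = map_line(line)
        if result is not None:
            print(result)
```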

If I run this code over 4 million lines of json, it takes 19s to execute.
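That timing comes from a straight pipe, something like the following (the data/ paths are my assumptions):

```bash
$ time cat data/*.json | python json_mapper.py > /dev/null
```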

### GNU Parallel

A use case of GNU Parallel is to process a large file or many files with a mapper that takes a line in and returns a value that only depends on that line. GNU Parallel spins up an instance of the mapper per core and takes care of partitioning the input file(s) across the jobs. We can also provide GNU Parallel with a list of remote machines and GNU Parallel will divvy out the jobs across the machines.

My MacBook has 4 cores, and this operation of filtering JSON rows is well-suited to GNU Parallel, so let’s see how fast it runs.
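The invocation looks something like this: parallel --pipe splits stdin into chunks and feeds each chunk to a separate copy of the mapper, one job per core by default.

```bash
$ time cat data/*.json | parallel --pipe python json_mapper.py > /dev/null
```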

It is only slightly faster, which is a bit surprising. For bigger jobs, I have found that the throughput scales roughly linearly with the number of cores. This is really useful on larger machines like EC2 8xlarge instances. Below is a screenshot of htop for a big job running on 32 cores.

### jq

A year ago I was on a command-line kick and a friend suggested that I give jq a try. It is an analog to sed, awk, and grep, but made specifically for processing and manipulating JSON. Like those other command-line tools, jq has its own quirky syntax that is initially intimidating but not hard to get the hang of. Specifically, we can replicate the functionality of our map_line function with the following bit of jq that we keep in a file called map.jq.
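Assuming the same index and total fields, the whole filter is a one-liner:

```jq
# map.jq -- keep only records where index + total > 1.99
select((.index + .total) > 1.99)
```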

Now, we can call jq directly with the following.
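Here -f reads the filter from a file and -c prints compact, one-line output; the data/ paths are again my assumptions.

```bash
$ jq -c -f map.jq data/*.json
```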

Or we can pass the jq command to GNU Parallel.
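This is the same shape as before, with jq swapped in for the Python mapper:

```bash
$ cat data/*.json | parallel --pipe jq -c -f map.jq
```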

### PySpark

PySpark’s API is really nice, which makes running this job in PySpark easy.
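A sketch of the job, again assuming the data/ layout from above (note that saveAsTextFile refuses to overwrite, so out_data must not already exist):

```python
# spark_mapper.py -- sketch of the PySpark version of the filter.
import json

from pyspark import SparkContext

sc = SparkContext(appName='json_mapper')

(sc.textFile('data/*.json')      # read all input files as lines
   .map(json.loads)              # parse each line into a dict
   .filter(lambda r: r['index'] + r['total'] > 1.99)
   .map(json.dumps)              # serialize back to JSON lines
   .saveAsTextFile('out_data'))  # fails if out_data already exists

sc.stop()
```

We submit it with spark-submit:

```bash
$ spark-submit spark_mapper.py
```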

Running the job is easy, but fairly slow since we have to wait for Spark to start the JVM. Like GNU Parallel, the advantages in speed for PySpark aren’t apparent until we have much more data or we are trying to scale the job across a cluster of machines.

We can see the progress in Spark’s web interface at http://0.0.0.0:4040. It looks something like this:

### Multiple Workers in Luigi

Luigi is an ETL pipeline scheduler that Spotify open-sourced in 2012. Parallel execution is not Luigi’s main use case, but it is a nice feature that I thought I’d highlight here today. Below is a bit of code that defines two Luigi tasks. The TestTask class executes the mapper for one file and the LotsOTasks class loops through all 4 files and runs TestTask for each file.
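A sketch of those two tasks; the module name (luigi_demo.py) and the file layout are my assumptions, and map_line comes from the mapper script above:

```python
# luigi_demo.py -- sketch of the two tasks described above.
import luigi

from json_mapper import map_line


class TestTask(luigi.Task):
    """Run the mapper over a single input file."""
    file_number = luigi.IntParameter()

    def output(self):
        return luigi.LocalTarget('out_data/out_{}.json'.format(self.file_number))

    def run(self):
        with open('data/random_{}.json'.format(self.file_number)) as f_in, \
                self.output().open('w') as f_out:
            for line in f_in:
                result = map_line(line)
                if result is not None:
                    f_out.write(result + '\n')


class LotsOTasks(luigi.WrapperTask):
    """Require a TestTask for each of the 4 input files."""

    def requires(self):
        for file_number in range(4):
            yield TestTask(file_number=file_number)


if __name__ == '__main__':
    luigi.run()
```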

To run these tasks, we can either use the --local-scheduler option or pass them to the luigid server daemon.

#### Local Scheduler

To run Luigi using the local scheduler, execute the following at the command line. LotsOTasks runs each job serially, so we’d expect the runtime to be similar to the 19-second json_mapper.py runtime.
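Assuming the luigi_demo.py sketch above:

```bash
$ time python luigi_demo.py LotsOTasks --local-scheduler
```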

#### Luigi Server

Running tasks in local scheduler mode is easy, but for more complicated workflows, you might be better off submitting the task to the Luigi server. To start the server, run luigid. Once it is running, you can see the task queue in your browser at http://0.0.0.0:8082. The queue will be empty until you submit a task by running something like the following.
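Again assuming the luigi_demo.py sketch from above:

```bash
$ python luigi_demo.py LotsOTasks --workers 4
```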

By not specifying --local-scheduler, Luigi reverts to its default behavior of submitting tasks to the task server. And by specifying --workers 4, we tell Luigi to execute 4 tasks in parallel, which makes the overall job finish much faster. Below is a screenshot of the Luigi server UI.

Luigi ships with interfaces to lots of other tools. The one I most commonly use is Hadoop. The paradigm is that you subclass your task from luigi.contrib.hadoop.JobTask and create requires, output, mapper, and (optionally) reducer methods in your class. Luigi then packages up your mapper and reducer and passes them to the Hadoop Streaming binary. If we implement our mapper example in Luigi Hadoop, it looks like the following.
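A sketch of that; the task and module names are my inventions, and the input is modeled as an external task pointing at a data directory on HDFS. With no reducer defined, Luigi runs this as a map-only streaming job.

```python
# luigi_hadoop_demo.py -- sketch of the Hadoop Streaming version.
import json

import luigi
import luigi.contrib.hadoop
import luigi.contrib.hdfs


class InputFiles(luigi.ExternalTask):
    """The input data, assumed to already live on HDFS."""

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('data')


class HadoopMapTask(luigi.contrib.hadoop.JobTask):
    def requires(self):
        return InputFiles()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('out_data')

    def mapper(self, line):
        # Streaming mappers yield (key, value) pairs; for a pure filter
        # we emit the passing line as the key with an empty value.
        record = json.loads(line)
        if record['index'] + record['total'] > 1.99:
            yield line.strip(), ''


if __name__ == '__main__':
    luigi.run()
```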

We also have to make a client.cfg file that tells Luigi where the Hadoop binary and the streaming jar live. It looks something like the following, with the paths adjusted for your installation:
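```ini
[hadoop]
# Example paths -- point these at your own installation.
command: /usr/local/bin/hadoop
streaming-jar: /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming.jar
```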

With that, we can pass the task to Luigi.
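Using the sketch above:

```bash
$ python luigi_hadoop_demo.py HadoopMapTask --local-scheduler
```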

You can see that a few things happen. First, Luigi checks to see if the output from each of the required jobs already exists. If you get an error at this stage, it is probably because Hadoop is not properly installed. An easy way to test this is to see if hadoop fs -ls data lists the files in the data folder. If that returns an error, then you need to hunt down how to install Hadoop.

Next, Luigi shows us the command it uses to call Hadoop Streaming, i.e. hadoop jar .... Finally, after a bunch of info lines I don’t show here, Luigi tells us that our job is done. It took 35 seconds.

On AWS EMR, Hadoop is already provisioned, so Luigi can easily be used to distribute a job over a cluster. The only change that needs to be made is that the paths should point at S3. For instance, the output target might be s3://bucket/out_data/ instead of the local path I show above.

### PySpark in Luigi

Lastly, we can do the same sort of thing with a PySpark job. The task definition and the command to run it look like the following two code blocks.
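A sketch using luigi.contrib.spark.PySparkTask, which wraps spark-submit (Luigi reads the spark-submit path and options from the [spark] section of its config); the names and paths are again my assumptions:

```python
# luigi_spark_demo.py -- sketch of the PySpark job wrapped in a Luigi task.
import json

import luigi
from luigi.contrib.spark import PySparkTask


class SparkMapTask(PySparkTask):
    """Run the JSON filter as a Spark job."""

    def output(self):
        return luigi.LocalTarget('spark_out_data')

    def main(self, sc, *args):
        (sc.textFile('data/*.json')
           .map(json.loads)
           .filter(lambda r: r['index'] + r['total'] > 1.99)
           .map(json.dumps)
           .saveAsTextFile(self.output().path))


if __name__ == '__main__':
    luigi.run()
```

```bash
$ python luigi_spark_demo.py SparkMapTask --local-scheduler
```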

### Code

All of the code from this post is available on GitHub here. You can run all variations of the mapper by calling parallel_demo.sh from the parallel folder in that repo.