Parallel Execution Options
Distributing jobs over multiple cores/machines is a commonly encountered need when working with even moderately large datasets. In fact, the whole big data ecosystem sprouted around this use case.
If we don’t need the power of distributing a job over multiple machines and can instead get away with simply using multiple cores on one machine, then we can use something like Python’s threading and multiprocessing modules. Both of these modules require a fair amount of boilerplate code to do even simple things. As an alternative, below I show examples in two tools that are relatively new to me: Luigi and GNU Parallel.
A simple use case might be that we have hundreds of log files and we need to map and filter out only the lines that meet a certain criterion. If you need to do something more versatile than maps and filters, or you want a degree of error recovery for subsets of the job that might go bad, GNU Parallel is not the best option. Instead, Hadoop or Spark are the tools to use.
For fun, let’s code up a simple example with each of these tools. As a toy problem, I made a small script that creates multiple files of jsons that contain two fields of random numbers:
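I haven’t reproduced the original script here, but a minimal sketch of it might look like the following. The data/ directory, the file names, and the field names a and b are all assumptions for illustration; the 4 files of a million rows each line up with the 4 million lines mentioned below.

```python
# make_data.py -- rough sketch of the data-generation script.
# Writes 4 files of newline-delimited jsons, each row holding two
# random-number fields (the names "a" and "b" are assumptions).
import json
import os
import random

os.makedirs("data", exist_ok=True)
for file_number in range(4):
    with open("data/file_{}.json".format(file_number), "w") as f:
        for _ in range(1000000):
            row = {"a": random.random(), "b": random.random()}
            f.write(json.dumps(row) + "\n")
```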
I want to find all rows where the sum of the two fields (the total) is greater than 1.99. The following bit of Python code (json_mapper.py) takes input on stdin and prints the jsons that pass our criterion to stdout.
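The script isn’t shown verbatim here, so this is a reconstruction; the map_line helper matches how I refer to it later, but the field names are assumptions.

```python
# json_mapper.py -- sketch of the stdin-to-stdout filter.
import json
import sys

def map_line(line):
    """Return the stripped line if its two fields sum to more than 1.99."""
    row = json.loads(line)
    if row["a"] + row["b"] > 1.99:  # field names "a" and "b" are assumptions
        return line.strip()
    return None

if __name__ == "__main__":
    for line in sys.stdin:
        result = map_line(line)
        if result is not None:
            print(result)
```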
If I run this code over 4 million lines of json, it takes 19s to execute.
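The serial run is just a plain, single-process pipe, something like this (the paths are the assumed ones from the sketches above):

```bash
time cat data/*.json | python json_mapper.py > passed.json
```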
A typical use case for GNU Parallel is processing a large file or many files with a mapper that takes a line in and returns a value that depends only on that line. GNU Parallel spins up an instance of the mapper per core and takes care of partitioning the input file(s) across the jobs. We can also provide it with a list of remote machines, and it will divvy the jobs out across them.
My Macbook has 4 cores and this operation of filtering json rows is well-suited to GNU Parallel, so let’s see how fast it runs.
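The exact invocation isn’t shown above, but a run along these lines uses --pipe to chop stdin into chunks and hand one stream to a json_mapper.py process per core:

```bash
time cat data/*.json | parallel --pipe python json_mapper.py > passed.json
```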
It is only slightly faster, which is a bit surprising. For bigger jobs, I have found that the speed increases linearly with the number of cores. This is really useful on larger machines like EC2 8xlarge instances. Below is a screenshot of htop for a big job running on 32 cores.
A year ago I was on a command-line kick and a friend suggested that I give jq a try. It is an analog to grep, but made specifically for processing and manipulating json files. Like those other GNU/Linux CLI tools, jq has its own quirky syntax that is initially intimidating but not hard to get the hang of. Specifically, we can replicate the functionality of our map_line function with the following bit of jq that we keep in a file.
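Something like this, reusing the assumed field names from the mapper sketch (I’ll call the file filter.jq in the examples below, though that name is mine):

```jq
select(.a + .b > 1.99)
```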
Now, we can call jq directly with the following.
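With the assumed filter.jq file from above, the direct call might be the following; -c keeps each matching json on a single output line.

```bash
time cat data/*.json | jq -c -f filter.jq > passed.json
```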
Or we can pass the jq command to GNU Parallel.
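Again hedging on the exact flags in the original, this is just a matter of swapping the jq command into the GNU Parallel pipe:

```bash
time cat data/*.json | parallel --pipe jq -c -f filter.jq > passed.json
```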
PySpark’s API is really nice, which makes running this job in PySpark easy.
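A sketch of the PySpark version, with the paths and field names assumed as before, might be:

```python
# spark_mapper.py -- hypothetical PySpark version of the filter.
# Run with something like: spark-submit spark_mapper.py
import json
from pyspark import SparkContext

sc = SparkContext(appName="json_filter")

def passes(line):
    row = json.loads(line)
    return row["a"] + row["b"] > 1.99  # field names are assumptions

(sc.textFile("data/*.json")   # reads all 4 files as one RDD of lines
   .filter(passes)
   .saveAsTextFile("spark_out"))

sc.stop()
```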
Running the job is easy, but fairly slow since we have to wait for Spark to start the JVM. Like GNU Parallel, the advantages in speed for PySpark aren’t apparent until we have much more data or we are trying to scale the job across a cluster of machines.
We can see the progress in Spark’s web interface at http://0.0.0.0:4040. It looks something like this:
Multiple Workers in Luigi
Luigi is an ETL pipeline scheduler that Spotify open sourced in 2012. Parallel execution is not Luigi’s main use case, but it is a nice feature that I thought I’d highlight here. Below is a bit of code that defines two Luigi tasks. The TestTask class executes the mapper for one file, and the LotsOTasks class loops through all 4 files and runs TestTask for each one.
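The following is a reconstruction of those two tasks rather than the original code; the paths are assumed, and the mapper is reused by importing map_line from json_mapper.py.

```python
# luigi_tasks.py -- sketch of the two Luigi tasks described above.
import luigi
from json_mapper import map_line  # the filter function from earlier

class TestTask(luigi.Task):
    """Run the mapper over one input file and write the matches out."""
    file_number = luigi.IntParameter()

    def output(self):
        return luigi.LocalTarget("out_data/file_{}.json".format(self.file_number))

    def run(self):
        with open("data/file_{}.json".format(self.file_number)) as f_in, \
                self.output().open("w") as f_out:
            for line in f_in:
                result = map_line(line)
                if result is not None:
                    f_out.write(result + "\n")

class LotsOTasks(luigi.WrapperTask):
    """Kick off a TestTask for each of the 4 input files."""
    def requires(self):
        return [TestTask(file_number=n) for n in range(4)]

if __name__ == "__main__":
    luigi.run()
```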
To run these tasks, we can either use the --local-scheduler option or pass them to the luigid server daemon.
To run the Luigi tasks using the local scheduler, execute the following at the command line.
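Assuming the tasks live in luigi_tasks.py as in the sketch above, the call is something like:

```bash
python luigi_tasks.py LotsOTasks --local-scheduler
```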
LotsOTasks runs each job serially, so we’d expect the runtime to be similar to the 19 second single-process run from before.
Running tasks in local scheduler mode is easy, but for more complicated workflows, you might be better off submitting the task to the Luigi server. To start the server, run luigid. Once that executes, you will be able to see the task queue in your browser by going to http://0.0.0.0:8082. The queue will be empty until you submit a task by running something like the following.
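With the server up, the submission looks like the local-scheduler call minus that flag, for example:

```bash
python luigi_tasks.py LotsOTasks --workers 4
```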
By not specifying --local-scheduler, Luigi reverts to its default behavior of submitting tasks to the task server. Also, by specifying --workers 4, we tell Luigi to execute 4 tasks in parallel, which makes the overall job finish much faster. Below is a screenshot of the Luigi server UI.
Hadoop Streaming in Luigi
Luigi has lots of interfaces to other modules. The one I most commonly use is Hadoop. The paradigm is that you subclass your task from luigi.contrib.hadoop.JobTask and create mapper and (optionally) reducer methods in your class. Luigi then packages up your mapper and reducer and passes them to the Hadoop Streaming binary. If we implement our mapper example in Luigi Hadoop, it looks like the following.
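Here is a sketch of what that looks like; the HDFS paths, the field names, and the ExternalTask wrapping the input files are my assumptions rather than the original code.

```python
# luigi_hadoop_mapper.py -- hypothetical Hadoop Streaming version of the filter.
import json
import luigi
import luigi.contrib.hadoop
import luigi.contrib.hdfs

class InputFiles(luigi.ExternalTask):
    """The json files already sitting in HDFS under data/ (path assumed)."""
    def output(self):
        return luigi.contrib.hdfs.HdfsTarget("data")

class JsonFilterTask(luigi.contrib.hadoop.JobTask):
    """Map-only streaming job: keep rows whose two fields sum to more than 1.99."""
    def requires(self):
        return InputFiles()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget("out_data")

    def mapper(self, line):
        row = json.loads(line)      # field names "a" and "b" are assumptions
        if row["a"] + row["b"] > 1.99:
            yield line.strip(), ""  # each yielded pair becomes a tab-separated output line

if __name__ == "__main__":
    luigi.run()
```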
We also have to make a client.cfg file that tells Luigi where the Hadoop binary is. This looks like:
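I don’t have the original config handy, but at minimum Luigi’s Hadoop contrib needs to be pointed at the streaming jar, so a client.cfg along these lines does the job (the jar path is an assumption for a typical install):

```ini
[hadoop]
streaming-jar: /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming.jar
```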
With that, we can pass the task to Luigi.
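Again assuming the file name from the sketch, the task goes to Luigi the same way as before:

```bash
python luigi_hadoop_mapper.py JsonFilterTask --local-scheduler
```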
You can see that a few things happen. First, Luigi checks to see if the output from each of the required jobs is already complete. If you get an error at this stage, it is probably because Hadoop is not properly installed. An easy way to test this is to see if hadoop fs -ls data lists the files in the data folder. If that returns an error, then you need to hunt down how to install Hadoop.
Next, Luigi shows us the command it uses to call Hadoop Streaming, i.e. hadoop jar .... Finally, after a bunch of info lines I don’t show here, Luigi tells us that our job is done. It took 35 seconds.
On AWS EMR, Hadoop is already provisioned, so Luigi can easily be used to distribute a job over a cluster. The only change that needs to be made is that the file system should be s3://. For instance, the output target might be s3://bucket/out_data/ instead of the local path I show above.
PySpark in Luigi
Lastly, we can do the same sort of thing with a PySpark job. The task definition and the command that runs it look like the following two code blocks.
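The original blocks aren’t reproduced here, but a sketch built on luigi.contrib.spark.PySparkTask, with the same assumed paths and field names as before, might be:

```python
# luigi_spark_mapper.py -- hypothetical PySpark task run through Luigi.
import json
import luigi
from luigi.contrib.spark import PySparkTask

class SparkFilterTask(PySparkTask):
    """Filter the json rows inside a Spark job that Luigi schedules."""

    def output(self):
        return luigi.LocalTarget("luigi_spark_out")

    def main(self, sc, *args):
        def passes(line):
            row = json.loads(line)
            return row["a"] + row["b"] > 1.99  # field names are assumptions

        (sc.textFile("data/*.json")
           .filter(passes)
           .saveAsTextFile(self.output().path))

if __name__ == "__main__":
    luigi.run()
```

And the execution looks like the earlier Luigi calls:

```bash
python luigi_spark_mapper.py SparkFilterTask --local-scheduler
```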
All of the code from this post is available on GitHub here. You can run all variations of the mapper by calling parallel_demo.sh from the parallel folder in that repo.