Scale Your Machine Learning Pipeline

How to parallelize and distribute your Python machine learning pipelines with Luigi, Docker, and Kubernetes

This article presents the easiest way to turn your machine learning application from a simple Python program into a scalable pipeline that runs on a cluster.

Check out the GitHub repository for ready-to-use example code.


What you will learn:
  • How to use luigi to manage tasks
  • How to easily create a command line interface for a Python script with Click
  • How to run the pipeline in multiple Docker containers
  • How to deploy a small cluster on your machine
  • How to run your application on the cluster

Don’t calculate things twice – Luigify your pipeline

Some of the functions in your application may be time consuming and return huge outputs, so if your pipeline fails along the way, for any reason, it’s gonna cost you a lot of time and frustration to fix a small bug and rerun everything.

Let’s do something about it.

Suppose your pipeline needs to do the following things:

  1. fetch the data for the last 10 days;
  2. transform the data;
  3. make predictions with two different models.

You could code the workflow like this:

<p> CODE:</p>
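A minimal sketch of what such a naive workflow might look like. The function names (fetch_data, transform_data, predict, run_pipeline) and their bodies are placeholders invented for illustration; they stand in for your real download, transformation, and model code:

```python
from datetime import date, timedelta


def fetch_data(day):
    """Placeholder: pretend to download one day's worth of data."""
    return [f"record-{day.isoformat()}"]


def transform_data(raw_batches):
    """Placeholder: flatten the daily batches into a single list."""
    return [record for batch in raw_batches for record in batch]


def predict(model_name, data):
    """Placeholder: stand-in for a real model's predict call."""
    return {"model": model_name, "n_inputs": len(data)}


def run_pipeline(today):
    # 1. Fetch the data for the last 10 days (re-downloaded on every run).
    raw = [fetch_data(today - timedelta(days=i)) for i in range(1, 11)]
    # 2. Transform the data.
    data = transform_data(raw)
    # 3. Make predictions with two different models.
    return [predict("model_a", data), predict("model_b", data)]
```

Nothing is saved between the steps, so a failure in any of them means starting over from the first download.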

But this code is quite error-prone. Something can easily go wrong while downloading the data, for example: one network error, and all the work you’ve done is lost. Moreover, if you download data for the last ten days today and you’re planning to run the same pipeline again tomorrow, it doesn’t make much sense to download 90% of the necessary data all over again.

So how can you avoid doing the same thing twice?

Sure, you can come up with ideas on how to save and reuse intermediate results, but there’s no need for you to code it yourself.

I recommend using the luigi package. It lets you easily divide your code into separate data-processing units – called tasks – each with concrete requirements and output.

One of your tasks could look like this:

<p> CODE:</p>

From this snippet, we can see that:

  • The name of the task is TransformData;
  • The task has one parameter, namely date;
  • It depends on ten tasks from the FetchData class, one for each of the ten previous days;
  • It saves its output in a CSV file named after the ‘date’ parameter.

I’ve given a complete example of a dummy pipeline below. Take a moment to analyse how the tasks’ dependencies create a logical pipeline:

<p> CODE:</p>

Now, when you try to run the ‘MakePredictions’ task, Luigi will make sure all the upstream tasks run beforehand. Try installing Luigi with pip install luigi, save the above example in, and run this command:

<p> CODE:</p>

Furthermore, Luigi won’t run any task when its output is already present. Try running the same command again – Luigi will report that ‘MakePredictions’ for a given date has already been done.

Here you can find another good example that will help you get started with Luigi.

Parallelism for free – Luigi workers

Can I run multiple tasks at the same time?

Yes, you can! Luigi provides this functionality out of the box. Just by adding --workers 4 to the command, for example, you’re letting Luigi run four tasks simultaneously.

Let’s use this opportunity to present Luigi’s graphical interface.

Open a second terminal and run the following command:

<p> CODE:</p>

This will start a so-called central Luigi scheduler that listens on the default port, 8082. You can check its pretty dashboard in your browser at http://localhost:8082.

Now go back to the first terminal – you can run the Luigi command again, this time without the --local-scheduler argument (don’t forget to delete the files you’ve already created or choose another date if you want to see the tasks executing). If you want parallelism, add --workers 4 to the command. After refreshing the dashboard page, you should see a list of scheduled tasks. Click on the tree icon next to the MakePredictions task to see all its dependencies (Isn’t it pretty?):

Parallelism at scale – Moving to a cluster

Now we’re getting serious – if one machine is not enough for you to run your tasks in parallel, you can take your pipeline to the next level and deploy it on a cluster. I’ll walk you through all the necessary steps.

Share files between machines – Use AWS S3

In the previous example, all the files were saved locally on the same machine the tasks were executed on.

So how can I share files between multiple machines in the cluster?

There are many answers to this question, but we’ll focus on one of the possible ways – using Amazon’s S3.

AWS S3 is a Simple Storage Service. It lets you store your files in the cloud. Instead of /home/user/data/file.csv, you save your file under s3://bucket/data/file.csv. Python provides tools that make it easy to switch from local storage to S3.

Info: For the simplicity of this tutorial, if you want to follow the instructions below, I need you to set up a free AWS trial account that you’ll use for storing your files. You can do it here and it’s completely free of charge for one year. You can opt out after this period if you don’t need it anymore.

After creating the account, go here and create a bucket. Think of a bucket as a partition on a hard drive.

To read and write data from S3, we’re gonna use the luigi.contrib.s3.S3Target class. You can modify the dummy example by simply adding the proper import and replacing LocalTarget in the task definitions, as I’ve done here:

<p> CODE:</p>

You’ll also need to remove all self.output().makedirs() calls, because you don’t need to create folders on S3.

To use Luigi’s S3 functionality, you must pip install boto3.

You’ll also need to give your application credentials for S3 authentication. Let’s use the simplest approach: you can create a new Access Key on this site. You’ll get an Access Key ID and a Secret Access Key – save them in the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, respectively.

Now your application should be able to read and write data to AWS S3. Try it out by running the pipeline again.

Containerize your pipeline to put it on a cluster

Sadly, you can’t put your Python code on a cluster and just execute it. However, you can run a certain command in a certain container.

I’ll show you how to refactor your pipeline to run each task in a separate Docker container.

Turn your tasks into mini programs – Add a simple CLI with Click

The first step towards running tasks in containers is making them runnable from the command line.

What’s the fastest way to write a CLI in Python?

Answer: Click. Click is an awesome package that makes creating command line interfaces very easy.

Let’s get back to the TransformData task example (not the dummy one). Its run() method calls two functions – namely, transform_data and save_result. Let’s say these functions are defined in the file called

<p> CODE:</p>

Now let’s enable the running of these functions from the command line:

<p> CODE:</p>
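A sketch of what such a script could look like. The argument names and the bodies of transform_data and save_result are assumptions; only the overall shape (a cli function taking an output path followed by any number of input paths) follows the description in this section:

```python
import click


def transform_data(paths):
    """Placeholder transformation: collect the lines of all input files."""
    rows = []
    for path in paths:
        with open(path) as f:
            rows.extend(line.strip() for line in f)
    return rows


def save_result(rows, path):
    """Placeholder: write the transformed rows to the output file."""
    with open(path, "w") as f:
        f.write("\n".join(rows) + "\n")


@click.command()
@click.argument("output_path", type=click.Path())
@click.argument("input_paths", nargs=-1, type=click.Path(exists=True))
def cli(output_path, input_paths):
    # nargs=-1 collects all remaining arguments into a tuple.
    result = transform_data(input_paths)
    save_result(result, output_path)
```

To run it as a script, add the usual `if __name__ == "__main__": cli()` guard at the bottom of the file.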

Here, we defined a function (cli) that will be called when we run this script from a command line. We specified that the first argument will be an output path, and all further arguments will make up a tuple of input paths. After running ‘pip install click’, we can invoke data transformation from the command line:

<p> CODE:</p>

For convenience, let’s call our project tac. If you add to your project and pip install it (take a look at example project to see how a project should be structured, and don’t forget to add to the package directory), you should be able to run your script with:

<p> CODE:</p>

Make things portable – Run tasks in containers

Now the question is:

How can I easily create a Docker container in which to run my command?

Well, that’s simple.

First, create a requirements.txt file in the project root dir. You’ll need the following packages:

<p> CODE:</p>

Now, create a Dockerfile in the project root dir and put this inside:

<p> CODE:</p>

Let’s break it down:

  • FROM python gives us a base image with Python installed.
  • COPY requirements.txt /requirements.txt and RUN pip install -r /requirements.txt install all the required packages.
  • COPY . /tac and RUN pip install /tac install our project.
  • The last four lines will let us set AWS credentials inside the image at build time (it’s not a good practice, but let’s keep this tutorial simple).

Now you can build a Docker image containing your project by executing this from your project root dir (assuming you still have your AWS credentials in env variables):

<p> CODE:</p>

So you’ve just built a Docker image tagged tac-example:v1. Let’s see if it works:

<p> CODE:</p>

This should save a docker-output.csv file in your S3 bucket.

Talk to the cluster – Prepare your tasks to be run in Kubernetes

If you want to run all – or just some – of your pipeline tasks in a cluster, Luigi comes with a solution.

Take a look at luigi.contrib.kubernetes.KubernetesJobTask.

Long story short, Kubernetes is a system that can manage a cluster. If you want to interact with a cluster, talk to Kubernetes.

To run a piece of code in a cluster, you need to provide the following information to Kubernetes:

  • the image that should be used to create a container;
  • the name that container should be given;
  • the command that should be executed in the container.

Let’s modify our good old ‘TransformData’ task from the dummy pipeline to conform to these requirements.

  • First, change the base class to ‘KubernetesJobTask’:

<p> CODE:</p>

  • Give it a name:

<p> CODE:</p>

  • Define the command that should be run:

<p> CODE:</p>

  • Provide the information to be passed on to Kubernetes:

<p> CODE:</p>

  • And delete the run() method, since this is implemented by KubernetesJobTask.
  • Also, run pip install pykube, since it’s a requirement for KubernetesJobTask.

You should end up with something similar to what you can see in the example project.

But we can’t run it until we connect to a cluster. Keep reading.

Cluster at home – Kubernetes and Minikube

How can I run my pipeline in a cluster – without having access to a cluster?

The cool thing is, you actually can run a mini version of a real cluster on your laptop!

You can do this with Minikube. Minikube runs a single-node (single-machine) cluster inside a Virtual Machine on your computer.

Take a moment now to install Minikube. You can find instructions here. You’re gonna need all the components mentioned in these instructions.

After installation, you should be able to run

<p> CODE:</p>

to spin up your local cluster. Be patient, as this may take a while, especially when you do it for the first time. Verify that your cluster is running with

<p> CODE:</p>

You should see something similar to:

<p> CODE:</p>

If everything is okay, you should be able to access Kubernetes’ dashboard, which shows the current status of your cluster:

<p> CODE:</p>

A new browser tab will open and show you this:

Since the cluster runs in a separate (virtual) machine, it doesn’t have access to your Docker image (since you haven’t pushed it to any online registry). We’ll use a little trick to overcome this.

The following command will set your current console session to execute docker commands using not your local Docker Engine, but the cluster VM’s Docker Engine:

<p> CODE:</p>

Now all you need to do is call the ‘docker build’ command again. This time, your image will be built inside the VM:

<p> CODE:</p>

And here comes the moment of truth.

We’re gonna execute our pipeline inside the cluster we’ve just configured.

If everything went well, just calling the Luigi command should be enough. Minikube has already set the proper configuration, so KubernetesJobTask knows where the target Kubernetes is running.

So try executing this command from the directory where task-dummy lives:

<p> CODE:</p>

and watch how your TransformData job runs in the cluster:


  • If KubernetesJobTask reports a message like this: No pod scheduled by transform-data-20180716075521-bc4f349a74f44ddf and fails to run, it’s probably a bug, and not your fault. Check the dashboard to see if the transform-data-... pod has the status Terminated:Completed. If so, then the task is actually finished and running your pipeline again should solve the problem. It’s probably Minikube’s fault.
  • Consider Google Kubernetes Engine for spinning up a real cluster.
  • When using Google’s cluster, consider switching from AWS S3 to Google Cloud Storage to significantly speed up data access. This module should be helpful.
  • Read more about speeding up your pipeline with Dask and integrating it with Kubernetes.

Energy Transmission

Anticipating and Preventing Power Grid Failure

Massive power outages cause chaos for the general public, and they cost utility providers roughly $49 billion a year.

This wouldn’t be much of a problem if massive power outages were rare, but outages affecting more than 50,000 people have increased dramatically in recent years. This means utility companies need to find new ways of anticipating and managing these outages.

These days, smart grids are producing massive amounts of data, which means predicting and managing outages is easier than ever. Unlike traditional power grids, which are one-directional (meaning they only transmit power in one direction), smart grids are two-directional. They can capture data from every possible source in the grid at the same time as they’re providing electricity. They collect and monitor data from sources like smart meters, IoT devices, and power generation stations, providing a clear, real-time look at power usage.

Machine learning can use this data to anticipate and prevent massive power outages in the grid. Machine learning helps identify non-obvious patterns in the data that can be a precursor to grid failure, which helps maintenance teams preempt failure.

Balancing the Grid

Balancing the grid — making sure energy supply matches energy demand — is one of the most important jobs a transmission operator has. But renewable energy sources depend heavily on the weather, making them harder to predict.

Transmission operators spend millions each year fixing planning mistakes that lead to producing too much or too little power. In hybrid systems — which rely on both renewable energy sources and fossil fuels to generate electricity — these mistakes have to be corrected at the last minute by buying more energy or compensating power plants for the excess.

Machine learning is among the most accurate methods available for forecasting the output of renewable energy. Advanced methods, like Long Short-Term Memory networks (LSTMs), can weigh the many factors involved (wind, temperature, sunlight, and humidity forecasts) and make the best predictions. This saves money for operators and preserves resources for power plants.

Preventing Blackouts and Brownouts With Real-time Monitoring and AI Prediction

Power grids have a lot of obstacles to overcome in providing continuous energy to customers. Weather patterns, usage, internal failure, even wildcard incidents like lightning strikes and interference from wild animals can all affect power delivery.

Machine learning is increasingly being used to help predict potential brownout and blackout conditions. By feeding historical data into the AI and running Monte Carlo simulations to predict potential outcomes, grid operators can use machine learning to identify conditions that could lead to grid failure. And they can act accordingly.
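As a toy illustration of the Monte Carlo idea, the sketch below estimates how often demand would exceed available capacity. It assumes demand follows a normal distribution with a known mean and standard deviation; the function name and all numbers are made up for illustration, and a real study would draw on historical data and a far richer grid model:

```python
import random


def shortfall_probability(mean_demand_mw, std_demand_mw, capacity_mw,
                          n_trials=10_000, seed=0):
    """Estimate the share of simulated scenarios where demand exceeds capacity."""
    rng = random.Random(seed)  # fixed seed so the estimate is reproducible
    shortfalls = sum(
        1
        for _ in range(n_trials)
        if rng.gauss(mean_demand_mw, std_demand_mw) > capacity_mw
    )
    return shortfalls / n_trials


# Hypothetical grid: 900 MW average demand, 50 MW spread, 1000 MW capacity.
risk = shortfall_probability(900, 50, 1000)
print(f"Estimated shortfall risk: {risk:.1%}")
```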

Sensors like phasor measurement units (PMUs) and smart meters can provide usage information in real time. When combined with both historical and simulation data, AI can help mitigate potential grid failure, using techniques like grid balancing and demand response optimization. Incidents that would otherwise have affected millions of people can be contained to a smaller area and fixed faster for less money.

Differentiate Power System Disturbances from Cyber Attacks

Cyber attacks increasingly target important infrastructure. Ransomware attacks (in which attackers break into a system and lock legitimate users out until a ransom is paid) have shut down hospitals, for example. With utility grids, a cyber attack can have widespread consequences and affect millions of users.

Detecting these attacks is critical.

Developers are using machine learning to differentiate between a fault (a short-circuit, for example) or a disturbance (such as line maintenance) in the grid and an intelligent cyber attack (like a data injection).

Since deception is a huge component of these attacks, the model needs to be trained to look for suspicious activity – things like malicious code or bots – that get left behind after the deception has occurred.

One such method uses feature extraction with Symbolic Dynamic Filtering (an information theory-based pattern recognition tool) to discover causal interactions between the subsystems, without overburdening computer systems. In testing, it accurately detected 99% of cyber attacks, with a true-positive rate of 98% and a false-positive rate of less than 2%. This low false-positive rate is significant because false alarms are one of the biggest concerns in detecting cyber attacks.
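For reference, the two rates mentioned above are computed from the counts of a confusion matrix. The counts in this sketch are invented for illustration and are not taken from the study:

```python
def detection_rates(true_pos, false_pos, true_neg, false_neg):
    """Return (true-positive rate, false-positive rate)."""
    # Share of real attacks that were flagged.
    tpr = true_pos / (true_pos + false_neg)
    # Share of harmless events that were wrongly flagged.
    fpr = false_pos / (false_pos + true_neg)
    return tpr, fpr


# Hypothetical evaluation: 100 attacks and 100 normal disturbances.
tpr, fpr = detection_rates(true_pos=98, false_pos=2, true_neg=98, false_neg=2)
print(f"TPR: {tpr:.0%}, FPR: {fpr:.0%}")
```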

Balance Supply and Demand

Utility providers are looking for ways to better predict power usage while maintaining energy supply at all times. This becomes critical when renewable power sources (like solar or wind) are introduced into the grid.

Because these renewable power sources rely on elements beyond human control (like the weather), utility providers know they can’t always rely on renewables for continuous production. Knowing precisely when demand levels will peak allows utility providers to connect to secondary power sources (like conventionally generated electricity) to bolster the available resources and ensure constant service provision.

More and more utility providers are turning to machine learning for help. We can feed historical data into machine learning algorithms, such as Support Vector Machines (SVMs), to accurately forecast energy usage and ensure sufficient levels and constant supply.

Detect Power Grid Faults

Current methods for detecting faults in the grid consume a lot of time and resources. Power transmission is interrupted and customers are without electricity while faults are first located, then fixed.

Machine learning can find faults more quickly and accurately, helping you minimize service interruption for your customers. Support Vector Machines (SVMs) are combined with the Discrete Wavelet Transform (DWT) to locate faults in the lines using a traveling wave-based location method.

When we apply DWT (a form of numerical and functional analysis that captures both frequency and location information) to the transient voltage recorded on the transmission line, we can determine the location of the fault by calculating aerial and ground mode voltage wavelets. So far, this method has detected fault inception angles, fault locations, loading levels, and non-linear high-impedance faults for both aerial and underground transmission lines.

Detect Non-Technical Power Grid Losses

In the energy world, “non-technical losses” means energy theft or fraud from the system.

There are two common types of non-technical losses. The first is when a customer uses more energy than the meter reports. The second involves rogue connections stealing energy from paying customers. To pull off this theft or fraud, bad actors can bypass smart meters completely or insert chips into the system that change how meters track energy use. Meter readers can also be bribed to report lower numbers (though thanks to smart meters, this is increasingly hard to do).

Because these non-technical losses cost $96 billion annually, utility providers are turning to machine learning to combat the problem.

We can help utility providers mine historical customer data to discover irregularities that indicate theft or fraud. These can be things like unusual spikes in usage, differences between reported and actual usage, and even evidence of equipment tampering.
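As a deliberately simple illustration of spotting unusual spikes, the sketch below flags readings that lie far from a customer’s average. This is a plain z-score rule, not a trained model, and the function name, threshold, and readings are all hypothetical:

```python
from statistics import mean, stdev


def flag_unusual_readings(readings_kwh, threshold=3.0):
    """Return the indices of readings more than `threshold` standard deviations from the mean."""
    mu = mean(readings_kwh)
    sigma = stdev(readings_kwh)
    if sigma == 0:
        return []  # all readings identical, nothing stands out
    return [
        i for i, value in enumerate(readings_kwh)
        if abs(value - mu) / sigma > threshold
    ]


# Twenty ordinary daily readings followed by one suspicious spike.
readings = [10.0] * 20 + [100.0]
print(flag_unusual_readings(readings))
```

A production system would compare each customer against similar customers and seasonal patterns rather than a single global average.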

Energy Distribution

Better Predict Energy Demand

Accurately predicting customers’ energy needs is critical for any utility provider. To date, we haven’t found an adequate solution for bulk energy storage, which means energy needs to be transmitted and consumed almost as soon as it’s produced.

We're using machine learning to increase the accuracy of these predictions. Historical energy use data, weather forecasts, and the types of businesses or buildings operating on a given day all play a role in determining how much energy is used.

For example, a hot summer day mid-week means more energy usage because office buildings run air conditioning at a high capacity. Weather forecasts and historical data can help identify those patterns in time to prevent rolling blackouts caused by air conditioners in the summer.

Machine learning finds complicated patterns in the various influencing factors (such as time of day, day of the week, predicted wind and solar radiation, major sports events, past demand, mean demand, air temperature, moisture and pressure, and wind direction) to explain the development of demand. Because machine learning finds more intricate patterns, its predictions are more accurate. This means energy distributors can increase efficiency and decrease costs when they buy energy – without having to make expensive adjustments.

Energy Generation

Predict Turbine Malfunction

Wind is a great renewable energy source, but wind turbine maintenance is notoriously expensive. It accounts for up to 25% of the cost per kWh. And fixing problems after they occur can be even more expensive.

Machine learning can help you get ahead of this problem. The goal is to reduce maintenance costs by catching problems before the turbine malfunctions. This is particularly important when wind farms are located in hard-to-access places, such as the middle of the ocean, which makes repair costs even higher.

Real-time data gathered with Supervisory Control and Data Acquisition (SCADA) can help identify possible malfunctions in the system far enough in advance to prevent failure.

For example, data from sensors found within the turbines – such as oil, grease, and vibration sensors – have been used to train machine learning models to identify precursors to failure, such as low levels of lubricant.

This method can train machine learning models to predict failures up to 60 days in advance.

Consumption / Retail

Accurately Predict Energy Prices

As personal power generation (using solar or wind power) gets easier and cheaper, consumers and businesses are increasingly producing their own power.

Personal power generation allows people to make, consume, and store their own energy. Depending on where they live, they may even be able to sell surplus power back to the local power utility.

Machine learning can help find the best time to produce, store, or sell this energy. Ideally, energy should be consumed or stored when prices are low and sold back to the grid when prices are high.
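The decision rule itself is simple once a price forecast is available. The sketch below assumes an hourly price forecast and a single, hypothetical price threshold above which selling is worthwhile; the function name and all figures are made up:

```python
def plan_hours(forecast_prices, sell_threshold):
    """Label each forecast hour as 'sell' or 'consume_or_store'."""
    return [
        "sell" if price >= sell_threshold else "consume_or_store"
        for price in forecast_prices
    ]


# Hypothetical forecast prices for three consecutive hours.
print(plan_hours([20, 55, 80], sell_threshold=50))
```

The hard part, and the part machine learning helps with, is producing the price forecast that feeds such a rule.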

By looking at historical data, usage trends, and weather forecasts, machine learning models have made accurate predictions on an hourly basis. People with personal and business energy generation systems can use these predictions to make strategic decisions about whether to use, store, or sell their energy.

For example, the Adaptive Neuro-Fuzzy Inference System (ANFIS) has been used to predict short-term wind patterns for wind power generation. This allows producers to maximize energy production and sell it when energy prices are at their peak.

Reduce Customer Churn

In open energy markets, where customers have a choice of utility providers, understanding which customers are going to churn out can be critical. The churn rate, which is the percentage of customers who stop using your service in a year, can be as high as 25%. Being able to predict churn and stay ahead of it is essential to survival.

Machine learning is helping utility owners predict when a customer is getting ready to churn out. By using techniques such as the Cross-Industry Standard Process for Data Mining (CRISP-DM), AdaBoost, and Support Vector Machines, as well as historical usage data, utility providers can identify key indicators of whether or not a customer is going to churn. These indicators include things like customer satisfaction, employment status, energy consumption, and home ownership or rental status. A change in any of these can indicate a customer is getting ready to terminate their service.

When these indicators are identified far enough in advance, it’s possible to avoid churn by working with customers to solve any problems they’re experiencing.

Energy Trading

Predict Energy Prices

Just like natural gas and oil, wholesale energy is a market commodity. So naturally it's important for traders to be aware of market fluctuations and pricing when it comes to buying and selling energy.

To help make sense of the massive amounts of data used to make trading decisions, traders are increasingly turning to machine learning.

A mix of statistical analysis and machine learning can help commodity traders make better predictions. Classical statistical analysis techniques like time series analysis, Seasonal Autoregressive Integrated Moving Average (SARIMA), and regression models are used to deal with the data. And machine learning makes connections between the various data points.

What’s more, machine learning trains itself to make increasingly accurate predictions using the constant flow of real-time data.
