AudienceProject makes daily identity-based predictions on a scale of hundreds of millions. Our previous prediction pipeline was designed more than five years ago. The quality of its predictions, the fast availability of data and the competitive cost of running it have allowed us to be first-in-class at delivering audience segments to our customers. However, current and future developments in the AdTech domain represent challenges that have to be tackled differently than in the past.

Our new range of models offers both increased overall accuracy and high reach. This is achieved by training against high dimensionality data that capture relationships between the different features. For this, deep learning models with increased capacity had to be used, as shallow models were unable to fully utilise the data.

Deep learning models give great results when used correctly, but the unfortunate side effect is that they are more expensive to run than shallow models. This is especially true at inference time.

Our next task was simple to formulate: create a new Machine Learning pipeline that leverages the new models while keeping costs down.

Pipeline requirements

The new pipeline has some hard operational requirements that need to be fulfilled and some variable performance indicators that need to be satisfied to the largest extent possible and ideally improved all the time. Commercial requirements, such as high reach, are fulfilled in the fitting step, not outlined in this article.

Reproducibility

This is the single most important requirement that absolutely needs to be satisfied all the time. The output of running inference against a pair of model version and data instance needs to be the same every time. That is, the result of running prediction needs to be deterministic when the same data instance and model are used.

What this means in practice is that for a known subset of data used in testing the model (the 3rd slice of the train/validation/test split) the result of the inference is always the same. That means that even when the model is wrong, it needs to be wrong about the same prediction all the time. The same goes for when the prediction is correct.

Our models do not contain any randomness when loaded for prediction, so pinning a random seed is not necessary. What we need to do is simply run a test for every new model about to be published.

Fulfilling this requirement is important to us because it guarantees that no mistakes have been made, keeps the quality of the output under control and makes our pipeline easier to audit.
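
Such a test can be sketched as follows. This is only an illustration under assumptions: the names ReproducibilityCheck, fingerprint and isReproducible are hypothetical, and a prediction is assumed to be one array of scores per test sample.

```scala
import java.nio.ByteBuffer
import java.security.MessageDigest

object ReproducibilityCheck {
  // SHA-256 over the raw bits of every score. Each row is prefixed with its
  // length so that differently shaped outputs cannot share a fingerprint.
  def fingerprint(predictions: Seq[Array[Float]]): String = {
    val digest = MessageDigest.getInstance("SHA-256")
    predictions.foreach { row =>
      val buffer = ByteBuffer.allocate(4 + 4 * row.length)
      buffer.putInt(row.length)
      row.foreach(score => buffer.putFloat(score))
      digest.update(buffer.array())
    }
    digest.digest().map(b => "%02x".format(b & 0xff)).mkString
  }

  // Runs the same predictor twice over the same test set and compares the
  // fingerprints of the two outputs.
  def isReproducible[A](testSet: Seq[A])(predict: A => Array[Float]): Boolean =
    fingerprint(testSet.map(predict)) == fingerprint(testSet.map(predict))
}
```

Storing the fingerprint next to the model version would also allow later runs to be compared against the first one, not only against each other.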

Availability of new data

This is also considered to be a hard requirement. Under normal operations, new predictions must be available for all models every day. That means we need to go through all the billions of data samples for all of the hundreds of models in less than 24 hours. In practice, new predictions are available in just a few hours. Lowering the total pipeline processing time is one of the performance metrics we continuously improve.

Costs

Running Machine Learning pipelines based on deep learning models is known to be resource-demanding. With that in mind, our design choices had to focus on reducing overhead and redundant operations. More specifically, the inference is run in a multi-model scenario where many inferences are executed at the same time against a batch of data.

Software Stack

Our pipeline runs in Amazon Web Services and is deployed via CloudFormation. Each individual part is its own CloudFormation sub-stack. In turn, all sub-stacks are orchestrated with Step Functions workflows. This allows us to easily incorporate disaster recovery procedures.

All our data processing runs in Elastic MapReduce (EMR) clusters using Apache Spark. For storing data we use Simple Storage Service (S3). Models and a lot of the array manipulations rely on Apache MXNet. The codebase is written entirely in Scala.

Artifacts Metadata Library

One thing that was difficult to manage by simple naming convention was artifact handling. Especially with machine learning models that have numerous parameters, it is important that proper auditing tools are in place. For this reason, we have created the Artifacts Metadata Library. This helps us with the following:

  • Lineage dependencies: An artifact can depend on another artifact and so on. By using the library we can trace back all of the dependencies involved in the creation of said artifact. One can think of this the same way transitive dependencies work in Maven or SBT.
  • Auditing: We can go back and inquire about the specific parameters used when building different artifacts, be those models or datasets.
  • Reproducibility: By being able to follow back all steps involved in the creation of various resources, coupled with the functional behaviour of the processes that were used to generate them, we ensure reproducibility. The only caveat here is artifacts that contain stochastic elements, like the deep learning models.

Our library is a light version of MLflow, tailored for our use cases and stripped of all unnecessary features.
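
The lineage idea can be illustrated with a small sketch. The Artifact record and the lineage function below are hypothetical; the article does not describe the actual schema or API of the library.

```scala
import scala.annotation.tailrec

object ArtifactLineage {
  // Hypothetical metadata record: build parameters plus direct dependencies.
  final case class Artifact(id: String, params: Map[String, String], dependsOn: Seq[String])

  // Returns the ids of all direct and transitive dependencies of rootId.
  // Ids that were already visited are skipped, so cycles cannot loop forever.
  def lineage(registry: Map[String, Artifact], rootId: String): Set[String] = {
    @tailrec
    def visit(pending: List[String], seen: Set[String]): Set[String] = pending match {
      case Nil => seen
      case id :: rest if seen(id) => visit(rest, seen)
      case id :: rest =>
        val deps = registry.get(id).map(_.dependsOn).getOrElse(Seq.empty)
        visit(deps.toList ++ rest, seen + id)
    }
    visit(List(rootId), Set.empty) - rootId
  }
}
```

Auditing then amounts to looking up the params of every id returned by lineage.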

Data Processing

Data processing consists of two steps before the actual inference.

Batch Data Processing

The batch operations job selects just the necessary amount of data to be used further in the featurization job. For example, it selects the last maximum n significant data points for each identifier to be predicted. This job is also responsible for doing the basic cleaning of the data.

To optimise this process, each individual batch is built on the previous day’s batch. A sliding window of m days ensures that data does not accumulate indefinitely and that this incremental process grows as little as possible. For example, an identifier with no new data points today will be directly copied from yesterday’s batch data if its last data point has been recorded in the last m days.
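
A minimal in-memory sketch of this incremental step is shown below. It is an assumption-laden illustration, not the Spark job itself: DataPoint, Batch and nextBatch are hypothetical names, and days are represented as plain integers.

```scala
object IncrementalBatch {
  // Hypothetical data point; day is a day index such as days since the epoch.
  final case class DataPoint(day: Int, value: String)
  type Batch = Map[String, Seq[DataPoint]]

  // Builds today's batch from yesterday's batch plus today's new points.
  // Keeps at most the n most recent points per identifier and drops
  // identifiers whose latest point is older than the m-day window.
  def nextBatch(yesterday: Batch, fresh: Batch, today: Int, n: Int, m: Int): Batch =
    (yesterday.keySet ++ fresh.keySet).toSeq
      .map { id =>
        val points = yesterday.getOrElse(id, Seq.empty) ++ fresh.getOrElse(id, Seq.empty)
        id -> points.sortBy(_.day).takeRight(n)
      }
      .filter { case (_, points) => points.nonEmpty && today - points.last.day < m }
      .toMap
}
```

An identifier that appears only in yesterday's batch is carried over unchanged as long as its latest point is still inside the window.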

Featurization

In the featurization job, we further clean the data and join it with precomputed high dimensionality features. The precomputed features are numerical representations of the verbatim features that are computed at rare intervals of time (every few weeks or months).

The output of this job is represented by data samples where each individual feature has a single or multi-dimensional numerical representation.

For the array manipulations, we use a lot of Apache Spark’s built-in functions. However, not all operations can be completed with just Spark. For example, to simplify further loading and deserializing of data, we flatten multidimensional arrays and further transpose them. This allows us to do as little work as possible in the next phase, which is the actual inference, therefore reducing redundancy. Here is an example of a Scala Spark UDF that leverages the capabilities of Apache MXNet to transpose a flattened array:

// Given a flat array arr representing a bidimensional array with shape shape,
// return its transpose as a flat array
import org.apache.spark.sql.functions.udf

val transpose = udf((arr: Seq[Double], shape: Seq[Int]) => {
  import org.apache.mxnet.{NDArray, Shape}
  NDArray.array(arr.map(_.toFloat).toArray, Shape(shape)).T.toArray
})
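
To check what such a transpose computes without MXNet at hand, the same index arithmetic can be written in plain Scala. This is only an illustration that assumes row-major layout; FlatTranspose is a hypothetical name and not part of the pipeline.

```scala
object FlatTranspose {
  // Transposes a row-major flat array of shape (rows, cols) into a
  // row-major flat array of shape (cols, rows).
  def transpose(arr: Array[Float], rows: Int, cols: Int): Array[Float] = {
    require(arr.length == rows * cols, "array length must equal rows * cols")
    val out = new Array[Float](arr.length)
    for (r <- 0 until rows; c <- 0 until cols)
      out(c * rows + r) = arr(r * cols + c)
    out
  }
}
```

For example, the 2 x 3 matrix stored as 1, 2, 3, 4, 5, 6 becomes the 3 x 2 matrix stored as 1, 4, 2, 5, 3, 6.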

Apache MXNet’s Scala library is a key component that allows us to achieve higher performance. Because it runs inside the JVM, it avoids the serialization that would happen with typical Python-based libraries, which are arguably more popular but do not work that well at scale.

The difference between the batch output and the featurization output is that featurized data can only be used against a given set of models. By contrast, the batch output can be used for generating any number of featurizations. The reason is that the multidimensional features are regularly computed along with the models and only work with those respective ones.

Distributed Inference

The core of the pipeline is represented by the actual inference. Featurized data is used against a number of MXNet based models to output softmax predictions. Typical pipelines involve running a single model against a dataset, but we have determined that due to the large size of an individual sample, it is more efficient and cheaper to predict against several models with one pass over the data. That is why we can have batches of tens of models that run at once on a given dataset.
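
The single-pass idea can be sketched as follows. Model is a hypothetical stand-in for a loaded MXNet model, and the sample layout (an identifier plus a feature array) is an assumption made for the example.

```scala
object MultiModelInference {
  // Hypothetical stand-in for a loaded model: features in, raw scores out.
  final case class Model(name: String, score: Array[Float] => Array[Float])

  // Numerically stable softmax over raw scores.
  def softmax(scores: Array[Float]): Array[Float] = {
    val max = scores.max
    val exps = scores.map(s => math.exp(s - max))
    val sum = exps.sum
    exps.map(e => (e / sum).toFloat)
  }

  // One pass over the samples: each sample is read once and scored by
  // every model before moving on to the next sample.
  def predictAll(
      samples: Iterator[(String, Array[Float])],
      models: Seq[Model]): Iterator[(String, Map[String, Array[Float]])] =
    samples.map { case (id, features) =>
      id -> models.map(m => m.name -> softmax(m.score(features))).toMap
    }
}
```

The cost of loading and deserializing a large sample is paid once per batch of models instead of once per model.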

Fault Tolerance and Disaster Recovery

We generally use EC2 Spot instances to do our data processing, and the rare failures are fine because the instances are cheap and the processes fast. This is not the case with the inference job, because here we use expensive instances that are likely to be terminated prematurely and the process is not so fast. That puts us in a vulnerable position from two perspectives. First, it would take a lot more time to rerun a job, and we would therefore possibly not be able to fulfil the 24-hour availability requirement. Secondly, a failed run would mean money is wasted. Moreover, the lengthier or more resource-intensive a job is, the more likely it is to be terminated prematurely.

To mitigate this issue of incomplete jobs, we have implemented a retry mechanism. This is a normal and expected response to failure. However, this alone would mean that the jobs need to run from scratch every time. This in turn would increase the costs without any benefits, due to previously wasted resources.

To make sure we do not waste the processing power we have already paid for, on subsequent retries we only go through the data partitions we have not processed yet. To determine which data still needs to be processed, we use deterministic partitioning into a predefined number of partitions. To be more precise, we first map unique identifiers to buckets (partitions). In other words, a data sample carries its partition identifier with it.

Here is the Spark SQL code for generating the partition id out of a string identifier:

AGGREGATE(SPLIT(identifier, ''), 0L, (acc, x) -> acc + ASCII(x) * ASCII(x)) % num_partitions
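
For reference, the same computation in plain Scala might look like the sketch below. It assumes identifiers contain only ASCII characters, so that a character's code matches what ASCII(x) returns; PartitionId is a hypothetical name.

```scala
object PartitionId {
  // Sum of squared character codes modulo the number of partitions,
  // mirroring the Spark SQL expression for ASCII-only identifiers.
  def partitionId(identifier: String, numPartitions: Int): Long =
    identifier.foldLeft(0L)((acc, ch) => acc + ch.toLong * ch.toLong) % numPartitions
}
```

Because the result depends only on the identifier and the partition count, a sample lands in the same partition on every run, which is what makes skipping finished partitions safe.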

Secondly, we eagerly persist predictions after each partition has been processed. This means that we do not wait for the full dataset to be predicted before we start persisting, which would be the normal and intuitive way to do this. During retry runs, we check whether data has already been persisted and, if so, for which partitions. We then exclude these partitions from subsequent processing.
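
The retry logic can be sketched with an in-memory store standing in for the persisted output. All names here are hypothetical; in the real pipeline the set of finished partitions would be derived from what is already persisted.

```scala
import scala.collection.mutable

object ResumableInference {
  // Hypothetical stand-in for persisted output: partition id -> predictions.
  type Store = mutable.Map[Int, Seq[String]]

  // Partitions that have not been persisted yet.
  def pending(numPartitions: Int, store: Store): Seq[Int] =
    (0 until numPartitions).filterNot(store.contains)

  // Processes only the pending partitions and persists each one as soon as
  // it completes. Returns the partitions handled in this attempt.
  def run(numPartitions: Int, store: Store)(predict: Int => Seq[String]): Seq[Int] = {
    val todo = pending(numPartitions, store)
    todo.foreach { partition => store(partition) = predict(partition) }
    todo
  }
}
```

If an attempt fails halfway, everything persisted so far stays in the store, and the next call to run picks up only the remaining partitions.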

By using this disaster recovery procedure, we exploit the low pricing of EC2 Spot instances while incurring minimal overhead from premature terminations.

Artifact Size and Models Distributions

The Apache MXNet Scala plugin is quite big, more than 60 MB. To avoid packaging it in our main artifact, we launch the Spark jobs like this: --packages org.apache.mxnet:mxnet-full_2.11-linux-x86_64-cpu:1.5.1. This tells Spark to make sure the specified Maven jar is available on all the executors as required.

The MXNet models are in turn distributed using a similar approach. They are initially downloaded from S3 and then packed into a single archive. By using --files /home/hadoop/models.tar.gz when submitting the job, we make sure that all executors have access to all the models since a partition on an executor needs to be run against all the models.

Spark Tuning

Memory is an expensive asset when running Spark jobs, especially if they are coupled with Deep Learning models. To make sure jobs finish and are resilient we have made some modifications to the Spark configuration. It is also worth mentioning that the Spark distribution running on top of EMR is not the vanilla one from open-source Apache, but is optimized.

To stop repeated failures from killing the job, we have increased spark.task.maxFailures. Tasks will eventually run, but because of the memory volatility, it is sometimes difficult for them to get through.

We set spark.memory.fraction to 0.8 to allow as much memory as possible to be used by the Spark process itself. This sacrifices the memory that can be potentially used for caching, but the nature of the job does not require much of that. In fact, data processing is minimal for the inference job. We use Spark as a glorified task orchestrator.

Because we do inference, we don’t need to allocate CPUs to Spark; what we want is for MXNet to use as much processing power as possible. Therefore we set spark.executor.cores to 1. We also allocate memory so that we only have two executors per machine, using spark.executor.memory and spark.executor.memoryOverhead.

The default Garbage Collection settings in Spark are not ideal, so we modify them to avoid full GCs by setting spark.executor.extraJavaOptions to -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=25.

Lastly, we keep in check the number of partitions so that not too much work gets done at any given point by executors. The overhead from this is minimal, and we also get the added benefit of not losing too much data in the case of premature spot instance termination.
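
Collected in one place, the settings from this section could look like the sketch below. The values for spark.task.maxFailures, spark.executor.memory and spark.executor.memoryOverhead are placeholders, since the article does not state them; InferenceSparkConf and toSubmitArgs are hypothetical names.

```scala
object InferenceSparkConf {
  // Settings named in this section. Values marked "assumed" are placeholders.
  val settings: Seq[(String, String)] = Seq(
    "spark.task.maxFailures" -> "16",        // assumed; Spark's default is 4
    "spark.memory.fraction" -> "0.8",
    "spark.executor.cores" -> "1",
    "spark.executor.memory" -> "24g",        // assumed; sized for two executors per machine
    "spark.executor.memoryOverhead" -> "6g", // assumed
    "spark.executor.extraJavaOptions" ->
      "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=25"
  )

  // Renders the settings as spark-submit arguments.
  def toSubmitArgs(conf: Seq[(String, String)]): Seq[String] =
    conf.flatMap { case (key, value) => Seq("--conf", s"$key=$value") }
}
```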

Bucketing

The last piece of the pipeline is the Bucketing job. This uses precomputed softmax weights or thresholds to separate predictions into buckets of different qualities. These weights are individually tied to the models used, with the help of the Artifacts Metadata Library.
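
A threshold-based variant of this step might look like the following sketch. The Threshold record, the bucket names and the rule of comparing the winning softmax probability against each threshold are assumptions made for illustration.

```scala
object Bucketing {
  // Hypothetical per-model threshold on the winning softmax probability.
  final case class Threshold(bucket: String, minProbability: Float)

  // Returns the predicted class index and the best bucket whose threshold
  // the winning probability reaches, or None if it reaches no threshold.
  def assign(softmax: Array[Float], thresholds: Seq[Threshold]): Option[(Int, String)] = {
    val predicted = softmax.indices.reduceLeft((a, b) => if (softmax(b) > softmax(a)) b else a)
    val probability = softmax(predicted)
    thresholds
      .filter(t => probability >= t.minProbability)
      .reduceOption((a, b) => if (b.minProbability > a.minProbability) b else a)
      .map(t => predicted -> t.bucket)
  }
}
```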

Once the data has been persisted on S3, our AudienceHub solution takes care of distribution.

Putting it all together

The diagram shows how all the above-described components connect to one another. It’s important to see how there is no direct communication between any two processing elements. Each process consumes input data and outputs data, independent of other upstream or downstream processes.

A lot of the coordination and setting up of parameters for the Spark jobs is ensured via the Artifacts Metadata Library. It is also this way that we monitor the quality of the models and of the outputted data.

As mentioned at the beginning of the article, the individual processes are part of CloudFormation stacks. It is also worth mentioning that each stack has a Step Functions workflow inside it, which is responsible for running the jobs and for disaster recovery. The workflows are triggered via Simple Notification Service messages consumed by Lambda functions. This set-up allows us to easily develop and debug individual components and also to launch out of order processes on-demand.

Pegasus is very cost-effective, but we are continuously working on optimizing it with a combination of reduced redundancy and AWS-specific cost-saving actions.