To deliver real-time metrics to our customers and support a new range of applications, we now use a combination of streaming and batch architecture. This lets us deliver updates as often as every 5 minutes and comfortably handle peaks of 25,000 events per second.
In the following, you can read how we did it.
Data Flow at AudienceProject
At AudienceProject we collect web access logs, which subsequently go through several regular processes such as:
- Backing up of the raw data
- Cleaning up
- Enriching with metadata
- Data processing
At the end, we generate report metrics data that we make available to our customers through AudienceReport.
On top of all of this, we have several downstream projects that depend on data produced by the pipeline at different stages.
This pipeline is the spinal cord of AudienceProject.
The Old Architecture in a Nutshell
Our previous pipeline relied on Amazon Simple Workflow to coordinate a series of batch jobs that would take care of backing up, enriching and processing the ingress traffic. Amazon SWF makes it easy to handle errors and have retry procedures run when things do not go according to plan.
The pipeline was built in such a way that it would handle even severe Amazon outages like the one on the 1st of March 2017. We rarely had to go in and manually fix issues.
However, there were two key points where the batch architecture was beginning to show its limits:
- Scaling with the increasing amount of data was proving to be more and more challenging.
- The overhead and general limitations of batch jobs did not allow us to easily deliver updates more often than every 2 hours.
The Aqueduct architecture design was driven by a series of guiding principles.
The pipeline is not a monolithic project. It is made up of individual smaller components also called Processing Elements. These components are loosely coupled and are able to function completely independently of the rest of the processing elements. If fed with data they will produce output. In a sense, they are functional components.
The loose coupling allows the processing elements to be tested individually. It also allows us to gradually introduce new developers to the pipeline. Understanding the whole architecture is not a mandatory requirement. To be able to work, a developer only needs to understand what a component's data input and data output are.
Idempotence and Incremental Updates
From a data processing perspective, and to ensure resilience, all processing elements output the same result given the same data input. This, together with the incremental updates approach, gives us automatic recovery and increased resilience in the pipeline. A concrete case where this is needed is when we receive out-of-order data (a few days late).
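As a rough illustration of these two properties, the sketch below (in Python for brevity, not the pipeline's actual code) merges batches into a per-day store keyed by a log id and recomputes a metric from everything known for that day. The names `DayStore`, `log_id` and `events_per_site` are hypothetical and chosen only for this example.

```python
from collections import defaultdict


class DayStore:
    """Hypothetical per-day store: logs are keyed by id, so replays are no-ops."""

    def __init__(self):
        self._logs = defaultdict(dict)  # date -> {log_id: log}

    def apply_batch(self, logs):
        """Merge a batch incrementally; re-applying the same batch changes nothing."""
        for log in logs:
            self._logs[log["date"]][log["log_id"]] = log

    def events_per_site(self, date):
        """Recompute the metric from everything known for that date."""
        counts = defaultdict(int)
        for log in self._logs[date].values():
            counts[log["site"]] += 1
        return dict(counts)


store = DayStore()
batch = [
    {"log_id": "a", "date": "2017-03-01", "site": "example.org"},
    {"log_id": "b", "date": "2017-03-01", "site": "example.org"},
]
store.apply_batch(batch)
first = store.events_per_site("2017-03-01")

store.apply_batch(batch)  # a retry or replay of the same input
replayed = store.events_per_site("2017-03-01")

late = [{"log_id": "c", "date": "2017-03-01", "site": "example.org"}]
store.apply_batch(late)  # data arriving a few days late
corrected = store.events_per_site("2017-03-01")
```

Replaying a batch leaves the metric unchanged, while late data only adjusts the day it belongs to.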
We came to accept that failures happen and when running a real-time distributed system, correcting errors on the fly is difficult and expensive. However, our customers need accurate metrics and a core requirement of the pipeline is to deliver them, albeit not in real-time. We put a lot of emphasis on outputting metrics which are as accurate as possible until the numbers are eventually locked into place.
The system’s pipes must allow an arbitrary number of client projects to feed from the outputted data, be that data at rest or in motion. Aqueduct is not a closed project, but one that delivers value to our whole ecosystem.
We want to use this system for years to come. It must be easy to scale horizontally. Ideally, most scaling operations are elastic.
As much as possible, each Processing Element should have a single purpose. It must be simple to explain in one sentence what each component of the real-time pipeline does.
Data Ingestion and Backup
We use Amazon CloudFront to collect weblogs from users. CloudFront is a globally distributed CDN that periodically persists the weblog files to Amazon S3 as tab-separated gzip-compressed files. In practice, logs can be persisted to S3 as soon as 3 minutes after being recorded.
When files are persisted to S3, a trigger sends a message to an Amazon SQS queue. The message contains details about the file.
An Amazon Beanstalk Worker application is responsible for backing up the log files. Amazon provides the SQS polling functionality, so we only had to build a simple stateless Akka HTTP application that moves files from the ingestion location to the backup location. The move operation does not require downloading the file; it is just a copy request followed by a delete request.
In the rare cases when the influx of data files is unusually high, the application autoscales based on the number of messages in the SQS queue. The copy/delete operation pair is also resilient. If there is an application error, the message goes back to the queue and is retried without any loss of data. After too many retries, the message goes to a dead letter queue, from where it needs to be handled manually.
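The copy-then-delete move described above can be sketched as follows. This is an illustration in Python, not the Akka HTTP application itself. It assumes an object `s3` that behaves like a boto3 S3 client (`copy_object` and `delete_object`); the bucket names and the `InMemoryS3` stand-in are hypothetical and exist only to make the example runnable.

```python
def move_object(s3, src_bucket, dst_bucket, key):
    """Server-side move: copy into the backup bucket, then delete the original.

    The delete only runs after the copy has succeeded, so a failure at any
    point leaves at least one copy of the file in place and the queue
    message can be retried.
    """
    s3.copy_object(
        Bucket=dst_bucket,
        Key=key,
        CopySource={"Bucket": src_bucket, "Key": key},
    )
    s3.delete_object(Bucket=src_bucket, Key=key)


class InMemoryS3:
    """Tiny stand-in for the two client calls used above (illustration only)."""

    def __init__(self, objects):
        self.objects = dict(objects)  # (bucket, key) -> bytes

    def copy_object(self, Bucket, Key, CopySource):
        source = (CopySource["Bucket"], CopySource["Key"])
        self.objects[(Bucket, Key)] = self.objects[source]

    def delete_object(self, Bucket, Key):
        self.objects.pop((Bucket, Key), None)


s3 = InMemoryS3({("ingest", "logs/a.gz"): b"..."})
move_object(s3, "ingest", "backup", "logs/a.gz")
```

Because neither call transfers the file body through the application, the worker stays lightweight regardless of file size.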
Similar to how we get notified when new data arrives at the ingestion location, we execute an Amazon Lambda function for every new file copied into the backup S3 bucket. The Lambda function receives as its argument a message with the details of the file. It then pushes a message to two different SQS queues, one in development and one in production. This allows us to quickly replicate a production-like environment in development.
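A minimal sketch of such a fan-out function is shown below, in Python rather than the language the authors used. It assumes the standard S3 event notification layout (`Records[].s3.bucket.name` and `Records[].s3.object.key`) and an `sqs` object that behaves like a boto3 SQS client. The queue URLs, the message body format and the `RecordingSqs` stand-in are hypothetical.

```python
import json


def fan_out(event, sqs, queue_urls):
    """Forward each S3 object notification to every queue in `queue_urls`."""
    sent = 0
    for record in event.get("Records", []):
        body = json.dumps({
            "bucket": record["s3"]["bucket"]["name"],
            "key": record["s3"]["object"]["key"],
        })
        for url in queue_urls:
            sqs.send_message(QueueUrl=url, MessageBody=body)
            sent += 1
    return sent


class RecordingSqs:
    """Stand-in that records calls instead of talking to SQS (illustration only)."""

    def __init__(self):
        self.sent = []

    def send_message(self, QueueUrl, MessageBody):
        self.sent.append((QueueUrl, MessageBody))


event = {
    "Records": [
        {"s3": {"bucket": {"name": "backup"}, "object": {"key": "logs/a.gz"}}}
    ]
}
sqs = RecordingSqs()
sent = fan_out(event, sqs, ["dev-queue-url", "prod-queue-url"])
```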
The message from SQS is polled by another Amazon Beanstalk Worker application. For every message received, our Akka HTTP application streams the contents of the file to the local environment. The file stream is processed log by log (line by line) and batches of logs are sent to an Amazon Kinesis Stream.
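The line-by-line processing with batching can be sketched like this. It is a simplified Python illustration, not the Akka HTTP stream used in production. The function name `batched_log_lines` and the batch size are hypothetical; the only format assumptions are the ones stated in this article (gzip-compressed, tab-separated) plus the `#`-prefixed header lines that CloudFront writes at the top of its log files.

```python
import gzip
import io
from itertools import islice


def batched_log_lines(fileobj, batch_size):
    """Yield lists of tab-split log lines from a gzip stream, skipping '#' headers."""
    with gzip.open(fileobj, mode="rt", encoding="utf-8") as lines:
        rows = (
            line.rstrip("\n").split("\t")
            for line in lines
            if line.strip() and not line.startswith("#")
        )
        while True:
            batch = list(islice(rows, batch_size))
            if not batch:
                return
            yield batch


raw = (
    "#Version: 1.0\n"
    "2017-03-01\t10:00:00\tGET\n"
    "2017-03-01\t10:00:01\tGET\n"
    "2017-03-01\t10:00:02\tGET\n"
)
compressed = io.BytesIO(gzip.compress(raw.encode("utf-8")))
batches = list(batched_log_lines(compressed, batch_size=2))
```

The file is decompressed lazily, so memory use depends on the batch size rather than on the size of the log file.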
To optimise this process, the log lines are serialised as Protocol Buffer messages and then batched together into Kinesis Records. Serialising data with Protocol Buffers allows us graceful schema evolutions when, for example, Amazon decides to add new columns to the schema of the web access logs. We also avoid sending redundant key information, therefore minimising the amount of data that needs to be transferred.
We use a custom Kinesis Aggregator to create the batches of records that are sent to Kinesis, similar to how the official Kinesis Producer Library works, but without the daemon part. This way we are in full control of when and how data is sent.
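To make the idea of aggregation concrete, here is a minimal sketch that packs already-serialised messages into larger blobs under a size limit. It is not the authors' aggregator and it does not reproduce the Kinesis Producer Library's aggregation format: the 4-byte length prefix, the function names and the `max_bytes` parameter are assumptions made for this example.

```python
def aggregate(messages, max_bytes):
    """Pack serialised messages into length-prefixed blobs of at most `max_bytes`."""
    blobs, current = [], bytearray()
    for msg in messages:
        framed = len(msg).to_bytes(4, "big") + msg
        if len(framed) > max_bytes:
            raise ValueError("single message exceeds the record size limit")
        if len(current) + len(framed) > max_bytes:
            # The current blob is full: close it and start a new one.
            blobs.append(bytes(current))
            current = bytearray()
        current += framed
    if current:
        blobs.append(bytes(current))
    return blobs


def split(blob):
    """Inverse of `aggregate` for one blob: recover the original messages."""
    out, pos = [], 0
    while pos < len(blob):
        size = int.from_bytes(blob[pos:pos + 4], "big")
        out.append(blob[pos + 4:pos + 4 + size])
        pos += 4 + size
    return out


messages = [b"a" * 10, b"b" * 10, b"c" * 10]
blobs = aggregate(messages, max_bytes=30)
recovered = [m for blob in blobs for m in split(blob)]
```

Each blob would then be sent as the payload of one Kinesis record, and the consumer applies `split` to get the individual logs back.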
The Beanstalk workers are scaled based on the size of the SQS queue. The Lambda function which routes messages to Development and Production is scaled based on the number of files copied to the backup location. The only thing that needs to be scaled manually is the Kinesis Stream, but this can be automated as well.
We maintain a sliding window of 1 hour with the details of processed files. This is because SQS guarantees at-least-once delivery, so the same message can arrive more than once, and we want to avoid streaming a file into Kinesis twice. This behaviour is not a problem for the Backup component since we cannot move a file twice.
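One way to picture such a window is the in-process sketch below. The article does not say where the window is stored, so keeping it in the memory of a single worker is an assumption, as are the class name `SeenFiles` and the injectable `clock` parameter (used here only to make the example deterministic).

```python
import time
from collections import OrderedDict


class SeenFiles:
    """Remember file keys for `window_seconds`; report repeats inside the window."""

    def __init__(self, window_seconds=3600, clock=time.monotonic):
        self._window = window_seconds
        self._clock = clock
        self._seen = OrderedDict()  # key -> first-seen timestamp, oldest first

    def first_time(self, key):
        """Return True if `key` was not seen within the window, and record it."""
        now = self._clock()
        # Evict entries that have fallen out of the window.
        while self._seen:
            _, ts = next(iter(self._seen.items()))
            if now - ts < self._window:
                break
            self._seen.popitem(last=False)
        if key in self._seen:
            return False
        self._seen[key] = now
        return True


now = [0.0]
seen = SeenFiles(window_seconds=3600, clock=lambda: now[0])
first = seen.first_time("logs/file-1.gz")
duplicate = seen.first_time("logs/file-1.gz")
now[0] = 3601.0  # more than one hour later
after_window = seen.first_time("logs/file-1.gz")
```

With several workers polling the same queue, the window would have to live in a store they all share.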
Enriching and Cleaning Data
The Processing Element responsible for enriching data is an Amazon Lambda function. It receives small batches of Kinesis Records which, in turn, contain thousands of Protocol Buffers serialised raw logs.
Each of the logs is processed individually. Based on elements like User-Agent, IP, HTTP referrer etc., new metadata is added, such as geolocation and OS information. We also remove some information from the raw logs, such as Amazon-specific fields. At the end, we have what we call enriched logs.
The last step is pushing the enriched logs to another Kinesis Stream.
The concrete implementation of the Lambda function is in Scala. To make everything run as fast as possible, we take advantage of the temporary storage location of Lambda containers. Between individual runs of a Lambda function, the container in which it lives is reused. In fact, once the Lambda function is loaded for the first time, the JVM keeps the objects in memory and reuses them for subsequent runs. Keeping everything local to the Lambda container makes all processes much faster than if we had to access remote data stores.
The datasets we use to enrich data have a one-to-one correspondence to the date of the logs. This way, we ensure idempotence. The datasets are loaded once when the function executes for the first time inside a container. When receiving data from dates for which the datasets are missing, the Lambda function downloads the respective datasets on the fly.
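The caching pattern described above can be sketched as follows, in Python rather than the Scala of the real function. Everything named here is hypothetical: `DatasetCache`, the `fake_download` loader, the shape of the dataset (an IP-to-country mapping) and the `country` field are stand-ins for illustration.

```python
class DatasetCache:
    """Cache enrichment datasets per log date for the lifetime of the container."""

    def __init__(self, download):
        self._download = download  # hypothetical loader: date string -> dataset
        self._by_date = {}

    def for_date(self, date):
        # Download only on the first request for a given date.
        if date not in self._by_date:
            self._by_date[date] = self._download(date)
        return self._by_date[date]


downloads = []


def fake_download(date):
    """Stand-in for fetching the dataset snapshot that belongs to `date`."""
    downloads.append(date)
    return {"203.0.113.7": "DK"}


# Module-level state survives between invocations in a warm container.
CACHE = DatasetCache(fake_download)


def enrich(log):
    """Add a country to the log using the dataset for the log's own date."""
    geo = CACHE.for_date(log["date"])
    return {**log, "country": geo.get(log["ip"], "unknown")}


one = enrich({"date": "2017-03-01", "ip": "203.0.113.7"})
two = enrich({"date": "2017-03-01", "ip": "198.51.100.1"})
```

Looking up the dataset by the date of the log, rather than by the current date, is what keeps the enrichment repeatable when old logs are reprocessed.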
Processing 1,000 events takes about 1 second. The first execution of a Lambda function inside a container will, of course, take longer because of the files that need to be downloaded.
The Lambda function scales based on the number of shards in the input Kinesis Stream. Often the downstream enriched logs stream needs to be scaled as well.
The last major component of our real-time pipeline is the data processing step. We aggregate the information in the enriched logs over batches of 5 minutes. The metrics we compute are then stored in various data stores. It is important to note that this component does not use all the data available in the enriched logs stream. The data we do not need is simply filtered out in the application without any significant overhead.
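As a single-machine stand-in for this step, the sketch below filters logs and counts them per 5-minute bucket. It is plain Python, not the cluster job, and it makes assumptions the article does not confirm: bucketing by an event timestamp field `ts` (in epoch seconds), the `site` field, and a simple event count as the metric.

```python
from collections import Counter

WINDOW_SECONDS = 300  # 5-minute batches


def window_start(epoch_seconds):
    """Round a timestamp down to the start of its 5-minute window."""
    return epoch_seconds - (epoch_seconds % WINDOW_SECONDS)


def count_per_window(logs, wanted_sites):
    """Count events per (window, site), ignoring sites we do not report on."""
    counts = Counter()
    for log in logs:
        if log["site"] not in wanted_sites:
            continue  # data we do not need is filtered out
        counts[(window_start(log["ts"]), log["site"])] += 1
    return dict(counts)


logs = [
    {"ts": 0, "site": "a"},
    {"ts": 299, "site": "a"},
    {"ts": 300, "site": "a"},
    {"ts": 10, "site": "b"},
]
metrics = count_per_window(logs, wanted_sites={"a"})
```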
Our data processing component is an Apache Spark application running as a long-living streaming job. It reads data from Amazon Kinesis and then offloads the work to the cluster's slaves. The Spark Streaming job runs in cluster mode on top of YARN. All of the filtering, grouping and aggregation usually happens within 150 seconds at peak loads. This is well below the 300-second batch interval, above which we would not be able to keep up with the incoming data. To cope with increased traffic, we scale the Spark Streaming application simply by adding more slaves to the cluster.
To reduce costs, we run this cluster on spot instances. However, this adds some challenges when it comes to handling spot price spikes that kill the cluster. We want to minimise the periods of time when data is not processed. To solve this problem, we have built what we call an arbiter, which constantly monitors the state of the cluster and makes sure that there is always a healthy cluster running.
The Correction Job has two main tasks, the first being to correct and lock in place the metrics outputted by the data processing component. A Spark application sharing the same code as the data processing component is used for that.
The second main task is to generate and persist .parquet files from the raw logs. The structure and information in the .parquet files are the same as those in the enriched logs Kinesis Stream.
These two tasks are highly redundant, but ensure that the numbers we deliver are ultimately correct. Other smaller tasks fulfilled by this job are snapshotting metadata databases for the purpose of idempotence. The heavy lifting of these tasks is handled by the Spark framework, which also makes it easy for us to scale out in case we need to.
Handling Out of Order Logs
There are cases when Amazon delivers log files with a delay, sometimes of days. Aqueduct handles this gracefully since everything is incremental. The real-time pipeline processes the late logs normally, and it is up to the Correction Job to add the new data to the already existing .parquet files and to correct the metrics for the day of the logs.
This and the lack of Kinesis support are two reasons why we are not in a hurry to adopt Spark Structured Streaming.
Putting It All Together
It is important to notice how no two components directly interact with each other. All communication goes through data stores. This loose coupling allows us to develop the individual processing elements separately.
Extending, testing, deploying and scaling this pipeline has been a breeze since its release. Recent hires are already contributing to it. The velocity of making new features available in the pipeline has increased and so has the confidence in it. It is a solid framework that makes it easy to consume data at various stages. Accessing data in our ecosystem is seamless so that we can focus on capitalising on it.
It is hard to put a number on how much it costs to run this pipeline, but our best estimates indicate that we spend about $150 per day on the whole pipeline (excluding S3 and CloudFront costs).
One last thing about the naming: We labelled our project Aqueduct because the way data travels through the pipeline feels as if it is water flowing through an aqueduct.