One of the most frequent use cases in our data processing pipeline at AudienceProject is to move data between EMR clusters running Apache Spark and our perpetually growing DynamoDB data store.

Surprisingly, there is a distinct lack of good solutions to this problem out there, both amongst the open-source Spark community and dedicated AWS libraries.

Historically, we have often relied on Hive scripts to move data around, and on Spark to do the processing itself. The EMR-DynamoDB connector from Amazon (which uses Hive under the hood) has served its purpose well enough in this regard, but we have gradually been migrating all data processing away from Hive, its sole purpose in our pipeline remaining that of an ETL tool. This is a dependency we have long wanted to break.

After stumbling across the excellent library by Travis Crawford at Medium, which did not fully support all of our business needs, we decided to give it a shot ourselves. This has resulted in an easy-to-use, highly adaptable solution that we have since deployed across most of our production environments.

Find it on Github here: https://github.com/audienceproject/spark-dynamodb

Design Goals

Going into the project, there were two distinct motivations driving our design goals. Firstly, they were dictated by the immediate needs of our existing services. Secondly, we had a number of clear goals in mind with regards to what ought to be possible to achieve with a connector between DynamoDB and Spark.

Design goals dictated by our production needs:

  • Reading from both primary and secondary indices
  • Writing Spark data frames back to DynamoDB
  • Automatically matching the provisioned throughput
  • Defining the schema using strongly typed Scala case classes

Design goals dictated by our desire to create a feature-rich connector:

  • Inferring the schema based on the contents of the table
  • Lazy evaluation of the scan operations
  • Filter pushdown from Spark to DynamoDB

The second list is more experimental in nature but still applies to both Spark and DynamoDB in ways that seem to align perfectly between the two technologies. We will look more into the how of this in the following paragraphs. It is also worth noting that the last two points can influence performance significantly under certain circumstances.

Spark Data Source API

We implemented our Spark-DynamoDB connector using the Spark Data Source API, as this allows DynamoDB to live as a first-class citizen in the Spark ecosystem, alongside CSV files and SQL databases. The Spark Data Source API requires a custom data source to define two things:

  1. A DefaultSource class which will function as an access point to the custom data source from other Spark programs by providing relation objects (see 2)
  2. A subclass of BaseRelation which is additionally decorated with one or more traits describing the kind of behaviour that is supported by the custom data source

Our DefaultSource class has two options for providing a DynamoRelation for reading; one expecting a schema, which is used either for hard-coded user schemas or when we construct the schema based on a provided case class; and one which expects no schema and thus requires us to infer it from a preliminary scan of DynamoDB. The DefaultSource can additionally provide a DynamoWriteRelation for writing data back into DynamoDB.

The main feature of the DynamoRelation class is the buildScan method, which is inherited with slightly different signatures from each of the -Scan traits. The most basic version is a scan method with no arguments, essentially meaning a full table scan. The other two we implement are PrunedScan and PrunedFilteredScan, which are used by the Spark backend when the scan can be narrowed down to an array of particular columns or with a set of row filters from the current execution plan. Indeed, this filtered scan is what allows us to do filter pushdown to DynamoDB, but more on that later.

DynamoDB API

DynamoDB is a schema-less, NoSQL key-value store. Primary keys are defined either by a single hash key or by combined hash and range key. The database is accessed through a REST API which exposes the following 3 operations for reading: GetItem, Query and Scan. GetItem retrieves a single item by primary key. Query retrieves one or more items by hash key and a conditional operation on the range key. Scan retrieves every item for every key in the database.

On the first iteration of the Spark-DynamoDB connector, we have only made use of the full scan operation, but with a clever matching of the hash/range key schema and row filters from the Spark execution plan, certainly, future improvements could see the query operation be given a part to play as well.

When reading or writing to DynamoDB, capacity is consumed from a predetermined throughput, which is given in capacity units per second. Understanding the semantics behind the scan operation and how to read capacity units are consumed is essential to making DynamoDB integrate as seamlessly as possible with Spark. There are three aspects of the scan operation that were important to consider when designing the DynamoDB connector: capacity usage, pagination, and parallelisation.

See the official AWS docs here: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html

Capacity usage

The scan operation consumes capacity based on the cumulative size of items returned per request. From the AWS docs:

“One read capacity unit represents one strongly consistent read per second, or two eventually consistent reads per second, for an item up to 4 KB in size.”
(https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ProvisionedThroughput.html)

Contrary to what the above wording implies, capacity usage for scans is in fact calculated by counting every 4 KB of combined data in the payload returned (rounding up), and not on a per-item basis. There are two eventually consistent reads per 4 KB, which is then equivalent to having 8 KB of combined data in that case.

Pagination

There is a limit of 1 MB data returned per scan request to DynamoDB. Scan operations are therefore paginated, with every page containing items up to a cumulative size of 1 MB. It is possible to set a per-page item limit, in which case the size of a page is limited to this number of items or the 1 MB capacity, whichever is smaller.

Parallelisation

Of particular importance to our use case is the “parallel scan” feature provided by DynamoDB. With every scan request we specify the total number of segmentations in our accessing scheme, and the particular segment index we want to read with the given request. This is highly useful for distributing the read operation across all available Spark executors.

Calculating Throughput and Partitioning Accordingly

Recall the buildScan method from the Spark Data Source API. The return type of this method is an RDD[Row]. The most flexible way to produce such an object is to make a class that inherits directly from RDD. This DynamoRDD class overrides two methods from RDD: compute and getPartitions.

This fits well with the parallelisation scheme of DynamoDB. The number of partitions in the RDD will exactly match the number of parallel segments being read from DynamoDB, which will match the number of parallel processes running physically on the cluster. This is illustrated below.

The compute method takes the partition index and returns an iterator over the items that the
DynamoRDD should retrieve for that partition. For this purpose, we create a ScanPartition object for every logical RDD partition, which encapsulates the read operation on a single DynamoDB parallel scan segment. This object holds a reference to a shared, serialisable TableConnector object, which encapsulates the global quantities of the scan request parameter configuration, as well as the logic for massaging the DynamoDB Java API request/response objects.

The final part of this puzzle was then to develop an equation for each of the parameters to the DynamoDB scan operation, in order to maximise throughput within the reserved capacity and distribute it equally among all partitions. We will visit each parameter in turn here, describing our reasoning behind the way we calculate it in the source code.

  • Read/write limit

    This number is the rate, given in capacity units per second, that we limit throughput to when reading/writing from each of the scan partitions. This limit is given by the provisioned throughput on the database table, multiplied by the target capacity percentage, divided by the total number of segments.

  • Table size

    This is the size in bytes of the DynamoDB table. Used for later calculations and also exposed to the Spark Data Source API.

  • Average item size

    The average size in bytes of a single item in the table.

  • Item limit

    This limit is the maximum number of items that will be retrieved in a single scan operation. We use this to balance the provisioned capacity across partitions, making sure that it will not be exceeded by the cumulative consumed capacity from all partitions reading in parallel. To estimate this quantity, we first take the number of bytes that is read by a single read capacity unit (RCU), which is 4 KB for consistent reads and 8 KB for eventually consistent reads. We divide this by the average size in bytes of a single item and multiply the result by the normalised read limit (per scan partition) given in RCU per second.

Inventing a Schema (NoSQL and you)

The most pronounced difference between data living in DynamoDB and data living in Spark is that DynamoDB is schema-less, whereas it is mandatory to provide a schema when processing data in Spark. We have built two solutions to this problem, each with their own advantages and drawbacks. The first is a reflective analysis of entity case classes, inspired by Spark’s own use of case classes for the strongly typed Dataset class, with which it is seamlessly integrated. The other is a schema inference based on a preliminary scan of the DynamoDB table.

Schema inference

In order to provide quick, untyped use of the custom data source, we have implemented schema inference. The code makes a single, one-page scan of the DynamoDB table, and reads through every item in the response, building the schema by accumulating the properties present in the response.

This approach has a few drawbacks, however. If any properties on items are not present in the one-page scan, they will not be added to the schema, and will therefore not be readable by the data source. This has no easy solution since it is necessary for Spark to know the schema in advance of partitioning the initial RDD, which is necessary for the throughput calculation described above.

The approach also suffers from a hard limit on the size of the scan request to DynamoDB. This can be problematic because the number of distinct properties that could potentially be present in the table, and would thus have to be specified in the request, is unlimited. With schema inference, it is hard to prioritize among such properties.

Reflective analysis

The basic assumption with this approach is that the user has a clear idea of what kind of items are stored in the database, and can express this in terms of an entity case class. This case class will be useful not only for imposing a schema on the data when reading from DynamoDB but also for typing Datasets in Spark.

The schema analysis itself makes use of the ScalaReflection object from the Spark library, in order to make the correct mappings between Scala types and Spark types. The resulting Spark schema is then used to build the scan requests to DynamoDB and parsing the response correctly. Since DynamoDB is a JSON document store, both lists and nested hierarchies can be represented. We have used Spark MapType and StructType to model nested hierarchies in DynamoDB, and ArrayType is understood as DynamoDB lists and sets. In this manner, we have obtained nearly complete parity between the structure of data in DynamoDB documents and in Spark data frames.

Accommodating the Logical Execution Plan

When implementing a custom data source for Spark, it is important to build it in such a way that it can leverage the lazy nature of Spark’s query planner, as well as any kind of optimisations that may be possible based on the information available in the logical execution plan. The iterator returned from the compute method of the DynamoRDD can be implemented lazily, preventing waste of I/O to DynamoDB if Spark determines that it can terminate the job early (e.g. due to a limit operation in the execution plan).

Below is a screenshot of the physical plan from a Spark query running a show operation with a limit of 100 on a DynamoDB table with ~1 million items. As soon as the output from the scan operation exceeds the global limit, the lazy evaluation means that no more requests to DynamoDB are made once these first rows have been fetched.

Finally, we implemented filter pushdown to the scan operation’s FilterExpression parameter, which limits the amount of data returned in the response by removing items based on a filtering condition on its properties. We accomplish this by translating the row filters from the Spark Data Source API into a composite filter expression built using the DynamoDB Java SDK. There is an almost 1-to-1 mapping between row filters and the filter expression, having to leave out only a single Spark conditional (string ends-with), which is not in the DynamoDB API. By doing filter pushdown in this way, we can save significant I/O when the optimized Spark query only operates on a subset of the data from DynamoDB.

Below is a code snippet that shows how filters applied to the Spark data frame get translated into a filter expression on the DynamoDB scan request. The name map and value map properties in the request are shown as well, following best practice from the AWS docs.
(https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Scan.html#DDB-Scan-request-FilterExpression)

spark.read.dynamodbAs[PanelData]("panel_data")
    .filter(col("age") > 25)
    .filter(col("education").isin(Seq(3, 4, 5): _*))
    .show()
"FilterExpression":
"attribute_exists(#0) AND #0 > :0 AND attribute_exists(#1) AND #1 IN (:1, :2, :3)",

"ExpressionAttributeNames": {
  "#0": "age",
  "#1": "education"
},
"ExpressionAttributeValues": {
  ":0": { "N": 25 },
  ":1": { "N": 3 },
  ":2": { "N": 4 },
  ":3": { "N": 5 }
}

Putting it All to Use

Let us now take a look at how the data source performs in action. As it can be seen from the code example above, it is quite straightforward to load a DynamoDB table into a Spark data frame using the custom data source. The DataFrame can be manipulated inside Spark just like any other data frame, so it can be filtered on, joined with other data frames etc.

The screenshot below illustrates how accurately the custom data source is able to match the provisioned capacity of the table it is reading from. We are reading from a DynamoDB table with a provisioned read capacity of 400 using a Spark cluster of 4 machines with a total of 32 cores, running 1 executor with 7 cores on each machine.

After putting the library to use across most of our production services, we consider it an essential part of our Spark toolbox and are confident that its utility can extend beyond the confines of our own micro-service universe. The library is open-source under the Apache License 2.0. You are invited to pull/star/fork or whatever else you can think of on the Github link below!

https://github.com/audienceproject/spark-dynamodb