By Michael Allman, Staff Engineer at VideoAmp

For Spark 2.1, VideoAmp collaborated with Databricks and the wider Spark developer community to reduce the initial time to query Hive tables with tens of thousands of partitions from minutes to seconds. We also eliminated the need to fetch existing metadata when new table partitions are added. Together, these features bring Spark SQL rapid access to huge, evolving data warehouses—a key technical requirement for VideoAmp. Databricks and VideoAmp have written a joint blog post with technical details and benchmarks at https://databricks.com/blog/2016/12/15/scalable-partition-handling-for-cloud-native-architecture-in-apache-spark-2-1.html.

In this post, we’ll describe our motivation for developing this feature and provide additional benchmarks. We’ll also describe a problem we encountered running some of our queries while testing this feature internally, how to identify that problem in your own queries and how to work around it until it is fully addressed in Spark.

Some Background and Motivation
VideoAmp has been using Spark SQL from the inception of our data platform, starting with Spark 1.1. As a demand side platform in the real-time digital advertising marketplace, we receive and warehouse billions of events per day. Most of these events come as large JSON documents, which we store with full fidelity as partitions in one of our Spark SQL event tables. We now have tables with tens of thousands of partitions. We routinely trim a month of partitions from our largest table once it reaches 100,000 partitions.

Spark caches table metadata on first access for rapid query planning on subsequent accesses. Prior to 2.1, Spark fetched all partition metadata as part of its table cache. Additionally, adding a new partition to a table evicted that table from the metadata cache. Our largest tables are updated 10 to 20 times per hour, effectively negating the benefits of the table cache.
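
For illustration, here is roughly what one of those hourly partition additions looks like (the table name, partition columns and location are hypothetical). Prior to Spark 2.1, a statement like this evicted the entire table from the metadata cache:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .enableHiveSupport()   // needed for Hive table and partition DDL
      .getOrCreate()

    // Register a newly landed hour of data as a partition of a Hive table.
    // Before Spark 2.1, this invalidated the cached metadata for the whole table.
    spark.sql("""
      ALTER TABLE events ADD IF NOT EXISTS
      PARTITION (date = '2016-12-01', hour = 10)
      LOCATION 'hdfs:///warehouse/events/date=2016-12-01/hour=10'
    """)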

Our first solution to this problem took a shortcut—rather than fetching all of a table’s partition metadata, we fetched just the partition names. Because we use the standard layout for Hive table partitions, we can infer the location of a partition from its name. Fetching a table’s partition names from Hive is an order of magnitude faster than fetching their full metadata (the SHOW PARTITIONS command uses this faster API in Spark 2.1), and this reduced our planning times from minutes to seconds.
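
As a sketch of how that shortcut works (the paths and partition columns below are made up for illustration), the standard Hive layout places each partition's data directly under the table root, in a directory named after the partition spec, so the location can be derived from the partition name alone:

    // Partition names as returned by the fast Hive API (and by SHOW PARTITIONS).
    val tableLocation     = "hdfs:///warehouse/events"     // hypothetical table root
    val partitionName     = "date=2016-12-01/hour=10"      // one "col=value" per partition column
    val partitionLocation = s"$tableLocation/$partitionName"
    // => "hdfs:///warehouse/events/date=2016-12-01/hour=10"

    // In Spark 2.1, SHOW PARTITIONS uses the same fast partition-name API:
    // spark.sql("SHOW PARTITIONS events").show(false)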

Eventually our tables reached multiple tens of thousands of partitions, and we began to hit the scalability wall again. We began to prototype a new approach based on deferred partition metadata loading. At the same time, we approached the Spark developer mailing list to sound out our ideas. Shortly thereafter, we submitted one of our prototypes as a pull request to the Spark GitHub repo. That began our collaboration to produce Spark 2.1’s new features for large partitioned tables.

Spark 2.1 separates table and partition metadata caching, and it fetches partition metadata only as needed. That is, when Spark 2.1 plans a SQL query, it fetches and caches metadata for exactly the partitions needed to satisfy the plan. The cost of planning a query over a partitioned table now scales with the number of partitions read rather than the number of partitions in the table.
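
For example (table and column names are illustrative), a query with a partition predicate like the one below now triggers a metadata fetch for only the matching partitions:

    // Assumes a Hive-enabled SparkSession named spark, as in the earlier sketch.
    // Only metadata for partitions with date = '2016-12-01' is fetched and cached
    // while planning this query; the table's other partitions are untouched.
    spark.sql("""
      SELECT count(*)
      FROM events
      WHERE date = '2016-12-01'
    """).show()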

Benchmarks
We’ve benchmarked several queries from our day-to-day workload and selected two of them to illustrate the performance difference between Spark 2.0 and Spark 2.1.

Average CPM
Average CPM is our average cost to purchase an ad impression over some time window. This is a KPI for any demand side platform. The table over which we run this query has about 18,500 partitions.
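
The exact query isn't reproduced here, but an average CPM aggregation has roughly this shape (the table and column names are hypothetical):

    // CPM = cost per thousand impressions, averaged over a time window.
    // The WHERE clause selects a month of daily partitions.
    spark.sql("""
      SELECT sum(media_cost) * 1000.0 / count(*) AS avg_cpm
      FROM impressions
      WHERE date BETWEEN '2016-11-01' AND '2016-11-30'
    """).show()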

Supply Query
Our supply query is an hourly aggregation summarizing facts about the ad purchasing opportunities we receive from our partner exchanges. This is a wide query with thirty output columns, multiple aggregate function calls over multiple groupings and a union of two subselects. We populate a new partition of a Spark SQL data mart with the results of this query, which typically produces about fifty million rows per hour. It reads from two tables with a total of about 71,000 partitions.
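
The real query is far too wide to reproduce, but its overall shape is roughly the following sketch (all table, column and partition names here are hypothetical):

    // Aggregate a union of two subselects and write the result into a new
    // hourly partition of a data-mart table.
    spark.sql("""
      INSERT OVERWRITE TABLE supply_mart PARTITION (date = '2016-12-01', hour = 10)
      SELECT exchange, region, count(*) AS requests, sum(floor_price) AS total_floor
      FROM (
        SELECT exchange, region, floor_price FROM bid_requests
        WHERE date = '2016-12-01' AND hour = 10
        UNION ALL
        SELECT exchange, region, floor_price FROM deal_requests
        WHERE date = '2016-12-01' AND hour = 10
      ) supply
      GROUP BY exchange, region
    """)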

We ran our benchmarks on independent EC2 clusters, each configured with 4 c3.8xlarge workers. This equates to 128 cores and 240 GiB of memory per cluster. We ran one executor per worker, using all cores and configured with 16 GiB of heap.
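
In Spark terms, each benchmark session was configured roughly as follows (submission details are omitted, and your exact mechanism may differ):

    import org.apache.spark.sql.SparkSession

    // One executor per c3.8xlarge worker: 4 executors x 32 cores, 16 GiB heap each.
    val spark = SparkSession.builder()
      .config("spark.executor.instances", "4")
      .config("spark.executor.cores", "32")
      .config("spark.executor.memory", "16g")
      .enableHiveSupport()
      .getOrCreate()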

Results
Our first benchmark splits the Average CPM query running time into execution and planning components. We ran this benchmark over a month, a week and a day of data. In Spark 2.0, planning time holds relatively constant at 14 seconds regardless of the partition selection, as expected. In Spark 2.1, planning time is hardly measurable, and the gap in total query time between the two versions grows as we query fewer partitions.

[Chart: Average CPM query planning and execution times for a month, week and day of data, Spark 2.0 vs. Spark 2.1]

The results of our supply query benchmark tell a similar but more dramatic story. We ran this query on an hour of data, just as we do in production, twice in a row. On Spark 2.0, planning takes more than 66% of total query time in both executions. On Spark 2.1, planning takes 2.6% of total query time in the first run and no measurable time in the second. Here we can see Spark 2.1 caching and reusing the partition metadata collected during the first execution. Spark 2.0, by contrast, fetches the entire table’s partition metadata in both the first and second execution, which at first ran contrary to our expectations. Then we realized that this query takes so long to execute (about 15 minutes) that, between the start of the first run and the start of the second, our warehousing process had already added a new partition to the underlying table. Since Spark 2.0 invalidates a cached table if *any* of its partitions change, it cannot reuse the metadata it cached during the first execution. This mirrors our early experience with Spark 2.0 running ad-hoc queries on our largest tables, where every query came with several minutes of overhead.

[Chart: supply query planning and execution times for two consecutive runs, Spark 2.0 vs. Spark 2.1]

Unsafe Broadcast Join Conversions
While testing pre-release builds of Spark 2.1 internally, we encountered unsafe automatic broadcast join conversions in some queries where we had not seen them before. To review: if Spark estimates that the data on one side of a SQL join will fit within a certain size threshold (set by spark.sql.autoBroadcastJoinThreshold), it broadcasts that data to all of the executors and puts it in an in-memory hash table for rapid lookup.

In previous versions, Spark used the total file size of all of a table’s partitions when estimating a query plan’s size, which could easily produce large overestimates. Spark now includes only the partitions read by the query in its size estimate. This is more accurate than the previous behavior; however, in some cases it can lead to an automatic broadcast join conversion that attempts to broadcast far more data than the cluster can handle.

This problem manifests itself as heap OutOfMemory errors or excessive garbage collection in the driver process. If you suspect a bad query size estimate, set spark.sql.autoBroadcastJoinThreshold=-1 (which disables this optimization) and retry your query. If it succeeds, chances are Spark was attempting to broadcast more data than it could handle in memory.
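
A session-level override is enough for this test; for example, assuming a SparkSession named spark:

    // Disable automatic broadcast join conversion for this session only.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

    // ...re-run the suspect query here...

    // Restore the Spark 2.x default (10 MB) when finished.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)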

A definitive diagnosis can be made with a relatively simple procedure. First, make a note of your default spark.sql.autoBroadcastJoinThreshold, and identify which side of the join is being broadcast by examining the query execution plan. Set spark.sql.autoBroadcastJoinThreshold=-1 and run your query again. Once it finishes, open its detail page in the SQL tab of your app’s UI. There should be an Exchange stage in the query execution diagram before a SortMergeJoin or ShuffledHashJoin. Look for a statistic called “data size total”. If this value is significantly larger than your automatic broadcast join threshold, you’ve found the problem.
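
You can also inspect the physical plan directly before running anything; for instance (the query and table names below are illustrative):

    // BroadcastExchange / BroadcastHashJoin in the plan means Spark intends to
    // broadcast one side; SortMergeJoin after an Exchange means it does not.
    val joined = spark.sql("""
      SELECT i.auction_id, s.exchange
      FROM impressions i
      JOIN bid_requests s ON i.auction_id = s.auction_id
      WHERE i.date = '2016-12-01' AND s.date = '2016-12-01'
    """)
    joined.explain()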

This issue has been reported as https://issues.apache.org/jira/browse/SPARK-18676 and work is underway to fix it. At the time of writing, steps have already been taken to decrease the likelihood of this issue occurring for the Spark 2.1.0 release.

Overall, we’re excited to see these capabilities brought to Spark 2.1 and believe they will enable further integration of Spark into environments with large Hive data warehouses.