

Highly Available Transactions: Virtues and Limitations paper overview

2021-12-12 00:00:00 +0000

Commentary:

There are interesting analogies between distributed systems and memory models of multi-threaded programming languages, in the context of modeling system/program behavior when there are concurrent modifications to data.

This paper focuses on isolation levels, which define how a (DB) system behaves when multiple clients modify the same piece of data. The highest isolation level - Serializable - dictates that a system should execute concurrent client-issued transactions “as if” the clients executed them in some serial order, one at a time. Wikipedia on Isolation

Isolation guarantees are different from consistency levels, which, in the context of distributed systems, mostly apply to how replicas/nodes in a (DB) system converge to a consistent state or “consensus”, and the order/sequence in which these replicas converge. Wikipedia on Consistency Models

Summary of isolation levels

Taken from the paper with extra remarks:

Highly Available:
    Read Uncommitted (RU)
    Read Committed (RC)
    Monotonic Atomic View (MAV)
    Item Cut Isolation (ICI) and Predicate Cut Isolation (PCI) -- distinct from Snapshot Isolation (SI)
    Monotonic Reads (MR) (a consistency level)
    Monotonic Writes (MW) (a consistency level)
    Writes Follow Reads (WFR) -- Lamport's "Happens-before" order

Sticky:
    Read Your Writes (RYW) (a consistency level)
    Pipelined RAM (PRAM)
    Causal (a consistency level)

Unavailable:
    Cursor Stability (CS)
    Snapshot Isolation (SI)
    Repeatable Read (RR)
    Recency
    Linearizability (a consistency level)
    One-Copy Serializability (1SR)
    Strong Serializability

Read Committed

While most existing databases implement extended versions of Read Committed isolation that include guarantees like Monotonicity and Recency, the implementation-agnostic definition of RC is:

Transactions should not access uncommitted or intermediate versions of data items. This prohibits both “Dirty Reads” and “Dirty Writes.”

There is a begin edge at the start of new writes, and an end edge when new writes are fully committed. RC dictates that no observers can observe an intermediate state between these two edges.

To implement RC in a HAT system, there are two options:

  1. Clients can send writes to servers, which will not deliver new values to other readers until notified that the writes have committed.
  2. Clients can buffer their writes locally, sending them to servers only once the writes are fully committed.

These types of implementations do not guarantee Recency. See the Granularity of Locks paper.
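
To make option 2 concrete, here is a minimal sketch of client-side write buffering, assuming a hypothetical KeyValueStore interface; nothing reaches the servers until commit, so other clients never observe uncommitted or intermediate versions:

import java.util.HashMap;
import java.util.Map;

// Hypothetical server-side interface; names are illustrative only.
interface KeyValueStore {
    void put(String key, String value);
    String get(String key);
}

// Sketch of Read Committed via client-side write buffering (option 2).
class BufferedWriteClient {
    private final KeyValueStore store;
    private final Map<String, String> pendingWrites = new HashMap<>();

    BufferedWriteClient(KeyValueStore store) {
        this.store = store;
    }

    void write(String key, String value) {
        pendingWrites.put(key, value);      // buffered locally, invisible to other clients
    }

    String read(String key) {
        // Read our own buffered writes; otherwise read a committed version.
        String buffered = pendingWrites.get(key);
        return buffered != null ? buffered : store.get(key);
    }

    void commit() {
        pendingWrites.forEach(store::put);  // only committed values ever reach the servers
        pendingWrites.clear();
    }
}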

Writes Follow Reads

This is a property describing how an observer sees the ordering of events. It’s equivalent to Lamport’s “Happens-before” ordering: if T1 happens before T2 (for example, T2 read T1’s effects), then any other observer/session that observes T2 must also observe the effects of T1.

Lamport’s definition

Implementation: WFR ordering effectively dictates that upon revelation of a new value/write, the sequence of writes leading up to it is also revealed.

Force servers to wait to reveal new writes (say, by buffering them in separate local storage) until each write’s respective dependencies are visible on all replicas. This mechanism effectively ensures that all clients read from a globally agreed upon lower-bound on the versions written.

This does not imply that transactions will read their own writes: in an HA (non-sticky) scenario, a client may have to switch servers and issue its next requests against a partitioned, out-of-date server.

Monotonic Atomic View

This property describes the isolation effects of atomicity, but critically, adds monotonicity properties:

Under MAV, once some of the effects of a transaction Tx are observed by another transaction Ty, then all other effects of Tx must be observed by Ty

At first, this does not sound so different from Read Committed. MAV is described as stronger than RC, but in practical terms the observable effects of an RC versus a MAV transaction are very similar:

MAV disallows reading intermediate writes: observing all effects of a transaction implicitly requires observing the final (committed) effects of the transaction as well.

Implementation: the paper sketches an implementation of MAV that adds monotonicity properties on top of RC, in a distributed system with sharded replicas; a rough code sketch follows the steps below.

  1. Replicas store all versions ever written to each data item. They gossip about versions they have observed, and construct a lower bound on the versions updated at every replica.
    • This can be represented by a vector clock
  2. At the start of a transaction, clients choose a read_timestamp smaller or equal to global_lower_bound.
  3. During transaction execution, replicas return/work on the latest version of each data item that is <= read_timestamp.
  4. The DB system advances the global_lower_bound along transactional boundaries.
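
A minimal sketch of the read path for these steps, assuming a single scalar global lower bound (the paper’s lower bound could equally be represented as a vector clock, per the remark in step 1) and an in-memory multi-versioned store; all names are illustrative:

import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of MAV-style reads over a multi-versioned replica. globalLowerBound
// is assumed to be maintained by gossip among replicas and advanced only
// along transactional boundaries (steps 1 and 4 above).
class MavReplica {
    // key -> (commitTimestamp -> value): all versions ever written (step 1)
    private final Map<String, TreeMap<Long, String>> versions = new ConcurrentHashMap<>();
    private volatile long globalLowerBound = 0L;

    void apply(String key, long commitTimestamp, String value) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(commitTimestamp, value);
    }

    void advanceLowerBound(long newLowerBound) {
        globalLowerBound = Math.max(globalLowerBound, newLowerBound);   // step 4
    }

    // Step 2: a transaction fixes a read timestamp no greater than the lower bound.
    long chooseReadTimestamp() {
        return globalLowerBound;
    }

    // Step 3: return the latest version with commitTimestamp <= readTimestamp.
    String read(String key, long readTimestamp) {
        TreeMap<Long, String> history = versions.getOrDefault(key, new TreeMap<>());
        Map.Entry<Long, String> entry = history.floorEntry(readTimestamp);
        return entry == null ? null : entry.getValue();
    }
}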




Google Spanner - 2012 paper overview

2021-08-20 00:00:00 +0000

Commentary:

“It is the most advanced database in the world right now.”

Novel Idea:

Google Spanner is a globally-distributed, synchronously-replicated database. Spanner offers a level of consistency it calls “external consistency”, which implies serializable isolation, combined with tunable concurrency control.

Spanner is made possible by TrueTime, a novel clock API that exposes clock uncertainty, and Spanner builds some powerful features on top of it: non-blocking reads in the past, lock-free read-only transactions, and atomic schema changes.

Since 2012, Google Spanner’s feature set has continued to evolve. This overview focuses on the original 2012 paper.

Impact:

Google’s business is serving advertisements online, across the world, at high availability. F1 serves this business, and F1 is built on top of Spanner as its physical database layer. The most significant impact of Spanner is perhaps that it powers Google’s revenue.

Since its introduction, Spanner has inspired several other distributed databases, such as TiDB and CockroachDB. Spanner remains a unique Google Cloud product because of its custom TrueTime clock capability and Google’s vast worldwide network.

Spanner is designed for global scale - to millions of machines across hundreds/thousands of datacenters and trillions of data rows. Spanner provides all of this with strong consistency guarantees, and five 9s of availability guarantees.

Content:

At the highest level of abstraction, Spanner is a database that shards data across many sets of Paxos state machines in datacenters spread all over the world. Spanner automatically reshards data across machines as the amount of data or the number of servers changes, and it automatically migrates data across machines to balance load and in response to failures.

F1, which is Google’s advertisement backend, is a client of Spanner. F1 prefers high availability, so its Spanner replication preference is 5 replicas.

Spanner was built in response to consistent complaints from customers about BigTable’s shortcomings: applications wanted to evolve schemas, and/or strong consistency with wide-area replication.

First, Spanner offers such replication configurations to applications. Second, because of its ability to use TrueTime to assign globally meaningful timestamps to transaction commits, Spanner provides “externally consistent” reads/writes, and globally-consistent snapshot reads at a timestamp.

S2. Implementation

Spanner deployment - universe - global
    Placement driver - moves data across zones
    Zone - unit of physical isolation; data replicates across a set of zones
        One zonemaster
        Thousands of spanservers
            Tablets - similar to BigTable tablets
                SST/B-tree files + WAL
                DFS
            One Paxos state machine
                Writes initiate Paxos at leader
                Reads from any updated replica
                Leader for the group
                    Holds 10s leases
                    Lock table for 2PL
                    Transaction Manager for 2PC

Data placement

Spanner groups buckets of key-value pairs with the same prefix into a directory. A Spanner Paxos group may contain multiple directories that are frequently accessed together. In contrast to BigTable tablets, Spanner tablets are not lexicographically contiguous partitions of the key/row space - a tablet can contain multiple discrete partitions of the row space, depending on access patterns.

This is most similar to HDFS blocks, TiDB data regions, or BigTable tablets as mentioned.

Application needs

There were 3 key features that application developers wanted:

  1. Stronger consistency than eventual consistency
  2. Data model that was easy to evolve
  3. SQL-like query language

Examples of well-known Google applications that use Megastore are Gmail, Picasa, Calendar, Android Market, and AppEngine. The need to support a SQL-like query language in Spanner was also clear, given the popularity of Dremel [28] as an interactive data-analysis tool. Finally, the lack of cross-row transactions in Bigtable led to frequent complaints.

Google Percolator addresses transaction needs, but it suffered in availability and performance due to its implementation of general two-phase commit. Spanner addresses the concern in two ways:

  1. Support transactions as a feature, but let application developers deal with performance problems.
  2. Run 2PC over Paxos to increase availability.

Data model

Spanner rows must have primary key column(s).

Spanner offers the unique feature of INTERLEAVE IN to declare a table’s content is somehow hierarchically related/accessed frequently with another table’s content. At the physical data level, this is a hint to the data placement driver to colocate data.

S3. TrueTime

We leave most of the details for another paper

I’m still waiting on that paper.

Let’s define the following:

  1. Endpoints (earliest and latest) of a TTinterval are TTstamp
  2. TT.now() returns TTinterval that contains the absolute time this call was invoked
  3. epsilon is the error bound, equal to half of the TTinterval width
  4. We define any event e happened at physical absolute time t_abs(e)

In formal terms, TrueTime guarantees:

tt = TT.now()
tt.earliest <= t_abs(e) <= tt.latest

TrueTime is implemented by a set of time master machines per datacenter and a timeslave daemon per machine. The majority of masters have GPS receivers with dedicated antennas.

Every daemon polls a variety of masters [29] to reduce vulnerability to errors from any one master. Between synchronizations, a daemon advertises a slowly increasing time uncertainty; epsilon is derived from a conservatively applied worst-case local clock drift.

S4. Concurrency Control

Spanner guarantees a whole-database transactional read at timestamp t will see effects of all transactions committed as of t. This is one aspect of external consistency.

Spanner read-only transactions can execute without locking and without blocking writes, because Spanner can isolate data (in the Paxos state machines) based on timestamps, execute the reads at an optimal, system-chosen timestamp, and query any Paxos replica that is up-to-date for that chosen timestamp. For example, per section 4.2.2, if an RO transaction is scoped to a single Paxos group, the Paxos leader can generally assign the transaction timestamp to be that of the last committed write, which satisfies external consistency. For larger-scoped RO transactions, Spanner may choose to optimize the timestamp choice.

Paxos Leader Leases

Spanner’s implementation of Paxos leader lease renewal has some similarity to Raft, with TrueTime timestamps of the leader acting like the Raft election term number.

Timestamps for RW Transactions

In this paper Spanner implements RW transactions by two-phase locking. Spanner assigns the transaction timestamp to be the time of the Paxos write representing the transaction commit.

Within each Paxos group, Spanner assigns timestamps to Paxos writes in monotonically increasing order, even across leaders.

Start

Transaction coordinator leader assigns a commit timestamp s_i >= TT.now().latest

Commit wait

The transaction coordinator leader ensures clients cannot see data committed by T_i until TT.after(s_i) is true, meaning the absolute physical time s_i has definitely passed.
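
A self-contained sketch of commit wait, assuming a TrueTime-like now() that returns the uncertainty interval from Section 3; the clock source here is a stub, not the real GPS/atomic-clock implementation:

import java.time.Instant;

// Sketch of the commit-wait rule. TrueTime.now() is assumed to return an
// interval [earliest, latest] that contains absolute time.
final class TTInterval {
    final long earliestMicros;
    final long latestMicros;
    TTInterval(long earliestMicros, long latestMicros) {
        this.earliestMicros = earliestMicros;
        this.latestMicros = latestMicros;
    }
}

final class TrueTime {
    // Stub: a real implementation derives the bounds from the time masters
    // plus a worst-case drift model; here we fake a fixed +/- 1ms epsilon.
    static TTInterval now() {
        long nowMicros = Instant.now().toEpochMilli() * 1000;
        return new TTInterval(nowMicros - 1000, nowMicros + 1000);
    }
    static boolean after(long tMicros) {
        return now().earliestMicros > tMicros;   // t is definitely in the past
    }
}

final class CommitWait {
    // The coordinator picks s >= TT.now().latest, then waits until TT.after(s)
    // holds before making the commit visible, so s is in the past everywhere.
    static long chooseAndWait() throws InterruptedException {
        long s = TrueTime.now().latestMicros;    // commit timestamp
        while (!TrueTime.after(s)) {
            Thread.sleep(1);                     // expected wait is about 2 * epsilon
        }
        return s;
    }
}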

Serving Reads at a Timestamp

Every replica tracks a value called safe time t_safe, the max timestamp at which this replica is up-to-date. This is probably similar to the log index number in Raft. At the Paxos level, this is the highest-applied Paxos write.

t_safe = min( t_paxos_safe, t_TM_safe )

The tricky one is t_TM_safe: when there are zero prepared but not committed transactions, it is infinity. Participant slaves refer to the leader’s transaction manager for this timestamp. But as soon as a transaction is prepared, this timestamp can be assigned as follows:

As we discuss in Section 4.2.1, the commit protocol ensures that every participant knows a lower bound on a prepared transaction’s timestamp. Every participant leader (for a group g) for a transaction T assigns a prepare timestamp to its prepare record s_prepare_g. The coordinator leader ensures that the transaction’s commit timestamp s_i >= s_prepare_g over all participant groups.

Therefore in such a case, t_TM_safe = min_across_transactions( s_prepare_g ) - 1
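
A rough sketch of the safe-time computation, with illustrative names; the collection of prepare timestamps stands in for the transaction manager’s state:

import java.util.Collection;

// Sketch: t_safe = min(t_paxos_safe, t_TM_safe), where t_TM_safe is infinity
// when there are no prepared-but-uncommitted transactions, and otherwise
// min over prepared transactions of (s_prepare - 1).
final class SafeTime {
    static long tmSafe(Collection<Long> preparedTimestamps) {
        if (preparedTimestamps.isEmpty()) {
            return Long.MAX_VALUE;               // "infinity"
        }
        long minPrepare = preparedTimestamps.stream().min(Long::compare).get();
        return minPrepare - 1;                   // reads above this might miss a commit
    }

    static long tSafe(long paxosSafe, Collection<Long> preparedTimestamps) {
        return Math.min(paxosSafe, tmSafe(preparedTimestamps));
    }
}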

As described in Section 4.2.4, this leads to a weakness where a single prepared transaction can prevent t_safe from advancing (at any replica). This is a false conflict if later reads do not conflict with the transaction. Spanner works around this by keeping a finer-grained mapping from key ranges to s_prepare_g timestamps.

Read-Write Transactions

Spanner uses two-phase commit to perform its transactions across Paxos groups. Spanner clients actually drive the following work of a transaction:

  1. Buffering writes locally
  2. Issuing reads to the leaders of the appropriate group(s)
  3. Sending keepalive messages to participant leaders while the transaction is open
  4. Completing all reads, then kicking off two-phase commit
  5. Choosing a coordinator group and sending a commit message to each participant group leader

A non-coordinator participant leader first acquires write locks. It then chooses a prepare timestamp (greater than any it has assigned to previous transactions), and logs a prepare record through Paxos. Each participant then notifies the coordinator of its prepare timestamp.

Thus, the prepare phase at each Paxos group is replicated and highly available, compared to non-Paxos 2PC.

The coordinator leader also acquires write locks, but skips the prepare phase. It chooses a timestamp for the entire transaction after hearing from all other participant leaders.

The commit timestamp, s, must be:

  1. Greater than or equal to all prepare timestamps from each participant
  2. >= TT.now().latest at the time the coordinator received its commit message (from the client)
  3. Greater than any timestamps the leader has assigned to previous transactions
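
The three constraints above collapse into a single max, as in this sketch (names are illustrative; trueTimeNowLatest is assumed to be TT.now().latest sampled when the coordinator received the client’s commit message):

import java.util.Collection;

// Sketch of the coordinator leader's commit-timestamp choice: s must dominate
// every participant's prepare timestamp, the TrueTime upper bound at
// commit-message arrival, and every timestamp this leader already assigned.
final class CommitTimestamp {
    private long lastAssigned = 0L;

    long choose(Collection<Long> prepareTimestamps, long trueTimeNowLatest) {
        long s = Math.max(trueTimeNowLatest, lastAssigned + 1);
        for (long prepare : prepareTimestamps) {
            s = Math.max(s, prepare);
        }
        lastAssigned = s;
        return s;
    }
}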

The coordinator leader then logs a commit record through Paxos, preserving the same replication and high availability.

The coordinator leader can skip the prepare phase because it must wait until all other participants reply with their prepare notifications anyway, at which point this coordinator will know if it is ready to commit.

Before allowing any coordinator replica to apply the commit record, the coordinator leader waits until TT.after(s), so as to obey the commit-wait rule described in Section 4.1.2. Because the coordinator leader chose s based on TT.now().latest and now waits until that timestamp is guaranteed to be in the past, the expected wait is at least 2 * epsilon_avg.

After commit wait, the coordinator sends the commit timestamp to the client and all other participant leaders. Each participant leader logs the transaction’s outcome through Paxos. All participants apply at the same timestamp and then release locks.

The application of the writes at a particular timestamp is similar to the Raft replicated state machine, but instead of log indexes, TrueTime assigns meaningful, monotonically increasing index values to the state changes.

More details about Spanner’s transaction capabilities here: document

Refinements

Another interesting mention: t_paxos_safe cannot advance in the absence of Paxos writes. Instead of taking the easy route of doing zero-fill writes, this happens:

Spanner addresses this problem by taking advantage of the disjointness of leader-lease intervals. Each Paxos leader advances t_paxos_safe by keeping a threshold above which future writes’ timestamps will occur: it maintains a mapping MinNextTS(n) from Paxos sequence number n to the minimum timestamp that may be assigned to Paxos sequence number n + 1.

This is possible because the leaders know their term leases are 10 seconds, and they have access to TrueTime to assign such timestamps to the Paxos sequence numbers with confidence. Leaders can also tell when the end of their lease is coming up, and they must renew their lease to preserve lease disjointness.
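
A sketch of how a leader might use such a MinNextTS mapping to let t_paxos_safe advance without writes; the map and method names are illustrative, not Spanner’s actual interfaces:

import java.util.Map;
import java.util.TreeMap;

// Sketch: the leader promises that the write at Paxos sequence n+1 will get a
// timestamp >= MinNextTS(n), so even without new writes t_paxos_safe can
// advance to MinNextTS(n) - 1 for the highest applied sequence n. The promise
// is only safe within the leader's disjoint lease interval.
final class PaxosSafeTime {
    private final Map<Long, Long> minNextTs = new TreeMap<>(); // n -> MinNextTS(n)

    void promise(long appliedSequence, long minimumNextTimestamp) {
        minNextTs.put(appliedSequence, minimumNextTimestamp);
    }

    long paxosSafe(long highestAppliedSequence, long highestAppliedTimestamp) {
        Long promisedNext = minNextTs.get(highestAppliedSequence);
        return promisedNext == null
                ? highestAppliedTimestamp          // no promise: stuck at the last write
                : promisedNext - 1;                // promise lets t_paxos_safe advance
    }
}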

Relationship with F1

Google’s advertisement backend, F1, was based on manually sharded MySQL with some interesting details:

This backend was originally based on a MySQL database that was manually sharded many ways. The uncompressed dataset is tens of terabytes, which is small compared to many NoSQL instances, but was large enough to cause difficulties with sharded MySQL. The MySQL sharding scheme assigned each customer and all related data to a fixed shard. This layout enabled the use of indexes and complex query processing on a per-customer basis, but required some knowledge of the sharding in application business logic. Resharding this revenue-critical database as it grew in the number of customers and their data was extremely costly.

And this bit:

The last resharding took over two years of intense effort, and involved coordination and testing across dozens of teams to minimize risk. This operation was too complex to do regularly.

The reasons to switch to Spanner were clear:

First, Spanner removes the need to manually re-shard. Second, Spanner provides synchronous replication and automatic failover. With MySQL master-slave replication, failover was difficult, and risked data loss and downtime. Third, F1 requires strong transactional semantics, which made using other NoSQL systems impractical. Application semantics requires transactions across arbitrary data, and consistent reads. The F1 team also needed secondary indexes on their data (since Spanner does not yet provide automatic support for secondary indexes), and was able to implement their own consistent global indexes using Spanner transactions.

As a result of Spanner:

Spanner’s automatic failover has been nearly invisible to them.

Spanner’s timestamp semantics made it efficient for F1 to maintain in-memory data structures computed from the database state. F1 maintains a logical history log of all changes, which is written into Spanner itself as part of every transaction. F1 takes full snapshots of data at a timestamp to initialize its data structures, and then reads incremental changes to update them.

Prior Work:

Google Spanner consistency and TrueTime discussion




JVM Metaspace - Classloader memory allocation

2021-08-12 00:00:00 +0000

Commentary:

JEP 387 “Elastic Metaspace” – a new classroom for the Java Virtual Machine

This article discusses one of my favorite topics - Java off-heap memory. Specifically, the metaspace.

Novel Idea:

This article introduces a new way for the JVM to allocate memory for the Metaspace, the region of memory where the JVM stores class metadata. In the past, the Metaspace was managed with a slab allocator, but under stressful patterns of class loading and unloading, the slab allocator caused high fragmentation, and its allocation granularity was not tunable. This article, and the corresponding Java feature, enables more robust and elastic management of the Metaspace.

Prior Work:

  1. https://bugs.openjdk.java.net/browse/JDK-8198423
  2. https://spring.io/blog/2015/12/10/spring-boot-memory-performance
  3. http://trustmeiamadeveloper.com/2016/03/18/where-is-my-memory-java/

Content:

Off-Heap Memory and the Metaspace

The JVM manages the following in off-heap memory:

  1. Thread stacks
  2. GC control structures
  3. Interned Strings
  4. CDS archives and text segments
  5. JIT-compiled code (the code cache)
  6. and many many other things

All these data live outside the Java heap, either in C-Heap or in manually managed mappings. Colloquially named off-heap – or, somewhat more incorrectly, native – memory, the combined size of these regions can surpass that of the heap itself.

A Java class is removed – unloaded – only if its loading class loader dies. The Java Specification defines this:

“A class or interface may be unloaded if and only if its defining class loader may be reclaimed by the garbage collector ” [5].

Before Java 8: The Permanent Generation

Before Java 8, class metadata lived in PermGen, which was managed by the JVM GC. JEP 122 removed the PermGen; two factors coincided: (1) the advent of the JRockit JVM, and (2) the Sun JVM group’s internal roadmap.

Java 8

JEP 122 shipped with JDK 8. The first Metaspace was implemented with several features:

Better suited for bulk-release data is a so-called Arena Allocator [9]. In its simplest form, an arena is a contiguous region: on each new allocation request, an allocation top pointer is pushed up to make space for the new allocation. This technique is primitive but very fast. It is also very memory-efficient since waste is limited to padding requirements. We pay for this efficiency by not being able to track individual allocations: allocations are not released individually but only as a whole, by scrapping the arena.

The Metaspace is also, in its heart, an arena allocator. Here, an arena is not bound to a thread like a thread stack or a TLAB. Instead, a class loader owns the arena and releases it upon death.

At first glance, this allocation pattern is a perfect match for class loading/unloading - leaf/related classes are allocated close to the parent classes, and the root node is deallocated last when all child objects are unloaded, resulting in a whole-arena deallocation.

Metaspace Problems

  • Fixed chunk sizes

For a start, Metaspace chunk management had been too rigid. Chunks, which came in various sizes, could never be resized. That limited their reuse potential after their original loader died. The free list could fill up with tons of chunks locked into the wrong size, which Metaspace could not reuse.

  • The lack of elasticity

The first Metaspace also lacked elasticity and did not recover well from usage spikes.

When classes get unloaded, their metadata are not needed anymore. Theoretically, the JVM could hand those pages back to the Operating System. If the system faces memory pressure, the kernel could give those free pages to whoever needs it most, which could include other areas of the JVM itself. Holding on to that memory for the sake of some possible future class loads is not useful.

But Metaspace retained most of its memory by keeping freed chunks in the free list. To be fair, a mechanism existed to return memory to the OS by unmapping empty virtual space nodes. But this mechanism was very coarse-grained and easily defeated by even moderate Metaspace fragmentation. Moreover, it did not work at all for the class space.

From the JDK ticket: “So, we could get metaspace OOMs even in situations where the metaspace was far from exhausted”

  • High per-classloader overhead

In the old Metaspace, small class loaders were disproportionally affected by high memory overhead. If the size of your loaders hit those “sweet spot” size ranges, you paid significantly more than the loader needed. For example, a loader allocating ~20K of metadata would consume ~80K internally, wasting upward of 75% of the allocated space.

These amounts are tiny but quickly add up when dealing with swarms of small loaders. This problem mostly plagued scenarios with automatically generated class loaders, e.g., dynamic languages implemented atop Java.

Java 16: Metaspace evolved

  • Buddy Allocation

Very simplified, buddy allocation in Metaspace works like this (a toy sketch follows the list):

  1. A classloader requests space for metadata; its arena needs and requests a new chunk from the chunk manager.
  2. The chunk manager searches the free list for a chunk equal to or larger than the requested size.
  3. If it finds one larger than the requested size, it splits that chunk repeatedly in halves until the fragments have the requested size.
  4. It then hands one of the splinter chunks over to the requesting loader and adds the remaining splinters back into the free list.

Deallocation of a chunk works in reverse order:

  1. A classloader dies; its arena dies too and returns all its chunks to the chunk manager.
  2. The chunk manager marks each chunk as free and checks its neighboring chunk (“buddy”). If it is also free, it fuses both chunks into a single larger chunk.
  3. It repeats that process recursively until either a buddy is encountered that is still in use, or the maximum chunk size (and by that, maximum defragmentation) is reached.
  4. The large chunk is then decommitted to return memory to the operating system.
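
Here is the promised toy sketch of the split/merge scheme, assuming power-of-two chunk sizes; it illustrates buddy allocation in general, not the actual Metaspace code:

import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Toy buddy allocator: a larger free chunk is split in halves until it matches
// the request, and on free a chunk is merged with its "buddy" when possible.
final class ToyBuddyAllocator {
    private final long maxChunk;                                        // root chunk size, power of two
    private final Map<Long, TreeSet<Long>> freeBySize = new HashMap<>(); // size -> free offsets

    ToyBuddyAllocator(long maxChunk) {
        this.maxChunk = maxChunk;
        setFor(maxChunk).add(0L);
    }

    // size must be a power of two no larger than maxChunk.
    long allocate(long size) {
        long candidate = size;
        while (candidate <= maxChunk && setFor(candidate).isEmpty()) candidate <<= 1;
        if (candidate > maxChunk) throw new IllegalStateException("out of space");
        long offset = setFor(candidate).pollFirst();
        while (candidate > size) {                       // split in halves on the way down
            candidate >>= 1;
            setFor(candidate).add(offset + candidate);   // right half returns to the free list
        }
        return offset;
    }

    // Merge with the buddy chunk while it is also free, then return to the free list.
    void free(long offset, long size) {
        while (size < maxChunk) {
            long buddy = offset ^ size;                  // buddy of an aligned power-of-two chunk
            if (!setFor(size).remove(buddy)) break;      // buddy still in use: stop merging
            offset = Math.min(offset, buddy);
            size <<= 1;
        }
        setFor(size).add(offset);
    }

    private TreeSet<Long> setFor(long size) {
        return freeBySize.computeIfAbsent(size, s -> new TreeSet<>());
    }
}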

Like a self-healing ice sheet, chunks splinter on allocation and crystallize back into larger units on deallocation. It is an excellent way of keeping fragmentation at bay even if this process repeats endlessly, e.g., in a JVM which loads and unloads tons of classes in its life time.





Google Dremel - a Decade Later

2021-06-14 00:00:00 +0000

Novel Idea:

Related to Dremel 2010, this paper examines what Dremel ideas withstood the test of time, and what ideas changed as Dremel evolved into BigQuery.

Main Result(s):

Most of the original Dremel system design withstood the test of time: some inspired major industry trends and are now considered best practices. Here are the top 5:

  1. SQL - as the de facto API and language of data retrieval and query.
  2. Disaggregated compute and storage - decoupling storage from compute, allowing separate scaling of either resource, and separate optimization paths (see #5).
  3. In situ data analysis - Dremel’s use of a distributed file system to share data access allowed an interoperable ecosystem of services to flourish.
  4. Serverless compute - elastic scale up and down of compute was at its infancy in 2010, but is pervasive today.
  5. Columnar storage - Dremel introduced a novel encoding for nested data that generalized columnar physical layout to relational and semi-structured data.

Prior Work:

Since the 2010 Dremel paper, multiple papers have advanced the state of data analysis, as cited in the paper.

Content:

S2. SQL

Dremel reintroduced SQL for data analysis at Google. Before, the conventional wisdom at Google was “SQL doesn’t scale”, and the tradeoff was either scalability, or ease of use. Dremel combined both.

Dremel initially avoided query-time joins, relying on Google’s internal support for denormalized, but often hierarchical data - the nested structure of protobuf, together with Dremel’s ability to process it in situ, fit Google’s use case well.

SQL finally became pervasive at Google, across widely used systems such as Dremel, F1, and Spanner, and other niche systems such as PowerDrill [24], Procella [15], and Tenzing [16].

SQL functionality in Dremel has been expanded in recent years, in particular with joins. Distributed joins across large datasets remain an active area of research. Dremel introduced a new shuffle join architecture that leverages the latest research, with Google’s internal network optimizations.

S3. Disaggregation of storage and compute

Storage

Dremel started on a cluster of dedicated hardware. In 2009, Dremel began migrating compute onto Borg and storage onto GFS - the inception of storage disaggregation/decoupling. The move was challenging:

Harnessing query latency became an enduring challenge for Dremel engineers, which we cover in more detail in Section 7. It took a lot of fine-tuning of the storage format, metadata representation, query affinity, and prefetching to migrate Dremel to GFS. Eventually, Dremel on disaggregated storage outperformed the local-disk based system both in terms of latency and throughput for typical workloads.

Memory

Dremel had an initial implementation of distributed joins modeled after the MapReduce shuffle operation, spilling intermediate results from local RAM to local disk. However, as most everyone knows by now (2021), this was a bottleneck.

  1. With such colocation, it is not possible to efficiently mitigate the quadratic scaling characteristics of shuffle operations as the number of data producers and consumers grew.
  2. The coupling inherently led to resource fragmentation and stranding and provides poor isolation. This became a major bottleneck in scalability and multi-tenancy as the service usage increased.

After exploring alternatives, including a dedicated shuffle service, in 2014 we finally settled on the shuffle infrastructure which supported completely in-memory query execution [4]. In the new shuffle implementation, RAM and disk resources needed to store intermediate shuffle data were managed separately in a distributed transient storage system.

The new shuffle implementation:

  • Reduced the shuffle latency by an order of magnitude.
  • Enabled an order of magnitude larger shuffles.
  • Reduced the resource cost of the service by more than 20%.

S4. In Situ Data Analysis

In situ data processing refers to accessing data in its original place, without upfront data loading and transformation steps. In their prescient 2005 paper [22], Jim Gray et al. outlined a vision for scientific data management where a synthesis of databases and file systems enables searching petabyte-scale datasets within seconds. They saw a harbinger of this idea in the MapReduce approach pioneered by Google, and suggested that it would be generalized in the next decade.

The transition to in situ analytics required 3 ingredients:

  1. Consuming data from a variety of data sources
  2. Eliminating traditional ETL-based data ingestion from an OLTP system to a data warehouse
  3. Enabling a variety of compute engines to operate on the data.

First, the data needed to be formatted in a way that enables interoperability between different systems. At Google, that format was Protobuf:

A self-describing storage format in GFS enabled interoperation between custom data transformation tools and SQL-based analytics. MapReduce jobs could run on columnar data, write out columnar results, and those results could be immediately queried via Dremel.

Second, the ecosystem of different analytical tools allowed federation of queries (similar to Presto):

In some cases, including remote file systems such as Google Cloud Storage and Google Drive, we read the files directly. In other cases, including F1, MySQL, and BigTable, we read data through another engine’s query API. In addition to expanding the universe of joinable data, federation allows Dremel to take advantage of the unique strengths of these other systems.

S5. Serverless Compute

Some key points in the dynamic utilization of compute resources in Dremel/BigQuery:

  1. Centralized scheduling: more fine-grained, with better isolation and utilization of resources.
  2. New Shuffle Persistence Layer: decoupled scheduling and execution of the different stages of each query; at each checkpoint of a query, the scheduler can dynamically preempt workers.
  3. Flexible Execution DAG evolution: BigQuery implements a more flexible execution plan than that described in the original 2010 paper.
  4. Dynamic execution plan: for queries on data where the cardinality estimates are wrong, Dremel/BigQuery allows the query plan to change dynamically at runtime, managed by the central query coordinator and checkpointed by the shuffle persistence layer.

S6. Columnar Nested Data

From a computer science perspective, the Dremel model of encoding data is perhaps most interesting.

Dremel influenced or inspired the encoding of nested data in a columnar layout. This is evident in the development of Parquet, ORC, and Apache Arrow. ORC takes a slightly different approach to encoding than Dremel/Parquet does, and depending on the data pattern, the compression efficiency differs.

The main design decision behind repetition and definition levels encoding was to encode all structure information within the column itself, so it can be accessed without reading ancestor fields. Indeed, the non-leaf nodes are not even explicitly stored. However, this scheme leads to redundant data storage, since each child repeats the same information about the structure of common ancestors. The deeper and wider the structure of the message, the more redundancy is introduced.

With repetition/definition levels it is sufficient to only read the column being queried, as it has all required information. In 2014, we published efficient algorithms [3] for Compute, Filter and Aggregate that work with this encoding. With length/presence encoding, it is also necessary to read all ancestors of the column. This incurs additional I/O, and while ancestor columns usually are very small, they could require extra disk seeks.

The encoding and filtering/querying of columnar data has been embedded into a library called Capacitor. It makes the following optimizations accessible not just to Dremel, but also to other Google systems such as Spanner and MapReduce.

  • Partition and predicate pruning: maintain statistics about values in each column.
  • Vectorization.
  • Skip-indexes: within a Capacitor block, column values are split into segments which the header points to; when querying with high selectivity, these indexes point directly to the value segments.
  • Predicate reordering: Capacitor uses heuristics to run the most selective filters first, even if they are more complex to evaluate.

Row Reordering

Here’s an optimization that was hard to imagine:

RLE in particular is very sensitive to row ordering. Usually, row order in the table does not have significance, so Capacitor is free to permute rows to improve RLE effectiveness.

Unfortunately, finding the optimal solution is an NP-complete problem [29].

short RLE runs give more benefit for long strings than longer runs on small integer columns. Finally, we must take into account actual usage: some columns are more likely to be selected in queries than others, and some columns are more likely to be used as filters in WHERE clauses. Capacitor’s row reordering algorithm uses sampling and heuristics to build an approximate model.




Google Dremel/BigQuery - reading notes

2021-04-29 00:00:00 +0000

Commentary:

Dremel is Google’s innovative approach to query massive amounts of data, utilizing thousands of machines, with an interactive latency that was previously not possible with MapReduce. The Dremel/BigQuery style of querying distributed data in-situ became a cited influence for many other distributed systems like Snowflake. The Protobuf record description, and its record shredding/striping and reassembly algorithm, went on to influence open source implementations like Thrift, Avro, and Parquet.

Novel Idea:

Before Dremel, analytical queries running on large volumes of archival data typically relied on proprietary databases, or recently, MapReduce. Dremel is a new system that introduces a novel approach to query data in-situ, at interactive response times (a few seconds), by doing the following:

  1. formatting the data in an optimized physical layout,
  2. distributing the query processing across thousands of machines,
  3. outsourcing the data storage to a distributed file system (GFS).

Main Result(s):

Dremel is able to query and analyze petabytes of data, and return calculated results within a few seconds, in 2006 - at Google’s scale. These were, and continue to be, industry-leading results.

Prior Work:

BigQuery is influenced by a variety of prior research work in databases. However, many of those were non-public or not practically implemented.

The paper cites several sources as inspiration for the record shredding algorithm. I see influences of PAX (Partition Attributes Across) as well - the idea of striking an optimal balance between row and columnar storage was already around by the year 2000.

Legacy:

It is easy to see the influence Dremel/BigQuery has on later systems. The most obvious is Snowflake, which directly cites Dremel. Similar systems on the Hadoop ecosystem include Apache Impala. Apache Parquet is an open source implementation of the physical data storage specification in Dremel. Apache Arrow is a continuation of that.

Content:

S2-3. Background and Data Model

The main problem Dremel tried to solve was interactive analytical querying of data at Google. The primary need was to get access to results quickly, ideally within a few seconds, while the raw data can span billions/trillions of rows.

The two main “ingredients” to make this happen are

  1. A common storage layer for physical data, on a distributed file system, that is high-performance, resilient to failures, enables quick access to the subset of data relevant for processing, and at the same time is separated from the processing layer.
  2. A shared storage format - consisting of a data model descriptor and serializer, namely Protobuf, and a physical data storage format that is optimized for the nested Protobuf data structure.

Item 1 has been described in the GFS paper. Item 2 is the focus of Section 4.

S4. Nested Columnar Storage

Records can be wide, but not every field is necessary for query processing:

All values of a nested field, such as A.B.C, are stored contiguously. A.B.C can be retrieved without reading A.E, A.B.D, etc.

In Protobuf, field values can be nested, optional, and repeated.

“The challenge is how to preserve all structural information and be able to reconstruct records from an arbitrary subset of fields”

The idea is to compute and store two additional integer values with every field value, as the records are written.

  1. Repetition Level

    • This is defined as “at what repeated field in the field path this value has repeated.”
    • In other words: Repetition Level starts at 0 at the beginning of a new record, at the record root level. As we traverse the record via DFS, note this node’s depth level by counting how many REPEATED fields exist on the path to the root - call it rawRepLevel. If this node is a new occurrence, use its parent’s Repetition Level; otherwise, this node is repeating, so use the rawRepLevel.
  2. Definition Level

    • This is defined as “how many fields in p that could be undefined are actually present in the record”
    • In other words: As we traverse the record by DFS, Definition Level specifies the deepest level in the field’s path at which the value is still defined/not-NULL.

Here is a version in Java pseudocode. Also listed: toy implementation

// DFS over a record, emitting (value, repetition level, definition level) for
// each leaf field. Field, writers, isParentNode, childrenAreNull and
// emptyValue are assumed from the linked toy implementation; Set/HashSet are
// java.util.
public void traverse(
        int curDef,            // definition level if this subtree is fully present
        int nonNullDef,        // definition level of the deepest defined ancestor
        int parentRep,         // parent's raw repetition level
        int parentDisplayRep,  // repetition level the parent actually emitted
        Set<String> seen) {    // field paths already seen within this record

    // rawRepLevel: how many REPEATED fields exist on the path to the root.
    int repLevel = this.type == REPEAT ? parentRep + 1 : parentRep;
    // A first occurrence of a field path inherits the parent's display level;
    // later occurrences are true repeats and use the raw level.
    int repLvlDisplay = seen.contains(this.fieldpath) ? repLevel : parentDisplayRep;
    seen.add(this.fieldpath);

    if (isParentNode(this)) {
        Set<String> nextLevelSeen = new HashSet<>(seen);
        int nextDefLevel = childrenAreNull(this) ? curDef : 1 + curDef;

        // Traverse children even when they are missing, so leaf columns still
        // receive a NULL entry with the appropriate definition level.
        for (Field child : this.children) {
            child.traverse(
                1 + curDef,
                nextDefLevel,
                repLevel,
                repLvlDisplay,
                nextLevelSeen);
        }
    } else {
        // A leaf value node: a missing value is written as NULL with a
        // reduced definition level.
        int defLevel = emptyValue(this) ? nonNullDef - 1 : curDef;
        writers.write(this.fieldpath, this.value, repLvlDisplay, defLevel);
    }
}

Record reassembly is done via the FSM described in the paper. Apache Parquet has an open source implementation of effectively the same FSM.

S5-6. Query Language and Execution

There are several advantages of converting the records into blocks of columnar physical format:

  1. At the macro level: separate fields can be stored and processed on separate machines, allowing parallelization.
  2. At the single machine level: sequential reads and writes, better cache utilization at the file system and CPU cache levels.
  3. At the file level: more efficient encoding/decoding of values; also, better compression and vectorization - values are always of the same type within a block.
  4. Implicit skipping of empty values/NULLs: a requirement of Google, because large collections of data are sparsely populated.

As for Query Language, Dremel started with a derivative/subset of SQL, but as it evolved into BigQuery, it also added more and more features of standard SQL. This is motivated by wide adoption of SQL in industry.

Tree Query Execution Architecture

Dremel fans out a query from a root node to leaf nodes. Non-leaf nodes compute the boundaries of where pieces of data are distributed, and split the query into partitioned queries. Leaf nodes work on the actual data scans. Then, as the leaf nodes return sub-results back up the tree, non-leaf nodes aggregate the results.

There are several advantages of doing this:

  1. Parallelization: effective partitioning of compute to where the data lives.
  2. Connection pooling: any parent node can maintain connections to a pooled set of nodes, and optimize against connection churn.
  3. Resilience against failure: at the largest distribution leaf nodes, any transient or permanent failure can be quickly failed over to replicas.

Ideas for further work:

The Google BigQuery team wrote a follow-up paper, called Dremel - A Decade Later




Facebook TAO - reading notes

2020-12-11 00:00:00 +0000

Commentary:

Facebook overcomes its scale and performance challenges by taking the concept of an eventually consistent, distributed hash table to its limits, via a geographically distributed, hierarchical system of caches. This kind of system is innovative when its use case and scale are appropriate. I think Facebook TAO, originally published here in 2013 but referenced by many later papers, was at the beginning of a trend of “building a specific DB for your data and problem”.

“A million dollars isn’t cool, Mark. You know what’s cool? A BILLION dollars.”

Paper link: https://www.usenix.org/system/files/conference/atc13/atc13-bronson.pdf

Date: 2013

Novel Idea

Before TAO, applications at Facebook aggressively used Memcache as a lookaside cache against MySQL to serve the social graph. TAO optimizes for graph semantics directly, and continues to use MySQL as its source of truth. TAO favors availability and performance over strong consistency.

Main Result(s):

TAO as described here in 2013 can serve Facebook’s social graph at ~1 billion QPS for reads and ~1 million QPS for writes, for more than 1 billion active users on this changing graph. It is a single multi-tenant instance.

Prior Work:

Some influential prior work that built up to TAO, at least from industry, included Google BigTable, Spanner, Amazon DynamoDB, LinkedIn Voldemort, Twitter FlockDB, Redis, and Yahoo PNUTS. From academia, there were the usual suspects: consistent hashing, and distributed data stores (Chord and Tapestry).

Competitive work:

Facebook’s TAO system is unique in its scale and aggressive performance optimizations, because in this time period circa 2009-2014, Facebook saw incredibly high rate of growth in its active user base, worldwide. There were competitive distributed KV stores, e.g. Dynamo, but Facebook TAO leverages the unique use case of its application to avoid the application-side conflict resolution required in Dynamo (while not outsourcing Facebook’s critical infrastructure to Amazon). Twitter’s FlockDB also did not provide the same scale factor that Facebook needed. Same goes for other similar DBs.

Content:

S2. Background

Facebook pages operate on a pull model, which is read heavy. Every page aggregates and filters a large number of items specific to the viewing user (viewer).

S2.1. Graph structure and Memcache

Facebook has legacy systems accessing the social graph in MySQL, over client-side Memcache. This logic was tightly coupled.

The structure of the social graph is as follows: objects are stored as nodes, and associations are stored as edges.

A KV cache alone is not efficient when an edge list changes: we need something that supports concurrent incremental updates to edge lists, and queries need to do better than always fetching the entire edge list, which requires a complete reload upon every change.

Client-side caching logic makes failure modes complex: we need something to centralize the graph access API and cache control logic.

Inter-regional communication for read-after-write: when we restrict the data model and understand the graph semantics, concurrent cache invalidation messages can be processed more efficiently. This provides Facebook with read-after-write consistency for clients sharing a cache, within a region, in normal operation, without waiting on inter-region communication. Sections 4.4 - 4.5 describe this.

S3. Data Model and API

Consider the social networking example in Figure 1a. The social graph includes the users (Alice, Bob, Cathy, and David), their relationships, their actions (checking in, commenting, and liking), and a physical location (the Golden Gate Bridge).

Consider when Facebook application servers render the page: the individual nodes and edges can be cached and reused, but the aggregated content and privacy checks cannot be reused.

S3.1. Objects and Associations

Objects in TAO are keyed by 64-bit integers globally, structured as follows:

(id) -> (otype, (key -> value)*)

Associations are keyed by the triple (source_id, association_type, destination_id):

(id1, atype, id2) -> (time_int, (key -> value)*)

Notice the time_int field in the association edges, it plays a central role in queries.

Both objects and associations may contain data as key/value pairs. Within this data, a per-type schema enumerates the possible keys, the value types, and default values.
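
As plain Java types, the two shapes above might look like the following sketch; the field names are illustrative, not TAO’s actual schema:

import java.util.Map;

// Sketch of the TAO data model: (id) -> (otype, kv*) for objects, and
// (id1, atype, id2) -> (time, kv*) for associations.
final class TaoObject {
    final long id;                       // 64-bit id; embeds the hosting shard
    final String otype;
    final Map<String, String> data;      // keys/values constrained by a per-type schema
    TaoObject(long id, String otype, Map<String, String> data) {
        this.id = id; this.otype = otype; this.data = data;
    }
}

final class TaoAssociation {
    final long id1;                      // source object
    final String atype;
    final long id2;                      // destination object
    final long timeInt;                  // drives the newest-first association list
    final Map<String, String> data;
    TaoAssociation(long id1, String atype, long id2, long timeInt, Map<String, String> data) {
        this.id1 = id1; this.atype = atype; this.id2 = id2;
        this.timeInt = timeInt; this.data = data;
    }
}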

S3.2. Objects

In Facebook usage, entities that repeat are represented as objects, such as people, comments, or places. A “Like” action is represented as a one-directional association (this is why you cannot Like a Like). Associations model actions that happen at most once or record state transitions. Hence, there is at most one edge of any type between any two objects - this edge may be bidirectional.

Self-edges are allowed.

S3.3. Associations

Bidirectional edges are modeled as two separate association entries. TAO supports synchronously updating both in its API.

# upserts this association and its inverse if defined
assoc_add(id1, atype, id2, timestamp, (k->v)*)

S3.4. Query API

As expected, a query needs an originating object id, and the association type atype.

An application needs to enumerate all destination ids. Here is where TAO can make optimizations specific to Facebook:

A characteristic of the social graph is that … many of the queries are for the newest subset. This creation-time locality arises whenever an application focuses on recent items. If the Alice in Figure 1 is a famous celebrity then there might be thousands of comments attached to her checkin, but only the most recent ones will be rendered by default.

TAO defines the association list as all objects associated with “id1” and “atype”, in descending order by “time_int” field:

Association List: (id1, atype) -> [ a_new ... a_old ]

Now the API can be described:

  • assoc_range(id1, atype, pos, limit): returns elements of the (id1, atype) association list with pagination. “50 Most recent comments on Alice’s checkin” is a call to assoc_range(632, COMMENT, 0, 50).

  • assoc_time_range(id1, atype, high, low, limit): a time bound version of the above.

  • assoc_get(id1, atype, id2set, high, low): returns all associations (id1, atype, id2) and their time and data, with the restriction id2 is in id2set. The parameters “high” and “low” are time bounds to optimize for large lists.

  • assoc_count(id1, atype) - returns count of association list for (id1, atype), the number of edges originating at id1.
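
Written down as a Java interface, purely to make the shapes concrete (return types are simplified to destination ids, and all names are illustrative):

import java.util.List;
import java.util.Set;

// Sketch of the TAO query API described above.
interface TaoQueryApi {
    List<Long> assocRange(long id1, String atype, int pos, int limit);
    List<Long> assocTimeRange(long id1, String atype, long high, long low, int limit);
    List<Long> assocGet(long id1, String atype, Set<Long> id2set, long high, long low);
    long assocCount(long id1, String atype);
}

// The paper's example, "50 most recent comments on Alice's checkin" (object id
// 632), would be assocRange(632, "COMMENT", 0, 50) in this sketch.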

S4.1. Architecture - Storage

TAO relies on MySQL as its backing source of truth. TAO splits its data across logical shards of MySQL database servers, in typical primary replica configuration. A DB table encapsulates all objects, and another table encapsulates all associations.

Each object’s id embeds its shard_id, which ties it to a hosting shard permanently. An association is stored on the shard of the originating object id, id1 - this means writes to the association are concentrated to a single shard.

S4.2. Caching

I think here are two high level points:

  1. Cache server fleets, called tiers, make up the majority of TAO machines, so clients interact directly with the caching servers, which implement the complete API.

  2. “Cache servers understand the semantics of their content” and use this fact to aggressively optimize in case of misses, e.g. “a cached count of zero is sufficient to answer a range query” - so we can allow stale reads and issue an async cache refresh.

Bidirectional association updates are not atomic because they almost always span two different shards, and TAO leaves “hanging” bidirectional associations to a background job to eventually repair.

S4.4. Leaders and Followers

This is where Facebook’s unique scale leads to interesting design: to accommodate the sheer volume of requests, TAO takes one step beyond the typical “primary-replica-cache”, to use the same model on its cache tier. This is also to alleviate quadratic all-to-all communication when shards are small and are all served out of a single large fleet of cache machines.

TAO introduces the notion of Leader cache tiers and Follower cache tiers. Clients talk to Follower cache servers only. Follower cache machines either answer the requests, or forward read misses and writes to the Leader. The Leader cache servers talk to the storage, which, depending on the region, can be either the MySQL primary or a replica.

TAO implements consistency messages within its Leader-Follower cache tiers. When Follower caches handle writes, they send a synchronous request to the Leader shard, but then let the Leader asynchronously update all other Follower shards later.

As an optimization, when invalidating an association that may truncate/discard data at the Follower caches, “the Leader sends a refill message to notify followers about an association write. If a follower has cached the association, then the refill request triggers a query to the leader to update the follower’s now-stale association list.” This eagerly refills for read queries.

S4.5. Geographical Scale

In this case it is best to refer to the picture in the paper: Figure 2: Multi-region TAO configuration.

The master region sends read misses, writes, and embedded consistency messages to the master database (A).

This is expected like normal “primary-replica-cache” setups.

Consistency messages are delivered to the slave leader (B) as the replication stream updates the slave database.

This is to invalidate caches in the Slave Region.

Slave leader sends writes to the master leader (C) and read misses to the replica DB (D).

Part C is expected as part of a single global Master for writes. Part D serves read misses from the local region’s replica DB.

S5.2. Implementation - MySQL Mapping

Associations are stored similarly to objects, but to support range queries, their tables have an additional index based on id1, atype, and time. To avoid potentially expensive SELECT COUNT queries, association counts are stored in a separate table.

You can almost write out a good guess of what the query and schema look like: SELECT id2 FROM assoc WHERE id1 = ... AND atype = ... ORDER BY time DESC LIMIT 6000

S5.3. Implementation - Cache Sharding and Hot Spot Relief

Shards within TAO are mapped to servers using consistent hashing. However, as with the rest of Facebook’s unique scale, this is not enough, so TAO takes the additional step of using shard cloning, similar to CDN cache trees:

TAO rebalances load among followers with shard cloning, in which reads to a shard are served by multiple followers in a tier. Consistency management messages for a cloned shard are sent to all followers hosting that shard.

TAO takes the further step of client-side caching when access rates exceed certain thresholds, caching the data along with a version - this fits TAO’s API by eliminating data transfer in replies when data has not changed since the client’s previous request.

S5.4 High-Degree Objects

TAO was designed to cache objects with fewer than 6,000 associations of the same type. There are implications for assoc_get queries where the caller specifies id1 and id2 but the queried id2 falls in the uncached tail of the association list - this kind of query for a high-degree id1 may go directly to the database. However, one possible optimization available at the application layer is to invert the query direction if the association is bidirectional and the object id2 has a smaller degree for this association.

I suspect this limitation is also why Facebook cannot easily show me only the comments my friends made on a popular post (in a large group, for example).

S6.1. Consistency design

In normal operation TAO provides read-after-write consistency when everything is isolated to a single tier, and async eventual consistency everywhere else - for successful writes. I note this is quite a loose level of consistency, as we mention problems below. TAO propagates its own changeset between the Master region and Slave region structure, which raises an interesting problem in Slave regions.

The changeset cannot always be safely applied to the follower’s cache contents, because the follower’s cache may be stale if the refill or invalidate from a second follower’s update has not yet been delivered. We resolve this race condition in most cases with a version number that is present in the persistent store and the cache. The version number is incremented during each update, so the follower can safely invalidate its local copy of the data if the changeset indicates that its pre-update value was stale.

So in the above paragraph Facebook mentions the problem with conflicting concurrent writes, but it’s not easy to resolve conflicts and it’s very application specific. Otherwise a branch forms in the timeline of an object or association.

In slave regions, this scheme is vulnerable to a rare race condition between cache eviction and storage server update propagation. The slave storage server may hold an older version of a piece of data than what is cached by the caching server, so if the post-changeset entry is evicted from cache and then reloaded from the database, a client may observe a value go back in time in a single follower tier.

This reads like a temporary condition and the write is not lost, so it will eventually propagate back to the user upon complete repair of the data within this follower tier.

Such a situation can only occur if it takes longer for the slave region’s storage server to receive an update than it does for a cached item to be evicted from cache, which is rare in practice.

One of the ways to work around this in TAO is to mark reads as critical, which proxies the read directly to the master region to avoid reusing stale data.

Ideas for further work:

Facebook has shown here how they have pushed the eventually consistent model to a very aggressive limit, and it works for them. Some key design choices in TAO, like tiered caching, geographical distribution, client-side caching above thresholds, leveraging application-specific logic to invert query direction for associations, are all good techniques to try in other projects when appropriate.

I also think there is further work possible to resolve the race condition between Follower caches and Slave region storage. The paper mentioned how Facebook resolves most of the problem with a version number, but it does not mention how the application can handle the merge conflicts, besides perhaps showing an older version to the user. Maybe if Facebook had a central source of global timestamps in its changeset, like TrueTime, this problem could be solved.




Weaving Relations for Cache Performance (Partition Attributes Across - PAX) - reading notes

2020-11-06 00:00:00 +0000

Paper link: https://research.cs.wisc.edu/multifacet/papers/vldb01_pax.pdf

Paper published: 2001

Impact:

PAX, as originally described in this 2001 paper, is the underlying idea behind RCFile, ORC, and Parquet. https://en.m.wikipedia.org/wiki/RCFile

Summary

Traditional database systems organize records in one of two ways:

  1. Row-oriented, a.k.a. the N-ary Storage Model (NSM), with slotted pages, or
  2. Column-oriented, a.k.a. the Decomposition Storage Model (DSM).

This paper describes a new record organization model that is more cache-optimized than NSM and, at the same time, more efficient than DSM as the number of queried attributes increases. This paper evaluates this new model, called Partition Attributes Across (PAX).

Main Result(s):

This paper demonstrates that PAX, when implemented on a prototype database system (Shore), outperforms NSM in CPU cache efficiency, reducing cache misses by 50-75% when the dataset fits in memory. PAX also outperforms DSM as the number of queried attributes increases.

Content:

S1. Introduction

  1. NSM - stores records contiguously starting from the beginning of each disk page, and uses an offset table (slot array) at the end of each page to locate the beginning of each record. If a query uses only a fraction of the record’s data (one column/attribute), the CPU cache behavior of NSM is poor: only a fraction of the data transferred to the cache is useful to the query, because the item that the query processing algorithm requests and the transfer unit between memory and the processor are typically not the same size. Loading the cache with useless data (a) wastes bandwidth and (b) pollutes the cache.

  2. DSM - Proposed in 1985 to minimize the unnecessary IO of NSM. DSM partitions the dataset vertically into n sub-relations, each of which is connected by a key. Each attribute value is accessed only when the corresponding attribute is needed. Queries spanning multiple attributes, however, must spend additional time to join each attribute/column together.

  3. PAX - For a given relation, PAX starts by storing data on a disk page as NSM does. Within each page, PAX introduces the idea of a “minipage”, which groups all the values of a particular attribute together, in a columnar fashion. During a sequential scan (e.g., to apply a predicate to a fraction of the record), PAX fully utilizes the cache, because on each cache miss a group of a single attribute’s values is loaded into the cache together. To reconstruct a record, PAX performs a mini-join among minipages, which incurs minimal cost because it never has to look beyond the page, maintaining high data locality.

By 2001, it was clear that CPU execution speed was outpacing IO speed, and to unlock the full performance of the processor, computer systems had to more carefully avoid stalls. In a survey of OLTP systems, 50-90% of CPU stall time was due to data cache misses.

The N-ary Storage Model (NSM) has been the traditional storage format for OLTP systems. The dataset is broken into disk pages, where each page stores records sequentially. Each record is inserted into the first available free space of the page. Records can have fixed-length and variable-length attributes, so an offset table is written at the end of every page with a pointer to the beginning of each record.

NSM is simple and straightforward, but its cache behavior is not optimal when a query executes on a subset of the attributes in a record - which is often the case. Because CPU caches work in blocks, each CPU cache miss will bring into the cache all values adjacent to the selected attribute value, even though none of this extra data will be useful in the computation.

The Decomposition Storage Model (DSM) vertically stripes a relation into columns, each containing only the values of a single attribute. Each attribute/column is scanned completely independently. DSM offers a high degree of spatial locality when sequentially accessing the values of one attribute. Intuitively, DSM works well when a query utilizes a small number of attributes in a relation, but its performance deteriorates when the query involves more attributes. This is because the database system must join all attributes together, and these joins become expensive as the number of attributes and dataset size increases beyond cache or main memory capacity.

S3. PAX

The motivation for PAX is to keep the attribute values of each record on the same page as in NSM, while devising a more cache-friendly algorithm for data inside the page.

PAX vertically partitions the records within each page, grouping these values together as “minipages”. When using PAX, each record resides on the same page as it would reside using NSM, but values of each attribute are grouped together. This requires a “minijoin” with records within the page, but no joins that span across pages.

S3.2 Design

Each newly allocated PAX page contains a page header, and as many minipages as the degree of the relation. The PAX page header contains the number of attributes, the attribute sizes (for fixed length attributes), offsets to the beginning of the minipages, the current number of records on the page and the total space available on the page.

The structure of each PAX minipage is determined as follows:

  • Fixed-length attribute values are stored in F-minipages. At the end of each F-minipage there is a presence bit vector (bitmap) with one entry per record that denotes null values for nullable attributes.
  • Variable-length attribute values are stored in V-minipages. V-minipages are slotted, with pointers to the end of each value. Null values are denoted by null pointers.

S4. Implementation and algorithms for data operations

1. Loads and Inserts:

PAX allocates pages in a similar way to NSM. The difference is in how PAX writes values within each minipage. When variable-length values are present, minipage boundaries are adjusted to accommodate the records. If a record fits in the page but a particular attribute value does not fit in the current minipage, PAX reorganizes the page structure by moving minipage boundaries.

At the end of writing a record, PAX calculates the position of each attribute value on the page, stores the value, and updates the presence bitmaps and offset arrays accordingly.
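Here is a toy PAX page for fixed-length attributes, sketched in Python (my illustration of the layout described above, not the Shore implementation): inserting a record scatters its values across per-attribute minipages, a single-attribute scan touches only one minipage, and the mini-join stitches a record back together without leaving the page.

class PaxPage:
    def __init__(self, attribute_names):
        self.attribute_names = attribute_names             # page header info
        self.minipages = {a: [] for a in attribute_names}  # one F-minipage per attribute
        self.presence = {a: [] for a in attribute_names}   # presence bit vectors
        self.num_records = 0

    def insert(self, record):
        """Insert one record: its values are scattered across the minipages."""
        for attr in self.attribute_names:
            value = record.get(attr)
            self.minipages[attr].append(value if value is not None else 0)
            self.presence[attr].append(value is not None)
        self.num_records += 1

    def scan(self, attr):
        """Sequential scan of a single attribute: reads only one minipage."""
        for value, present in zip(self.minipages[attr], self.presence[attr]):
            if present:
                yield value

    def reconstruct(self, slot):
        """The mini-join: rebuild one record entirely within the page."""
        return {a: self.minipages[a][slot] for a in self.attribute_names}

page = PaxPage(["a_p", "a_q"])
page.insert({"a_p": 10, "a_q": 3})
page.insert({"a_p": 20, "a_q": 7})
print(list(page.scan("a_q")))  # [3, 7] -- a_p's minipage is never touched
print(page.reconstruct(1))     # {'a_p': 20, 'a_q': 7}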

2. Updates:

When implemented in Shore, PAX pages can shrink the same way an NSM page would. In case of expanding a record, when a record grows bigger than its current page, the record is moved to another free page. PAX updates an attribute value by computing the offset of the attribute in the corresponding minipage, and updating the presence bitmaps.

3. Queries

  • Scans

PAX invokes one scan operator per attribute involved in the request query. Each operator sequentially reads values from the corresponding minipage, using computed offsets.

  • Joins

Joins in Shore receive input on top of two scan operators, each reading one relation.

S5. Evaluation

The authors of this paper implemented NSM, DSM, and PAX on top of the Shore database. The authors selected a dataset of memory-resident, fixed-length numeric attributes.

The selection queries were variations of this:

select avg(a_p)
from R
where a_q > Low AND a_q < High

It is important to point out that PAX targets cache behavior and does not affect IO: the disk page unit does not change relative to NSM, so the theoretical disk IO workload remains the same, and DSM still has an advantage over PAX in IO optimality when a query touches a small number of columns.

When each record is equal to or larger than the cache block size, NSM incurs a cache miss for every record, while PAX incurs only one miss per four records (on the authors’ CPU: 32 bytes per cache block, 8 bytes per attribute value). This translates into correspondingly higher execution performance.
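A quick back-of-the-envelope check of that miss-rate claim, assuming the paper's parameters (32-byte cache blocks, 8-byte attribute values, one attribute scanned):

cache_block_bytes = 32
value_bytes = 8
n_records = 1_000_000

nsm_misses = n_records                                        # one miss per record
pax_misses = n_records // (cache_block_bytes // value_bytes)  # one miss per 4 values
print(nsm_misses, pax_misses, nsm_misses / pax_misses)        # 1000000 250000 4.0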




Snowflake Data Warehouse reading notes

2020-10-02 00:00:00 +0000

Summary

Snowflake (2016) is a cloud-native data warehouse that’s optimized for large analytical workloads, and provides ACID transactions, over structured and semi-structured data. One key innovation Snowflake offers is what it calls its “multi-cluster, shared data” architecture which improves upon the shared-nothing architecture. Within this blueprint, this paper talks about the three major components: (1) the data storage, (2) the elastic Virtual Warehouse, (3) the central coordination and management system called Cloud Services.

Impact:

Perhaps the most interesting impact is that Snowflake, the product, has matured into a complete company providing data warehouse Platform as a Service (PaaS). Snowflake Inc. has a market cap of $80 billion today.

In more concrete terms, one value that Snowflake provides is a warehouse platform that’s not strictly tied to one of the three major cloud providers (AWS, Google, Microsoft). Meanwhile, Snowflake is also more feature complete, and has more security capabilities built-in, compared to some of the more obvious open source systems like Impala and Presto.

Some high level features of Snowflake advertised: elastic scaling and high availability, accommodates structured and semi-structured/schema-less data, point-in-time travel, and end-to-end security.

S1. Introduction

Cloud providers such as Amazon, Google and Microsoft are now providing platform-as-a-service; the key values they provide are shared infrastructure, economies of scale, flexibility, and capability. But traditional data warehouse software predates the cloud - it was designed with small physical clusters of machines in mind and fits modern cloud platforms poorly.

Data has changed as well. Today’s data is increasingly sourced from different origins, often without schema. Traditional data warehousing systems struggle here.

Some open source systems aim to tackle these issues, but they are difficult to deploy without significant engineering effort. Snowflake aims to capture the market share where a business workload can benefit from cloud infrastructure but is not well served by open source systems.

“Snowflake is not based on Hadoop, PostgreSQL or the like. The processing engine and most of the other parts have been developed from scratch.”

Snowflake differentiates itself by the following

  1. Software-as-a-service deploy and run
  2. Relational ANSI SQL support and ACID transactions
  3. Support for semi-structured data in JSON and Avro
  4. Elastic scale up and down
  5. High availability and durability
  6. Cost efficient (same as 4)
  7. Fine grained access control

S2. Storage vs. Compute

First we critique the popular layout for high performance data warehouse systems: shared-nothing, where every node has the same responsibilities, over mostly uniform, commodity hardware. A pure shared-nothing architecture couples the compute resources with storage resources.

  1. While the hardware is uniform, the workload is not. A system configured for high IO is a poor fit for complex queries that demand more compute, and vice versa. This leads to inefficiency.
  2. Membership changes cause large data movement, usually leading to significant performance impact, limiting elasticity and availability.
  3. Expanding on 2, online upgrades are harder to roll out when every node is impacted.

These points are tolerable in an on-prem system, as is the case with Presto. But in the cloud, node failures are more frequent and performance can vary more significantly. And as a product offering, Snowflake needed to bring online upgrades as a capability. Thus, Snowflake separates storage and compute into two loosely coupled, independently scalable sub-systems: a custom-built, caching, shared-nothing compute engine, with storage outsourced to Amazon S3.

I now think this is brilliant for two reasons: (1) scale and (2) business model. For 1, by simply outsourcing the underlying file/block storage to S3, Snowflake can effectively “write once, query many” by scaling up the number of query nodes attached to read off S3 almost without limit, outsourcing the problem of getting arbitrarily large read throughput. For 2, this cost is effectively passed on to the customer, and Snowflake can simply send an invoice for the increased query performance.

S3. Architecture

There are 3 major components:

  • Storage
  • Compute, called “Virtual Warehouses”
  • Coordination/management

Storage

Snowflake chose to use out-of-the-box AWS S3 storage and work with its tradeoffs, rather than implementing its own HDFS-style storage.

AWS S3 had the following drawbacks:

  • Higher latency compared to local storage
  • Higher CPU overhead corresponding to every IO request
  • Objects can only be overwritten in full; they cannot be appended to

And the following benefits:

  • Supports GET requests for ranges of a file
  • High availability and durability
  • Effectively infinite space for spilling temporary data

Snowflake implemented the following to counterbalance the tradeoffs: (1) local caching, (2) latency-skew resilience logic in the compute layer, and (3) a PAX-style physical storage format for the data in each table. And (4), perhaps worth emphasizing for result paging:

“Storing query results in S3 enables new forms of client interactions and simplifies query processing, since it removes the need for server-side cursors found in traditional database systems.”

Virtual Warehouses (Compute Units)

The Snowflake equivalent of a compute cluster is the Virtual Warehouse. VWs are effectively stateless and dynamically scalable. Each individual query runs on exactly one VW. Long-running queries are retried from the beginning, without checkpointing (similar to early Google F1 descriptions).

Each worker node in a VW holds an LRU cache of table files, and the PAX philosophy carries over here:

“the cache holds file headers and individual columns of files, since queries download only the columns they need.”

Here’s an interesting insight: the Snowflake query optimizer/coordinator “remembers” a mapping from worker nodes to the data they have recently cached, via consistent hashing.

“To improve the hit rate and avoid redundant caching of individual table files across worker nodes of a VW, the query optimizer assigns input file sets to worker nodes using consistent hashing over table file names [31]. Subsequent or concurrent queries accessing the same table file will therefore do this on the same worker node.”

This technique allows Snowflake to avoid eagerly replacing cache contents when a node fails, or the VW resizes - the underlying node LRU cache will eventually correct itself. This simplifies the system.
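Here's a small sketch of the general technique the quote describes - consistent hashing of table file names onto worker nodes - with made-up node and file names (the details of Snowflake's ring are not in the paper):

import bisect, hashlib

def h(s):
    return int(hashlib.md5(s.encode()).hexdigest(), 16)

class ConsistentHashRing:
    def __init__(self, nodes, vnodes=64):
        # Each worker gets many virtual points on the ring for smoother balance.
        self.ring = sorted((h(f"{n}#{i}"), n) for n in nodes for i in range(vnodes))
        self.keys = [k for k, _ in self.ring]

    def node_for(self, file_name):
        i = bisect.bisect(self.keys, h(file_name)) % len(self.ring)
        return self.ring[i][1]

ring = ConsistentHashRing(["worker-1", "worker-2", "worker-3"])
for f in ["tbl/part-000", "tbl/part-001", "tbl/part-002"]:
    print(f, "->", ring.node_for(f))
# Resizing the VW remaps only ~1/N of the files; the rest keep hitting the same
# worker's LRU cache, which is why no eager cache shuffling is required.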

Compare and contrast the cache replacement “eagerness” with Facebook TAO – they are opposites of each other.

When one Snowflake VW worker finishes its work early, it can file steal additional work from peer workers for the duration and scope of a current query. This helps alleviate straggler nodes.

Execution Engine

Snowflake has its own, custom built SQL execution engine. The highlights are:

  1. Columnar - integrates with and exploits the underlying physical data layout
  2. Vectorized - Snowflake avoids intermediate results, and instead processes data in a pipeline fashion, in batches of a few thousand rows.
  3. Push-based - avoids control flow logic within tight loops for better cache efficiency

Snowflake makes use of several additional optimizations outside its execution engine: there is no need for a buffer pool - Snowflake simply spills out to “storage” to accommodate large workloads. This trades off pure speed for more capability, and relies on the (outsourced) storage layer to speed up over time.

Snowflake query execution, with its underlying assumption of S3, can consider the dataset to be a fixed set of immutable files.

Cloud Services (Coordinator)

The Cloud Services component encompasses the query optimizer, transaction manager, and access-control mechanisms of Snowflake. This component is replicated to tolerate transient failures.

  • Query Management and Optimization

CS governs the early stages of each query: parsing, resolving objects/file handles, access control, and plan optimization. The optimizer is based on the Cascades framework; Snowflake uses no indexes and builds statistics automatically. The optimizer output is distributed to all worker nodes assigned to the query. Cloud Services monitors and tracks the state of the query to collect performance counters, and retries in case of node failures.

  • Concurrency Control

Snowflake is designed for analytical workloads first and foremost: large reads, bulk or trickle inserts, and bulk updates. Snowflake implements ACID transactions at the Snapshot Isolation level.

Snapshot isolation built on top of MVCC is a natural choice for Snowflake, almost a direct consequence of using S3 immutable storage. Snowflake keeps track of file changes in a global key-value metadata store.
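A minimal sketch of snapshot isolation over immutable files, in the spirit of the paragraph above (this is my illustration, not Snowflake's metadata schema): every committed table version is just a new immutable set of file names recorded in a key-value store, readers pin the version that was current when they began, and a first-committer-wins check rejects conflicting rewrites.

import itertools

metadata = {"versions": {0: frozenset()}, "latest": 0}
_next_version = itertools.count(1)

def begin():
    """A transaction reads a frozen snapshot: the file set as of 'latest'."""
    v = metadata["latest"]
    return {"snapshot": v, "files": metadata["versions"][v]}

def commit(txn, added=(), removed=()):
    current = metadata["versions"][metadata["latest"]]
    # First-committer-wins: if a concurrent commit already rewrote (removed)
    # a file this transaction also wants to remove, abort and retry.
    if not frozenset(removed) <= current:
        raise RuntimeError("write-write conflict, retry transaction")
    v = next(_next_version)
    metadata["versions"][v] = (current - frozenset(removed)) | frozenset(added)
    metadata["latest"] = v
    return v

t1 = begin()
commit(t1, added=["part-000"])
t2 = begin()                                     # snapshot contains part-000
commit(t2, added=["part-001"], removed=["part-000"])
print(metadata["versions"][metadata["latest"]])  # frozenset({'part-001'})
print(t2["files"])                               # t2 still sees its old snapshot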

  • Pruning

Due to Snowflake’s storage choice, and the target market use pattern, Snowflake forgoes maintaining indices. For Snowflake, indices are too costly, and the requirement of random access is too big of a problem to solve.

Instead, Snowflake uses min-max pruning, also known as small materialized aggregates, zone maps, or data skipping.

“Here, the system maintains the data distribution information for a given chunk of data, in particular minimum and maximum values within the chunk. Depending on the query predicates, these values can be used to determine that a given chunk of data might not be needed for a given query. “

This technique works well for sequential access across large spans of data, and it does not involve overhead of updating indices when bulk loading.
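A sketch of min-max pruning under assumed per-file statistics (file names and ranges below are made up): the planner keeps only the files whose [min, max] range for the predicate column overlaps the query's range, and skips the rest without reading them.

files = {
    "part-000": {"a_q": (1, 99)},
    "part-001": {"a_q": (100, 250)},
    "part-002": {"a_q": (251, 900)},
}

def prune(files, column, low, high):
    """Keep files whose [min, max] range can satisfy 'column > low AND column < high'."""
    keep = []
    for name, stats in files.items():
        lo, hi = stats[column]
        if hi > low and lo < high:   # ranges overlap -> the file might contain matches
            keep.append(name)
    return keep

print(prune(files, "a_q", low=120, high=300))  # ['part-001', 'part-002']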

S4. Extra Features

One Snowflake-exclusive feature worth highlighting is the ability to optimize the data layout of schema-less serialized data: as opposed to Apache Impala and Google Dremel (BigQuery), Snowflake aims to achieve both the flexibility of a schema-less database and the performance of a columnar relational DB. It does so by performing statistical analysis of the data within a single table file (stored PAX-style). When Snowflake sees a frequently accessed path within a file, it tries to infer its type and retrofit it into the optimized, compressed columnar format, like native relational data. This also works with min-max pruning.




AWS Aurora reading notes

2020-09-04 00:00:00 +0000

Paper link: https://www.allthingsdistributed.com/files/p1041-verbitski.pdf

Summary

Aurora’s goal is to increase throughput when moving MySQL to a cloud native environment. AWS engineers revised and revamped the storage and network layers of MySQL, to better utilize the characteristics of a distributed file system and AWS infra/layout.

Main Result(s):

Aurora keeps the “front-end” of a single-master MySQL, but with the new storage and network backend, system throughput is significantly improved. Replication delay is very low (milliseconds); read throughput scales horizontally up to the maximum limit of 15 read replicas. Write throughput, however, is limited to single-master vertical scaling. The limit on total data stored is 64TB (as of 2020).

Impact:

The most significant impact Aurora brings is increased throughput over a default MySQL installation in AWS cloud: lower replication lag, less dilated latency distribution, higher read-scale-out throughput, shorter time to recover master.

The key insight is to keep Aurora single master, and re-implement the read replicas around an ever-forward-rolling redo log. This sidesteps the complex problem of distributed consensus in other distributed databases (e.g. Spanner).

S1. Introduction

  1. MySQL is built with local disks in mind. This assumption does not hold well when operating MySQL in the cloud, where the storage may be mounted further away from the compute. When this happens, latency and performance suffers.

  2. In AWS, we have a true distributed file system that abstracts away the failures of individual disks. This, combined with other key characteristics of AWS foundational pieces, makes good motivation to rethink the way a cloud installation of MySQL should operate.

  3. Distributed synchronization protocols (like 2PC) are difficult to make performant in large distributed environments. This is a motivating factor to “eliminate multi-phase synchronization” by pushing the limits of a single master writer.

S2. Replication to avoid correlated failures

  1. Aurora aims to use a quorum-based design, but a typical 2/3 quorum is inadequate for AWS scale. It is easy to see why: Assume DataCenters A, B, and C. If DC C fails, we will break quorum with any concurrent failure in DC A or DC B, due to the simplest scenarios like maintenance - we would lose 2/3 copies. Aurora needs to tolerate a DC failure as well as “concurrently occurring background noise failures”.

  2. The chosen quorum design is:
    • writes can continue after one entire AZ failed, but no more.
    • reads can continue after one entire AZ failed plus one additional node, but no more.
    • total quorum votes is 6: so writes require 4/6 and reads require 3/6.
  3. Breaking storage up into 10GB segments and distributing them as replicas across AWS effectively partitions the data on top of a distributed storage fleet:

“We instead focus on reducing MTTR to shrink the window of vulnerability to a double fault. We do so by partitioning the database volume into small fixed size segments, currently 10GB in size. These are each replicated 6 ways into Protection Groups (PGs) so that each PG consists of six 10GB segments, organized across three AZs, with two segments in each AZ. A storage volume is a concatenated set of PGs, physically implemented using a large fleet of storage nodes that are provisioned as virtual hosts with attached SSDs using Amazon Elastic Compute Cloud (EC2). The PGs that constitute a volume are allocated as the volume grows. We currently support volumes that can grow up to 64 TB on an unreplicated basis.”

Is the slightly lower quorum requirement for reads meant to help make recovery faster? It would seem so.
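The quorum arithmetic above is easy to check directly (V = 6 copies, two per AZ across three AZs, with Vw = 4 and Vr = 3):

V, Vw, Vr = 6, 4, 3
assert Vr + Vw > V    # every read quorum overlaps every write quorum
assert Vw + Vw > V    # two write quorums overlap, preventing split-brain writes

# Losing one whole AZ removes 2 copies; losing an AZ plus one node removes 3.
for lost, label in [(2, "one AZ down"), (3, "one AZ down + 1 node")]:
    remaining = V - lost
    print(label, "| writes ok:", remaining >= Vw, "| reads ok:", remaining >= Vr)
# one AZ down          | writes ok: True  | reads ok: True
# one AZ down + 1 node | writes ok: False | reads ok: True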

S3. Porting the extraneous IO to be cloud-native

  1. MySQL issues multiple concurrent local disk writes, before it issues these multiple writes across the network to a read replica, which then issues its own set of multiple local writes.

  2. These include: the redo log (the diff between before-page and after-page), the statement binlog, the modified data pages, a second temp write (called a double-write, a good description here https://www.percona.com/blog/2006/08/04/innodb-double-write/ and here http://enjoydatabases.blogspot.com/2016/08/innodb-doublewrite-buffer.html ) to prevent torn data pages, and the metadata files.

  3. The traditional MySQL replication, when operated in a distributed environment, is akin to a 4/4 quorum.

  4. With Aurora, only the redo log is shipped across the network - to the read replicas and to Aurora’s distributed, highly available storage service. When 4 of the 6 storage replicas acknowledge, the primary can consider the log records durable.

  5. Read replicas can asynchronously and continuously apply the redo log on top of their buffer caches to materialize the updated records.

S4. Read replicas asynchronously process the primary’s redo log

  1. This paper only provides a sketch of the underlying implementation.
  2. The master can assign a monotonically increasing ID - the Log Sequence Number - to each log record.

“The logic for tracking partially completed transactions and undoing them is kept in the database engine, just as if it were writing to simple disks.”

  3. We don’t need a distributed consensus protocol. This is not a Raft log.

  4. Read Replicas can gossip with each other to fill in gaps up to the maximum LSN.

    • The storage layer determines the max LSN that guarantees availability of all prior log records (Volume Complete LSN, or VCL). During storage recovery, every log record with an LSN larger than the VCL must be truncated.
    • This must be further truncated by the database’s durability criterion:

“The database can, however, further constrain a subset of points that are allowable for truncation by tagging log records and identifying them as CPLs or Consistency Point LSNs. We therefore define VDL or the Volume Durable LSN as the highest CPL that is smaller than or equal to VCL and truncate all log records with LSN greater than the VDL. For example, even if we have the complete data up to LSN 1007, the database may have declared that only 900, 1000, and 1100 are CPLs, in which case, we must truncate at 1000. We are complete to 1007, but only durable to 1000.”

  • Reads can be made consistent by finding the Minimum Read Point LSN across all nodes. Each node in the cluster can compute this.
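The VCL/VDL rule from the quote is simple enough to restate as code; here is a tiny sketch using the paper's own example numbers:

def volume_durable_lsn(vcl, cpls):
    """VDL = highest Consistency Point LSN that is <= the Volume Complete LSN."""
    eligible = [c for c in cpls if c <= vcl]
    return max(eligible) if eligible else None

vcl = 1007                  # storage is complete up to here
cpls = [900, 1000, 1100]    # points the database tagged as safe truncation targets
print(volume_durable_lsn(vcl, cpls))  # 1000
# On recovery, every log record with LSN > 1000 (i.e. 1001..1007) is truncated.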

Follow ups:

There isn’t a clear description of the dual-master Aurora yet. Is it implemented as a hot spare? Or does it use a synchronization mechanism like 2PC, or just synchronous replication between the two master copies? It would be interesting to find out whether the synchronization incurs a performance penalty in exchange for higher availability.




Presto: SQL on Everything - reading notes

2020-07-23 00:00:00 +0000

Summary

Company: Facebook

Paper link: https://prestosql.io/Presto_SQL_on_Everything.pdf

Paper published: 2018

Presto is a flexible OLAP engine: it executes queries ranging from user-facing applications with sub-second latency requirements to multi-hour ETL jobs that aggregate or join TBs of data. Presto’s main innovation is its Connector API, which gives it access to high-performance data source backends (streams, KV stores, RDBMS, HDFS). So while some other open source OLAP engines focused on optimizing queries for a specific environment (Impala, Spark), Presto decoupled query execution optimization from storage IO optimization, which it generalizes behind the connector interface.

Main Result(s):

Facebook realized in the early 2010s that SQL is the most accessible query language, but many of its systems had their own incompatible SQL-like dialects. Presto unifies the “frontend” by providing an ANSI SQL interface to different backends, while defining a connector interface that allows developers to integrate Presto with different backends via RPC endpoints. Presto is an extensible and flexible system. Presto’s performance depends heavily on the storage backend it is connected to: Facebook, for example, made internal storage optimizations (Raptor) for Presto. Today, Presto is widely adopted in many organizations.

Impact:

Presto was originally designed to enable interactive query over Facebook’s data warehouse. It is powering most of the analytical workload (interactive + offline) at Facebook. It is also widely used in other companies in the industry. Presto is available as a managed service as AWS Athena.

S2. Motivating Use Cases

  A. Interactive analytics over Facebook data: 100GB - 1TB compressed; query results needed within seconds to minutes; most of the data sits on HDFS; 50-100 concurrent users.
  B. Batch ETL: scheduled by workflow management, tuned for efficiency by data engineers, usually CPU- and memory-intensive aggregations or joins with large tables.
  C. AB Testing: results expected in hours, with accuracy in mind. Users also expect to do further drill-downs, so pre-aggregation of results is not suitable. Multiple joins with large datasets (users, devices). Query shapes are programmatically defined, which narrows down the scope.
  D. Advertiser Analytics: user-facing application; queries are highly selective (for individual advertisers); query shapes contain joins and aggregations, or window functions. Replication lag is expected in minutes, but query volume is high and latency requirements are strict.

S3. Architecture Overview

Presto cluster consists of a single coordinator and one or more worker nodes. Clients send SQL statements to the coordinator over HTTP.

A split in Presto is an addressable chunk of data in some external storage system. The coordinator evaluates and distributes a query plan as tasks for the workers, and splits are assigned along with the tasks. Worker nodes fetch data from their assigned splits. Presto workers execute tasks in a way familiar from other distributed compute frameworks (Spark, GFS/MapReduce): they cooperatively multi-task and pipeline results from one stage to the next. When data shuffles between nodes, Presto tunes its buffering to minimize latency.

Presto offers many high-level plugin points, the most important of which is the Connector API, composed of the Metadata API, Data Location API, Data Source API, and Data Sink API.

S4. System Design

A. SQL Dialect

Presto implements most of ANSI SQL, plus a few extensions (lambda support, transforms, filter, reduce) on embedded complex types (maps, arrays).

B. Client Interface, Parsing, and Planning

The Presto coordinator listens on HTTP from clients, and also has a JDBC client that interfaces with BI tools. Parsing: Presto uses ANTLR parser to convert SQL statements into AST. Supported functions include subqueries, aggregations, and window functions. Logical Planning: the AST is converted to an intermediate representation (IR) that represents plan nodes, each node representing a physical or logical operation, and the children of that node are its inputs.

C. Query Optimization

The plan optimizer converts a logical plan into a more physically executable structure. The process greedily evaluates transformations until a fixed point. Presto uses well-known optimizing transforms such as predicate and limit pushdown, column pruning, decorrelation. Presto already considers table and column statistics for join strategy selection and join re-ordering. Further work on cost-based optimizer is ongoing.

  1. The optimizer can take account of physical data layout optimizations: the Connector API can report locations and other data properties such as partitioning, sort, grouping, and indexes. In fact, a single piece of data can be provided by multiple physical layouts.

  2. Predicate pushdowns: Presto leverages the most aggressive data retrieval optimization available, pulling only data out of the necessary shard if it’s provided, and using indexes whenever possible.

    “For example, the Developer/Advertiser Analytics use case leverages a proprietary connector built on top of sharded MySQL. The connector divides data into shards that are stored in individual MySQL instances, and can push range or point predicates all the way down to individual shards, ensuring that only matching data is ever read from MySQL. If multiple layouts are present, the engine selects a layout that is indexed on the predicate columns. Efficient index based filtering is very important for the highly selective filters used in the Developer/Advertiser Analytics tools.”

  3. Inter-node parallelism: (resembles MapReduce/Spark) parts of a plan that can be executed across different nodes in parallel are called stages. They are distributed to maximize parallel IO. When the engine shuffles data between stages, the query uses more resources. Thus, there are important considerations to minimize shuffles.

  • Data Layout: when doing large joins in the AB Testing use case, Presto leverages the fact that both tables in the join are partitioned on the same column, so it uses a co-located join strategy to eliminate large shuffles across nodes. Presto can also make use of indexes as nested loop joins.
  • Node properties: nodes can express required and preferred properties while participating in a shuffle.
  4. Intra-node parallelism: when appropriate, Presto exploits single-node multi-threaded execution, for example for hash joins. This also combats natural partition skew to a certain degree (Spark, for example, does not have an automatic mechanism to do the same).

D. Scheduling

Presto internalizes the assignment, execution, and management of tasks in its cluster. The first major theme of Presto scheduling is in its considerations for performance: maximizing parallelism, pipelining, buffering, minimizing shuffles.

Some definitions:

  • A coordinator distributes plan stages to workers.
  • A stage consists of tasks, where a task represents a single unit of processing.
  • A task contains multiple pipelines within it.
  • A pipeline is a chain of operators on the data, and the optimizer can determine the level of local parallelism in a pipeline. Pipelines are linked with in-memory shuffles.
  • The coordinator links stages together with shuffles.
  • Data streams stage to stage as soon as it’s available.

Schedule decisions are made in two parts: (1) a logical ordering of what stages to schedule, (2) determine how many tasks should be scheduled and the physical nodes they run on.

  1. Stage scheduling: Presto has two modes: all-at-once or phased. All-at-once starts all stages concurrently to minimize wallclock time. Phased mode runs an algorithm to find strongly connected components within the directed graph of data flow, which outputs a modified DAG that avoids deadlocks, and then execution may begin in topological order for maximum efficiency.

  2. Task scheduling: Leaf Stages and Intermediate Stages.
    • Leaf stage - these read data from connectors (in the form of splits). In shared-nothing configurations (Facebook Raptor), the scheduler may assign a leaf-stage task to every worker node to maximize IO parallelism. This exerts pressure on network bandwidth. Presto plugins at Facebook allow a mechanism to express a preference for rack-local versus rack-remote reads.
    • Intermediate stage - these only read data from other stages.
  3. Split scheduling:

    “When reading from a distributed file system, a split might consist of a file path and offsets to a region of the file. For Redis, a split consists of table information, a key and value format, and a list of hosts to query”

Every task in a leaf stage must be assigned one or more splits to become eligible to run. Presto asks connectors to enumerate small batches of splits and assigns them to tasks lazily. This minimizes the time-to-first-result lag, decouples the time to enumerate splits from query response time, and allows each worker to keep a smaller in-flight queue.
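A rough sketch of that lazy, batched split assignment (the general idea, not Presto's scheduler code; task and split names are made up): the coordinator pulls small batches from the connector's enumeration and hands each batch to the task with the shortest in-flight queue.

from collections import deque
from itertools import islice

def enumerate_splits(n=7):
    """Stand-in for a connector lazily enumerating splits."""
    for i in range(n):
        yield f"split-{i}"

def assign_lazily(tasks, splits, batch_size=2):
    queues = {t: deque() for t in tasks}
    while True:
        batch = list(islice(splits, batch_size))           # ask for a small batch only
        if not batch:
            break
        target = min(tasks, key=lambda t: len(queues[t]))  # shortest queue first
        queues[target].extend(batch)
    return queues

queues = assign_lazily(["task-A", "task-B"], enumerate_splits())
print({t: list(q) for t, q in queues.items()})
# In the real system new batches are requested only as queues drain, so the
# in-flight queue per worker stays small and first results arrive sooner.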

E. Query Execution

The core query execution parts of Presto take inspiration from DB literature.

  1. Local Data Flow:

A worker thread computes on a data split within a driver loop. The Presto driver loop cites the original Volcano paper, with variations: Presto’s loop is friendlier to yielding and aims to maximize the work done in every quantum; also, the unit of in-memory data the driver loop operates on, called a page in traditional DB literature, is columnar encoded. This is a difference from Impala (2013).

  2. Shuffles:

When transporting data across nodes, workers use a long-polling, in-memory mechanism. This is unlike Spark.

Something interesting Presto does is using a continuous monitor-and-adjust feedback loop to tune the input/output buffer counts and depths, for optimal utilization and minimal latency: when overall utilization is high, it can tune down the concurrency level, increasing fairness, protecting against slow clients unable to consume at the output rate.

  3. Writes:

Presto tries to strike a sweet spot between output write concurrency versus metadata overhead, using the buffer utilization threshold.

F. Resource Management

Being its own distributed system, Presto exercises some freedom in the way it manages CPU and memory, taking lessons learned from other distributed systems (HDFS, Spark).

  1. CPU scheduling:

Presto splits are allowed to run on a thread for a maximum quantum of one second, after which they yield the thread and return to the queue. This is an effective technique to accommodate slow and fast queries on the same cluster. While it is always challenging to accomplish fair scheduling in practice with arbitrary workloads, the Presto scheduler is effective - it penalizes long-running tasks.
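A toy illustration of the quantum idea (cooperative multitasking over resumable splits; this is not Presto's scheduler, and the quantum is shortened so the demo runs quickly):

import time
from collections import deque

QUANTUM = 0.01   # seconds; shortened for the demo (Presto uses about one second)

def fake_split(name, units):
    for _ in range(units):
        yield                      # pretend to process one page of data
    print(name, "finished")

def run_with_quanta(splits):
    queue = deque(splits)
    while queue:
        split = queue.popleft()
        deadline = time.monotonic() + QUANTUM
        try:
            while time.monotonic() < deadline:
                next(split)        # do one small unit of work
        except StopIteration:
            continue               # split is done; don't requeue it
        queue.append(split)        # quantum expired: yield the thread, requeue

run_with_quanta([fake_split("slow-query", 2_000_000), fake_split("fast-query", 100)])
# "fast-query finished" prints almost immediately even though the slow split was
# ahead of it in the queue, because the slow split keeps yielding the thread.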

  2. Memory management:
  • Memory Pools: Presto splits memory allocation into user and system pools. System memory allocations are side effects of the Presto implementation, e.g. shuffle buffers. User memory allocations are directly related to the input/query. Presto kills queries exceeding memory limits.

To handle data skew and the resulting skew in per-node memory usage (for example, one data partition consuming 2x the median memory), and to allow multiple concurrent queries to run with some memory overcommit, Presto uses two techniques: spilling and reserved pools.

  • Spilling: Presto supports spilling to disk for hash joins and aggregations, but Facebook disables spilling for the sake of predictable latency.

  • Reserved Pool: When the general memory pool (system + user) of a node is exhausted, the query is “promoted” to use the Reserved Pool of memory across all worker nodes. To prevent deadlocks, only a single query can enter the Reserved Pool across the entire cluster. Obviously, if a node has exhausted its general memory pool AND the Reserved Pool is occupied by some query, all other queries will stall on that node. This is an effective technique to handle skewed stragglers without spilling to disk, and the trade-offs are clear.

G. Fault Tolerance

Presto relies on clients to retry failed queries. Presto coordinator is a singleton, but Facebook externally coordinates failover to standby coordinators (Zookeeper or Helix will do fine here).

S5. Query Processing Optimizations

Presto made an intense effort to optimize query processing.

A. Working with the JVM

Presto optimizes within the JVM: we know the G1 collector does not work well with large objects. Presto data structures on the critical path of query execution are implemented as flat arrays instead of objects and references.

“For example, the HISTOGRAM aggregation stores the bucket keys and counts for all groups in a set of flat arrays and hash tables instead of maintaining independent objects for each histogram.”

B. Code Generation

  1. Presto uses generated bytecode to evaluate constants, function calls, and references when running over data.

  2. Instead of a generic processing loop, Presto uses the semantics of its query execution:

    • It prevents cross-pollution of JIT compilation due to queries being switched across quanta.
    • Because the engine is aware of the types involved in each computation within the processing loop of every single task pipeline, it can generate unrolled loops over columns with elimination of target type variance, making the JIT compiler job easier.
    • JIT compiler profiling of every task is independent

C. File Format Features

Presto makes use of well-known optimizations in row-column optimized data formats, and extends those optimizations to how it processes data while in memory.

Competitive work:

Leading open source competitors of Presto are: Hive (written by Facebook as a SQL interface over HDFS, generating MapReduce jobs), Spark, and Impala. Spark and Impala both integrate more tightly with their respective storage backends, whereas Presto was designed with backend adaptability in mind.