Hot topics, sorted by TFIDF using Lucene:

presto: [1] [2] [3] snowflake: [1] [2] f1: [1] [2] [3] leader: [1] [2] spanner: [1] [2] [3] dremel: [1] [2] [3] tao: [1] [2]


Followup - Lucene

2020-04-19 00:00:00 +0000

Summary

To follow up on Lucene, I have a simple use case: given a corpus of documents, e.g. an artist’s most popular songs, a collection of notes, or a blog website such as this one, I can compute the classic TFIDF to get a quick summary of the “most descriptive” words in that corpus.

Lucene fundamental data hierarchy:

  • corpus
    • 1 corpus => many documents
      • 1 document => many fields (filename, create_time, content, …)
        • 1 field => many terms (integers, longs, tokenized/non-tokenized strings). field data can be marked as stored and/or inverted.
          • (optionally) 1 term or token => many attributes (position, offsets, flags, part-of-speech tags)

A 4-D View of the Inverted Index

This section from the paper was especially well-written.

The Codec API presents inverted index data as a logical four-dimensional table that can be traversed using enumerators. The dimensions are:

field,  term,  document,  and  position

that is, an imaginary cursor can be advanced along rows and columns of this table in each dimension, and it supports both “next item” and “seek to item” operations, as well as retrieving row and cell data at the current position. For example, given a cursor at field field1 and term term1 the cursor can be advanced along this posting list to the data from document doc1 (and subsequent matching docs d2, d3, …), where the in-document frequency for this term (TF) can be retrieved, and then positional data can be iterated to retrieve consecutive positions, offsets, and payload data at each position within this document.
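To make the four dimensions concrete, here is a minimal sketch that models the table as nested Python dictionaries (field, then term, then document, with a list of positions at the leaves). This is not Lucene's Codec API; the function name `build_index` and the sample documents are made up for illustration, and tokenization is a plain whitespace split.

```python
from collections import defaultdict

def build_index(docs):
    """docs: {doc_id: {field: text}} -> index[field][term][doc_id] = [positions]."""
    index = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
    for doc_id, fields in docs.items():
        for field, text in fields.items():
            # Naive analysis (assumption): lowercase and split on whitespace.
            for position, term in enumerate(text.lower().split()):
                index[field][term][doc_id].append(position)
    return index

docs = {
    "doc1": {"body": "the quick brown fox jumps over the lazy dog"},
    "doc2": {"body": "the fox"},
}
index = build_index(docs)

# "Seek" to (field=body, term=the), then advance along the posting list.
postings = index["body"]["the"]
for doc_id in sorted(postings):
    positions = postings[doc_id]
    # The in-document term frequency (TF) is the number of positions.
    print(doc_id, "tf =", len(positions), "positions =", positions)
```

A real codec stores these dimensions in compressed, sorted on-disk structures; the nested dictionaries only illustrate the traversal order.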

To output top words sorted by TFIDF from a corpus (lyrics, blog entries, etc.)

The steps should be simple:

  1. index the corpus
  2. retrieve all terms from the index where field="body"
  3. compute tfidf for each term
  4. sort, truncate to top 10 for example
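The steps above can be sketched without Lucene in a few lines of Python. This is an illustration under stated assumptions, not the code from the linked repository: term frequency is summed over the whole corpus, the inverse document frequency is the textbook `log(N / df)`, and the function name `top_terms` and the toy corpus are hypothetical.

```python
import math
from collections import Counter

def top_terms(corpus, k=10):
    """corpus: list of documents, each a list of already-analyzed terms."""
    n_docs = len(corpus)
    term_freq = Counter()  # total occurrences of each term in the corpus
    doc_freq = Counter()   # number of documents that contain each term
    for terms in corpus:
        term_freq.update(terms)
        doc_freq.update(set(terms))
    # Textbook TF-IDF (assumption): tf * log(N / df).
    scores = {
        term: term_freq[term] * math.log(n_docs / doc_freq[term])
        for term in term_freq
    }
    ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)
    return ranked[:k]

corpus = [
    ["love", "love", "night"],
    ["love", "rain"],
    ["love", "night", "rain", "rain"],
]
for term, score in top_terms(corpus, k=2):
    print(term, round(score, 3))
```

A term that appears in every document ("love" here) gets an IDF of zero, which is exactly why the ranking surfaces descriptive words rather than frequent ones.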

I have updated this website header to include “top words” as a summary. Each word links to the result of a Lucene search query. The full code is at https://github.com/warrenqi/lucene-lyrics-www




Paper: Apache Lucene 4.0

2020-04-10 00:00:00 +0000

Original paper http://opensearchlab.otago.ac.nz/paper_10.pdf

Summary

This is an overview of Lucene and specifically the updates to version 4. In a way, this paper distills Lucene’s rich feature set into just a handful of pages. A sort of index of index. Lucene version 4 has significant improvements that I find directly applicable to my system at work. I work on a sharded, distributed search/retrieval system at Booking.

This paper inspired me to start a small demo project that searches song lyrics: https://github.com/warrenqi/lucene-lyrics-www

Preface

Lucene is a library and toolbox for search. Version 4 adds significant new features and improvements to the core components, which were already very rich. For example, starting in version 1.2, Lucene already had wildcards, edit distance matching, and full boolean operators. Version 3.6 added regex queries and custom scoring functions based on the value of a field.

Importantly, Lucene makes index formats (“Codecs”) separate from the storage layer.

S4 Lucene components

  1. Analysis of incoming content AND queries
  2. Indexing and storage
  3. Searching
  4. Ancillary modules (everything else, e.g. result highlighting)

S4.1 Analysis

Mostly 3 tasks chained together

  1. Optional character filtering and normalization (e.g. remove diacritics/glyphs)
  2. Tokenization (delimiting)
  3. Token filtering (e.g. stemming, stopword removal, n-gram creation)
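The three chained tasks can be sketched as plain Python functions. This is a minimal illustration of the pipeline shape, not Lucene's Analyzer API; the function names and the tiny stopword set are assumptions, and stemming and n-gram creation are left out.

```python
import re
import unicodedata

STOPWORDS = {"the", "a", "of", "and"}  # hypothetical, tiny stopword list

def char_filter(text):
    # Remove diacritics: decompose characters, then drop combining marks.
    decomposed = unicodedata.normalize("NFKD", text)
    return "".join(ch for ch in decomposed if not unicodedata.combining(ch))

def tokenize(text):
    # Delimit tokens on runs of word characters.
    return re.findall(r"\w+", text)

def token_filter(tokens):
    # Lowercase, then remove stopwords.
    lowered = (token.lower() for token in tokens)
    return [token for token in lowered if token not in STOPWORDS]

def analyze(text):
    return token_filter(tokenize(char_filter(text)))

print(analyze("The Café of Beyoncé"))
```

The same `analyze` function would be applied to both incoming content and queries, so that both sides of a match see identically normalized terms.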

S4.2 Index and storage

There are many mechanical processes in this component.

  1. A Document consists of 1 or more fields of content. These need to be stored.
  2. Indexing of the documents is lock-free and highly concurrent.
  3. Near Real Time indexing (new in version 4).
  4. Segmented indexing with merging (and pluggable merge policies).
  5. Abstractions to allow for different posting list data structures.
  6. Transactional support.
  7. Support for different scoring models.

S4.3 Querying

Lucene provides a full toolbox of query parsers, as well as a full query parsing framework for developers to write their own query parsers.

S4.4 Ancillary features

  1. Result highlighting (snippets)
  2. Faceting
  3. Spatial search
  4. Document grouping by key
  5. Auto-suggest

S5.1 Foundational pieces

  1. Text analysis: this is relevant for full text search.
    • Many attributes can be associated with a token.
    • A token value is also an attribute; it is, in fact, the main “term” attribute.
    • Another example: part-of-speech tags.
    • Language support and analysis are very feature rich.
  2. Finite State Automata: mostly relevant for improved full text search.

S5.2 Indexing

A key part I take away is Lucene’s implementation of incremental update of index data. The actual storage of index data is abstracted to the “Directory API”. There’s a separation of “in-flight” data versus “persisted” data. Updates to index data are split into extents, named as the all-important Segments, and are periodically merged into larger segments.

Lucene assigns internal integer IDs to documents it scans. These IDs are ephemeral - they are used for identifying document data within a single particular Segment, and they’re changed upon Segment compactions.

Two broad types of fields in a document - but a field can belong to either or both:

  1. Fields carrying content to be indexed/inverted.
  2. Fields carrying content to be simply stored

S5.3 Incremental Index Updates

Also known as: how to handle extremely large indexes: divide and conquer.

Periodically, in-memory Segments are flushed to persistent storage (using the Codec and Directory abstractions) once a configurable threshold is reached.

The IndexWriter class is responsible for processing index updates. It uses a pool of DocumentWriters that create new in-memory Segments. As new documents are added (usually to the end of the index), an index compaction/merge runs periodically and reduces the total number of Segments that comprise the whole index. Note: a single large index is still split into multiple Segments. Version 4 makes the distinction of marking Segments as immutable.

I’m just going to quote this section from the paper because it’s so well written:

Each flush operation or index compaction creates a new commit point, recorded in a global index structure using a two-phase commit. The commit point is a list of segments (and deletions) comprising the whole index at the point in time when the commit operation was successfully completed.

In Lucene 3.x and earlier, some segment data was mutable (for example, the parts containing deletions or field normalization weights), which negatively affected the concurrency of writes and reads – to apply any modifications the index had to be locked, and it was not possible to open the index for reading until the update operation completed and the lock was released.

I’m not sure why Lucene 3 had to lock both writes and reads here. The authors presumably thought it through, and a narrower fix probably would not have improved performance enough compared with making segments fully immutable. The quote continues:

In Lucene 4.0 the segments are fully immutable (write-once), and any changes are expressed either as new segments or new lists of deletions, both of which create new commit points, and the updated view of the latest version of the index becomes visible when a commit point is recorded using a two-phase commit. This enables lock-free reading operations concurrently with updates, and point-in-time travel by opening the index for reading using some existing past commit point.
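A toy model may help show why write-once segments give lock-free reads and point-in-time views. This is a sketch under heavy assumptions: the class names `Segment`, `CommitPoint`, and `Index` are illustrative and not Lucene classes, and the two-phase commit is not modeled at all. Every change produces a new commit point and never mutates an older one.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Segment:
    name: str
    doc_ids: tuple  # write-once: never modified after creation

@dataclass(frozen=True)
class CommitPoint:
    segments: tuple       # Segment objects making up the index
    deletions: frozenset  # ids of deleted documents

    def live_docs(self):
        return [d for seg in self.segments for d in seg.doc_ids
                if d not in self.deletions]

class Index:
    def __init__(self):
        self.commits = [CommitPoint(segments=(), deletions=frozenset())]

    def flush(self, name, doc_ids):
        # A flush adds a new segment and records a new commit point.
        latest = self.commits[-1]
        segment = Segment(name, tuple(doc_ids))
        self.commits.append(CommitPoint(latest.segments + (segment,), latest.deletions))

    def delete(self, doc_id):
        # A deletion is a new list of deletions, not a change to a segment.
        latest = self.commits[-1]
        self.commits.append(CommitPoint(latest.segments, latest.deletions | {doc_id}))

index = Index()
index.flush("seg0", ["a", "b"])
snapshot = index.commits[-1]  # a reader opened at this commit point
index.delete("a")
index.flush("seg1", ["c"])

print(snapshot.live_docs())           # the older point-in-time view
print(index.commits[-1].live_docs())  # the latest view
```

Because the reader holding `snapshot` only references immutable objects, later writes cannot disturb it, which is the property the quoted passage describes.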

S5.3.2 Concurrent IndexReaders

As relevant to multiple Segment writers, a user typically obtains an IndexReader from either a commit point (meaning: data flushed to disk), or directly from an IndexWriter, which includes in-memory Segments.

For performance reasons, deleted documents are only physically removed during segment merges/compactions. This skews some global statistics slightly.

IndexReader implements the composite pattern: an IndexReader is actually a list of sub-Readers for each segment.

S5.4 Codec and Directory API

This is the exciting stuff.

  1. Experience an inverted index in Four-Dee (4D) !
    • We solve problems by using another layer of abstraction.
    • Kidding aside, this is a brilliant idea. Lucene 4’s Codec abstraction opens up different inverted index compression techniques (see RoaringBitmaps). This is possible by viewing an inverted index as a logical, 4 dimensional table consisting of the axes: (1) field, (2) term, (3) document, (4) position.
  2. Codec API improves performance by allowing environment customizations
    • Online write-time sharding/pruning, speeding up reads using Bloom filters
  3. Directory API
    • Lucene uses the file abstraction.
    • Flexible enough to be adopted from in-memory, to KV stores, to SQL stores, to HDFS.

S5.5 Searching

This section is extremely interesting and relevant for full text search. In the interest of time, though, and because I’m now working on a search system that overwhelmingly deals with exact matches, I’m going to defer notes on this section for later.




Google F1: A Distributed SQL Database That Scales - reading notes

2020-03-29 00:00:00 +0000

Original paper https://research.google/pubs/pub41344/

Summary

This was a satisfying paper to read because it builds upon previous Google infrastructure papers, and brings them together from a large-scale application’s point of view. F1 powers AdWords, so it is a critical system in the sense that AdWords is responsible for 90%+ of Google’s revenue.

Themes:

  • For query execution - simplicity brings performance;
  • Build on strong foundational pieces;
  • Rather than minimizing disk seeks on a single disk, the F1 system minimizes network round trips. Each round trip sends as many parallel requests as possible to the remote Spanner storage, so local resources stay well utilized while the system keeps building on strong, simple foundations;

F1’s addition to Spanner:

  1. A full SQL layer, and distributed SQL query execution, including joining of (Protobuf native) data from external sources. This is largely the bridge between the data storage and consistency functionality of Spanner, and the AdWords application developers who are familiar with the legacy MySQL system.
  2. Transactionally consistent local + global secondary indexes - this is for AdWords application only.
  3. Async schema changes: an external system checks and maintains the system in face of continuous schema changes - this guards against data corruption and maintains availability.
  4. Optimistic Transactions: this is built on top of Spanner’s capabilities, effectively leveraging a CAS implementation combining Spanner’s lock-free snapshot reads and consistent transactions.
  5. Automatic history recording and publishing - another feature for the users, includes a creative and effective cache application.

Key design choices underpinning F1:

  • The data schema is explicit in the sense that it exposes data clustering to the application programmer. This is a shared concept with Spanner. Concretely, F1 stores data with a hierarchical structure (outlined later). This layout is optimized with Spanner storage in mind, and the result is a reduction of RPC trips.
  • Heavy use of batching, parallel processing, and async reads
  • A new ORM library for application developers that surfaces these points. This makes it easier to debug slow queries.

Scale:

In 2012, roughly 100TB, 100s of applications sending queries, and 1000s of users; >100K QPS; 99.999% availability. Latency p50 comparable to sharded MySQL, with tail latency much lower than MySQL.

Architecture overview:

F1 System consists of (1) Client, (2) Load Balancers, (3) the most commonly communicated F1 Server, and (4) F1 Master and (5) F1 Slaves.

F1 Servers (3) are mostly stateless except over the duration of a transaction, during which the F1 Server is a client of Spanner, and holds pessimistic write leases/locks.

F1 Master and Slaves (4 and 5) are responsible during distributed SQL query executions. Masters maintain pool membership over the Slave pool. Slave nodes are responsible for processing the distributed queries on behalf of the coordinating F1 Server that initialized the query.

Scaling the F1 system is trivial - simply add more nodes, which is done quickly. Adding new Spanner nodes results in data re-distribution (copying of Spanner directory files).

Due to the nature of remote data storage, minimum commit latency is relatively high (50-150ms) compared to MySQL. The F1 ORM client library exposes this effect and forces the application developer to minimize serial tasks.

Spanner review:

  • Spanner provides consistent transactions using two-phase locking: multiple reads, taking shared or exclusive locks, followed by a single write that upgrades locks and atomically commits.
  • Spanner transactions are most efficient when updating data co-located in a single group of directories. This is relevant for local indexes and Optimistic Transactions.
  • Spanner multi-group transactions involve two-phase commit (2PC) on top of Paxos. This scales well to tens of “participant groups”, not more. This is important for global indexes.

F1 Data model:

Hierarchical schema

  • F1 makes data clustering explicit, mirroring the data layout of Spanner storage. Two types of tables: Root and Child, all have Primary Keys. Child tables must have their primary key prefixed with the parent table’s primary key. The storage is in sorted order. This implies query processing optimizations later in SECTION 8.
  • This interleaving of data rows from both root and child tables means the Spanner storage retains data locality. The paper highlights a comparison to a traditional schema, where a read from two related data tables incurs a doubling of read (hence RPC) requests.
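The effect of prefixing child keys with the parent key can be sketched with sorted tuples. This is an illustration only: the table contents are made up, a Customer/Campaign pair stands in for root and child tables, and a Python dictionary plus a sorted key list stands in for Spanner's sorted storage.

```python
import bisect

# Root rows use a 1-part key; child rows prefix it with the parent's key.
rows = {
    (1,): "Customer 1",
    (2,): "Customer 2",
    (1, 3): "Campaign 1/3",
    (1, 4): "Campaign 1/4",
    (2, 5): "Campaign 2/5",
}
sorted_keys = sorted(rows)  # tuples sort so each parent is followed by its children

def read_customer_with_children(customer_id):
    # One contiguous range scan returns the root row and all of its child rows.
    lo = bisect.bisect_left(sorted_keys, (customer_id,))
    hi = bisect.bisect_left(sorted_keys, (customer_id + 1,))
    return [rows[key] for key in sorted_keys[lo:hi]]

print(sorted_keys)
print(read_customer_with_children(1))
```

With separate, unclustered tables the same read would need one lookup per table; here a single range read covers both.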

Note here the relationship between F1’s optimization and the true data’s properties: Advertising campaigns are clustered by a finite set of Customers/Advertisers, and the size distribution is not the same as say, a social network’s “users” data set.

Indexing:

  • Indexes are stored as separate tables in Spanner, keyed by
     (index key -> target table's primary key)
  • There are 2 types of physical index storage: Local and Global.
    • Local index: like child tables, local indexes make use of Spanner’s directory-local storage; local index updates are fast.
    • Global index: doing this with global consistency is still a hard scalability problem, but it’s mitigated because: Spanner has already auto-sharded the data across many directories, and consistently stored them on multiple servers; writing a single row of global index requires adding a single extra participant to the initiating transaction, incurring a 2PC; this is fine for up to 10s of participants.
  • F1 still encourages application developers to use global indexes sparingly, and to avoid bulk global index updates.

Schema changes:

  • F1 leverages Spanner’s non-blocking schema change feature, and adds some additional features. Schema changes happen often and are business critical.
  • The full design of F1’s schema change subsystem is described in a separate paper.
  • The key features of the subsystem are (1) enforcing that at most two schemas are active, and (2) dividing a schema change operation into mutually compatible phases, initiating an automatic MapReduce backfill job that runs concurrently with new writes.

Transactions:

  • F1’s transactions build on top of Spanner features. Most notably, Optimistic Transactions.
  • Optimistic transaction: (1) Read phase, collect rows and their last modify timestamps; (2) F1 creates a short-lived Spanner pessimistic transaction, involving a re-read, and if successful, a final write.

  • F1 clients default to optimistic transactions. The benefits highlighted are:
    • Tolerance for misbehaving clients: Spanner snapshot reads do not block and do not conflict, so long-running clients do not slow down the system.
    • Long running transactions as a feature: it enables some F1 transactions to involve waiting for user input.
    • Easy to retry transparently in F1 Server, hiding transient Spanner errors from user.
    • Speculative Writes: a MapReduce job can remember timestamps for that read, and speculatively write new data (think of some cache optimizations or execution optimizations based on history). This is a very sophisticated optimization.
  • Optimistic transaction drawbacks:
    • Low throughput under high contention: just imagine incrementing a counter.
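The read phase and the short verify-and-write phase can be sketched as a timestamp check. This is a single-threaded toy, not F1's or Spanner's API: the `Store` class, its methods, and the logical clock are all hypothetical, and the pessimistic locking of the commit phase is only implied by running it as one uninterrupted method call.

```python
import itertools

class Store:
    def __init__(self):
        self._clock = itertools.count(1)  # logical timestamps (assumption)
        self._rows = {}                   # key -> (value, last_modified_ts)

    def read(self, key):
        # Lock-free read: returns the value and its last-modified timestamp.
        return self._rows.get(key, (None, 0))

    def commit(self, read_timestamps, writes):
        # Short pessimistic phase: re-read and verify that nothing changed.
        for key, seen_ts in read_timestamps.items():
            if self.read(key)[1] != seen_ts:
                return False  # conflict: the caller must retry
        ts = next(self._clock)
        for key, value in writes.items():
            self._rows[key] = (value, ts)
        return True

def increment(store, key, max_retries=5):
    for _ in range(max_retries):
        value, ts = store.read(key)
        if store.commit({key: ts}, {key: (value or 0) + 1}):
            return True
    return False

store = Store()
increment(store, "clicks")
stale_value, stale_ts = store.read("clicks")  # client A reads, then stalls
increment(store, "clicks")                    # client B commits first
conflict = store.commit({"clicks": stale_ts}, {"clicks": 100})
print(conflict, store.read("clicks")[0])
```

The counter case shows the drawback noted above: under contention every losing writer has to re-read and retry, so throughput drops.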

Change history recording

  • History recording is another top level feature in F1. Every transaction creates one or more “ChangeBatch” protobufs, which include: (1) primary key, (2) complete column values before and after.
  • The ChangeBatch is written as another child table of each root table. For transactions spanning multiple root tables, a ChangeBatch is written per root table row, and the batches include references to each other.
  • Change history is propagated to consumers in a Pub/Sub system, and consumers sometimes complain of flooding.
  • It is also creatively used in caching for frontend: after a user commits an update, the F1 client reads the cache: it opportunistically looks at 2 places: its own cache (which requires invalidation and re-read) and the change history notifications. If the notification arrives earlier than cache propagation, this is a win.

Client ORM

  • F1 replaces a legacy MySQL DB, which application developers often interfaced with via an ORM. A traditional ORM has many drawbacks, most of which make it unsuitable for a dataset of the size and scale of AdWords. The traditional MySQL ORM was also a poor match for Protobufs.
  • The F1 client library aims to resolve most of these challenges. It further extends SQL to work with Protobuf nested fields.

Query processing

  • F1 executes queries either (1) centrally, for low latency, or (2) as long-running, batched, parallel distributed queries.
  • All input data and internal data flows are randomly hashed and do not make use of ordering. As I understand it, this keeps the implementation simple.
  • Intermediary processors stream data as soon as possible, mirroring implementation of GFS/CFS; this maximizes pipelining, and minimizes buffering, and also goes against sorting/preserving data order in intermediate steps.
  • F1 bets on the speed and reliability of Google’s DC networks: F1 performance is theoretically bounded by network switches, but it has not been a problem in practice at Google’s DCs.
  • F1 (in 2012) does not checkpoint queries, so long running queries may fail and need a full restart. Implementing intermediate checkpointing in distributed query processing is difficult without hurting performance of the common non-failing scenario.
  • Hierarchical table joins are fast across 2 related tables, and only require buffering a single row from each table.
  • Queries interact natively with Protobufs




Paper: The Tail at Scale

2020-03-28 00:00:00 +0000

Original paper https://research.google/pubs/pub40801/

Summary

This was a very practical paper for me because the techniques were directly applicable to my work. I work on a sharded, distributed search/retrieval system at Booking. I have blended in my notes and some applicable details.

Preface

A “coordinator” service that fans out calls to N number of “data” nodes will have to tolerate high tail latency. To counter this effect we can use redundancy: at a high level, by making redundant calls to replicas of the same data node, obtaining the first/fastest response, and discarding/ignoring the slower replicas.

Scenario: A root service makes calls to 21 data nodes. Suppose each data node has a 99th percentile latency of 10ms. The probability of all 21 calls responding within 10ms is:

(0.99)^21 = 0.81
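The arithmetic is easy to check, and the complement is the more striking number: the share of root requests that wait on at least one slow data node. A small sketch, assuming the 21 calls are independent (variable names are illustrative):

```python
p_fast = 0.99  # probability a single data node responds within 10ms
fanout = 21    # number of data nodes called per root request

p_all_fast = p_fast ** fanout
p_any_slow = 1 - p_all_fast

print(round(p_all_fast, 2))  # 0.81
print(round(p_any_slow, 2))  # 0.19
```

So roughly one in five root requests is slower than the per-node 99th percentile, even though each individual node is slow only 1% of the time.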

Techniques for minimizing tail latency

  1. Keep low level queues short, so higher level policies take effect more quickly. For example: Google File System servers keep few operations outstanding in the OS disk queue. Instead, the service maintains its own priority queue of pending disk requests. This technique allows servers to serve interactive requests before older requests for batch operations.

  2. Reduce Head Of Line (HOL) blocking. For example: Google Web search uses time-slicing to allow interleaved execution of short running requests. See HTTP/2. Prevent a single large expensive query from adding latency to many smaller/cheaper queries.

  3. Manage background activities, synchronize/orchestrate disruption. For example: compaction in log-oriented storage, garbage collection. Explore this alternative: schedule and synchronize GC pauses at minimal load times to minimize tail latency during high load.

  4. Caching: it does not directly address tail latency – unless entire dataset fits in cache.

Embracing tail latency

  1. Hedged requests. Issue the same request to multiple replicas, use the fastest response, and discard or cancel the remaining outstanding requests. Naive implementations of this method yield unacceptable overhead. Further refinement usually reduces tail latency effects with minimal overhead.

A typical approach is to defer sending the second request until the first has been outstanding for longer than the 95th percentile latency. This limits the additional load to approximately 5%.

A Google experiment: with a 100 node fanout and hedged requests sent after a 10ms delay, the 99.9th percentile latency dropped from 1800ms to 74ms, with just a 2% increase in requests.
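A deferred hedged request can be sketched with `asyncio`. This is a minimal illustration, not the paper's implementation: `hedged_call`, the two fake replicas, and the delays are all made up, and only one backup request is ever sent.

```python
import asyncio

async def hedged_call(replicas, hedge_delay):
    """replicas: coroutine functions that serve the same request."""
    primary = asyncio.create_task(replicas[0]())
    # Give the primary a head start before spending resources on a backup.
    done, _ = await asyncio.wait({primary}, timeout=hedge_delay)
    if done:
        return primary.result()
    backup = asyncio.create_task(replicas[1]())
    done, pending = await asyncio.wait(
        {primary, backup}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()  # discard the slower request
    return done.pop().result()

async def slow_replica():
    await asyncio.sleep(0.5)  # simulates a replica stuck in the tail
    return "slow replica"

async def fast_replica():
    await asyncio.sleep(0.01)
    return "fast replica"

result = asyncio.run(hedged_call([slow_replica, fast_replica], hedge_delay=0.05))
print(result)
```

In practice `hedge_delay` would be set from a measured latency percentile, so that only the slowest few percent of requests trigger a backup.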

  2. Tied requests. This is an attempt to further improve on (1), based on this observation: to permit more aggressive use of hedged requests without large resource consumption, we require faster cancellation of requests.

Examine a server queue before execution. Mitzenmacher: “allowing a client to choose between 2 servers based on queue depths at enqueue time – exponentially improves load balancing performance over uniform random.”

Tied requests: servers perform cross-server status updates, peer-to-peer. A coordinator sends the same request to 2 servers, each tagged with the identity of the other server. When a server starts executing a request, it sends a cancellation message to its “tied” peer. The other server can either immediately abort, or deprioritize the queued request.

It is useful for the client to introduce a small delay of 2x the average network message delay (approx 1ms) between sending the 1st and 2nd requests.

Google’s implementation: serving requests that are strictly not cached in memory and must be read from disk. Replication factor is 3. The results are effective in both an isolated environment and a busy environment.
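The cancellation handshake between tied servers can be sketched as a deterministic, single-process simulation. This is an assumption-heavy toy: the `Server` class and its methods are hypothetical, network delay is ignored, and a cancelled request is simply skipped rather than deprioritized.

```python
from collections import deque

class Server:
    def __init__(self, name):
        self.name = name
        self.queue = deque()    # (request_id, tied peer) waiting to run
        self.cancelled = set()  # request ids a peer has already started
        self.executed = []

    def enqueue(self, request_id, peer):
        self.queue.append((request_id, peer))

    def run_next(self):
        while self.queue:
            request_id, peer = self.queue.popleft()
            if request_id in self.cancelled:
                continue  # the tied peer got there first; skip this copy
            # Starting execution: tell the tied peer to drop its copy.
            peer.cancelled.add(request_id)
            self.executed.append(request_id)
            return request_id
        return None

a, b = Server("a"), Server("b")
a.enqueue("batch-job", b)  # server a has a backlog
a.enqueue("q1", b)         # the coordinator ties q1 to both servers
b.enqueue("q1", a)

b.run_next()               # idle server b starts q1 and cancels it on a
a.run_next()               # a works through its backlog
a.run_next()               # a skips its cancelled copy of q1

print(a.executed, b.executed)
```

The request runs exactly once, on whichever server reaches it first, which is what makes aggressive duplication affordable.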

  3. Alternative tied-request: probe remote queues first, then submit to the least loaded server. This approach can be less effective than simultaneous tied requests because: (A) load changes between probe and request; (B) request service times are difficult to estimate; (C) clients can create temporary hot spots when all clients pick the same server.

Operational notes on production

  1. Canary requests: prevent a bad request from nuking all nodes in a fanout. Google IR systems employ “canary requests”: a root server sends a request to one or two leaf servers, and only queries the remaining nodes if the canary responds within a reasonable time. Otherwise, the system flags the request as dangerous and prevents further execution. This guards against errors and DoS attacks.

  2. Mutations: for services that require consistent updates, the most commonly used techniques are quorum-based: they write to multiple replicas; they inherently help tail-tolerance.
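The canary request idea from item 1 can be sketched as a guard in front of the fanout. This is a simplified illustration: `fanout_with_canary` and the fake leaf servers are hypothetical, a leaf failure is modeled as an exception rather than a timeout, and the fanout runs sequentially.

```python
def fanout_with_canary(request, leaves, canary_count=1):
    """Return all leaf results, or None if a canary flags the request."""
    canaries, rest = leaves[:canary_count], leaves[canary_count:]
    results = []
    for leaf in canaries:
        try:
            results.append(leaf(request))
        except Exception:
            # The canary failed: flag the request as dangerous and stop
            # before it reaches the remaining leaves.
            return None
    results.extend(leaf(request) for leaf in rest)
    return results

calls = []  # records which leaves actually received a request

def make_leaf(name):
    def leaf(request):
        calls.append(name)
        if request == "poison":
            raise RuntimeError("leaf crashed")
        return f"{name}:{request}"
    return leaf

leaves = [make_leaf(f"leaf{i}") for i in range(4)]

ok = fanout_with_canary("hello", leaves)
calls_for_ok = len(calls)
bad = fanout_with_canary("poison", leaves)
calls_for_bad = len(calls) - calls_for_ok

print(ok, calls_for_ok)
print(bad, calls_for_bad)
```

A healthy request pays one extra round of latency for the canary; a poisonous one touches a single leaf instead of the whole fanout.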