SlicingDice Uncovered — Part 3 (S1Search In-depth)

Posted by SlicingDice on Dec 3, 2018 8:29:43 PM

In SlicingDice, Data Warehouse

This is the third post of the SlicingDice Uncovered series. The first post was about the SlicingDice infrastructure and the previous one described what S1Search is and how it works.

We actually didn’t expect to write this third post, but after the latest post giving an overview of S1Search, we received many messages requesting that we dig deeper into a few parts of S1Search. So that’s what we will be doing here.

The next and final post of the uncovered series will talk about how we handle data integrity and scalability tests.

Let the games begin!

We start by presenting the main files kept by a shard residing in an S1Search node, then explain how these files work together, and finally relate all of them to columns and queries.

Inverted Indexes and Posting Lists

The main idea behind answering queries in sub-second time is to place one’s data in a format from which answers to one’s favorite questions are available almost instantly. Considering that one’s favorite questions might be diverse, it is usually necessary to have the data written in different formats.

Indexes are data structures created to allow fast searches over huge amounts of data, and the inverted index is a particular type of index that S1Search uses extensively. We start by introducing this data structure.

To begin with, an inverted index is built for a specific column of a table, and it allows us to find out whether a given value is present in that column without running a full scan over it. If a given value is present in more than one row of that column, the inverted index must keep a list of every row containing the target value.

This list of entities containing a given value in a given column is called a posting list, and the inverted index is the data structure that maps the values existing in a column to their corresponding posting lists.

In addition to keeping entity identifiers in a list, a posting list might contain more information about each entity.

For a simple example, let’s consider that entities of a table represent web pages and that we’re building an inverted index for the column ‘keywords present in the web page’. For every known keyword in the dataset a posting list exists indicating which web pages contain that word. Since a given word might appear more than once on a web page, we might want to keep an additional piece of data for every entry on the posting list: the total number of occurrences of the word in the page.

With this composite posting list we can not only find entities matching a target value, but also use the extra piece of information to rank pages according to their relevance to our search criteria. More complex posting-list entries can be designed, and the more information they hold, the more we can extract from them when executing queries.
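To make the concept concrete, here is a minimal in-memory sketch of an inverted index with composite postings. It is purely illustrative and not S1Search’s on-disk implementation; the class and method names are made up.

```python
# Minimal illustrative sketch: an in-memory inverted index for a
# 'keywords' column, where each posting also carries the number of
# occurrences of the keyword in the page.
from collections import defaultdict

class InvertedIndex:
    def __init__(self):
        # value -> {entity_id: occurrence_count}
        self.postings = defaultdict(dict)

    def add(self, entity_id, value, count=1):
        # Accumulate the occurrence count for this (value, entity) pair.
        self.postings[value][entity_id] = self.postings[value].get(entity_id, 0) + count

    def posting_list(self, value):
        return self.postings.get(value, {})

    def ranked(self, value):
        # Rank entities by how many times the value occurs in them.
        return sorted(self.posting_list(value).items(), key=lambda kv: -kv[1])

index = InvertedIndex()
index.add("page-1", "slicingdice", count=3)
index.add("page-2", "slicingdice", count=1)
print(index.ranked("slicingdice"))  # [('page-1', 3), ('page-2', 1)]
```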

How S1Search uses Inverted Indexes

The traditional way of using an inverted index is to keep it as an auxiliary data structure: some queries run on the table itself, some run on the inverted index, and some run on both of them. For example, you could use an inverted index to find Entity IDs of all pages containing the word “SlicingDice” and then retrieve from the table the URLs of these pages and the owner of the page.

Noticing that keeping data both in the column and in the inverted index is a form of redundancy, S1Search’s developers decided to keep only the latter. These are key concepts in S1Search:

  • It keeps an inverted index for every column in the system.
  • It does not keep a materialized view of the column by default.

So how good is this? Inverted Indexes are good for answering questions of the type “which entities have a given value on a given column?” It is even possible to apply set operations and find answers to rather complex questions quite quickly, but these answers end up being a list of entities, or maybe the size of such a list.

Sometimes, however, the important question is “what is the value of a given user in this column?”, and for that one inverted indexes do not perform well. In this case a materialized view of the target column allows much faster retrieval of the information. This is a case in which keeping both views of a column is required in order to answer one’s favorite questions.

A key difference between S1Search and traditional databases is which columns get an index and which get a materialized view. Traditional databases keep a materialized view of every column and indexes for only a few columns, whereas S1Search keeps indexes for all columns and materialized views for only a few.

Inverted Index for Time-Series Columns

The description of inverted indexes so far works well for non-time-series data such as “username”, “gender” and “date of birth”. What if we want to keep track of user activity over time, such as user accesses to a given web page? In other words, what about inverted indexes for time-series data?

S1Search approaches this problem by having one inverted index per time slice, where the size of the time slice is configurable and can be brought down to seconds. As you may have imagined, the problem with second-level Inverted Indexes is that one ends up with a huge number of files in the file system.

In order to keep the number of files reasonable, S1Search keeps second-level resolution files only for the last week of data. In doing this, S1Search’s developers assumed that one does not usually need second-level resolution on cold data. As data gets older it becomes available at hour-level resolution and, as it gets even older, at day-level resolution.

With or without this reduction in the number of files, it should be clear that multiple indexes must be read when querying a time-series column and that, since there is data redundancy, there might be many choices of which files to read. S1Search, being stingy as it is, selects the option that minimizes the number of files to be read.

Think about it: if we were to run a query on time-series data from yesterday, we could either read one index for each of the 24 hours of the day, or simply read the single day-level resolution file. We obviously choose the latter approach.
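As a rough illustration of that choice, here is a small sketch of a planner that prefers the coarsest index covering the query range. The function name and the day/hour granularities are assumptions for the example, not S1Search’s actual planner.

```python
# Illustrative only: read a day-level index whenever a whole day is
# covered by the query range, and fall back to hour-level indexes for
# partial days.
from datetime import datetime, timedelta

def plan_index_reads(start: datetime, end: datetime):
    reads = []
    cursor = start
    while cursor < end:
        day_start = cursor.replace(hour=0, minute=0, second=0, microsecond=0)
        if cursor == day_start and day_start + timedelta(days=1) <= end:
            # A whole day is covered: one day-level read is enough.
            reads.append(("day-level", day_start))
            cursor = day_start + timedelta(days=1)
        else:
            # Partial day: read the hour-level index containing the cursor.
            hour_start = cursor.replace(minute=0, second=0, microsecond=0)
            reads.append(("hour-level", hour_start))
            cursor = hour_start + timedelta(hours=1)
    return reads

# Querying all of yesterday costs a single day-level read instead of 24 hour-level ones.
print(plan_index_reads(datetime(2016, 1, 1), datetime(2016, 1, 2)))
```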

But dropping data resolution as data gets old might not be enough. If every Inverted Index were kept in its own file, the number of files in the system would still be unmanageable.

To see this, consider that a customer opts for storing data for 3 months, that the last 7 days are kept under second-level resolution, that the last 30 days are kept under hour-level resolution, and that the last 90 days are kept under day-level resolution. The number of Inverted Indexes required for one column in the system would be:

(7 days) × (86,400 second-level indexes/day) + (30 days) × (24 hour-level indexes/day) + 90 day-level indexes = 605,610

Since we expect to support many columns and many customers, something must be done about this number.

The way we approached this was by developing our own implementation of Inverted Indexes in which we can keep several of them in one file. We currently group together all seconds of an hour and all hours of a day in what we call a second-level file and an hour-level file, respectively. Each day also has its own day-level file.

With this clustering, the number of Inverted Indexes required for one column in the system is:

(7 days) × (24 second-level files/day) + (30 days) × (1 hour-level file/day) + (3 months) × (30 day-level files/month) = 288

What an improvement, don’t you think? Of course some management is required, but it is totally worth it.

An obvious sanity check for the implementation is that accumulating all posting lists of all seconds of an hour must produce the same result as reading the posting list of that hour directly. This implies that all three levels of inverted indexes must be updated when processing a new data entry.

As an example, say a data entry contains (id i, value V, time "2016-01-01 16:27:01"). This means three posting lists need to be updated: the second-level posting list corresponding to "2016-01-01 16:27:01", the hour-level posting list corresponding to "2016-01-01 16:00:00" and the day-level posting list corresponding to "2016-01-01 00:00:00".
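A minimal sketch of this three-level update, assuming simple in-memory posting lists keyed by (resolution, truncated timestamp, value); the names are illustrative only.

```python
# Every new (entity, value, timestamp) entry touches a second-level,
# an hour-level, and a day-level posting list.
from collections import defaultdict
from datetime import datetime

# (resolution, truncated_timestamp, value) -> set of entity IDs
posting_lists = defaultdict(set)

def ingest(entity_id, value, ts: datetime):
    second = ts.replace(microsecond=0)
    hour = ts.replace(minute=0, second=0, microsecond=0)
    day = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    for resolution, key in (("second", second), ("hour", hour), ("day", day)):
        posting_lists[(resolution, key, value)].add(entity_id)

ingest("id-i", "V", datetime(2016, 1, 1, 16, 27, 1))
# The entry now appears under 16:27:01 (second), 16:00:00 (hour) and 00:00:00 (day).
```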

It should be clear that we spend three times as much on write operations and three times the storage, but in return this reduces the amount of data to be read at query time by a factor of up to 86,400 when compared to reading all the second-level posting lists that fall within a day.

The solution of having multiple indexes inside one file, however, only solves the problem of having way too many files in the file system. The amount of data to be stored is roughly the same as when keeping each index in its own file (although some savings are gained since we avoid repeating file headers). And this is another reason why we care so much about data compression.

Squeezing some segments

Another optimization relies on the fact that most values are related to very few entities; that is to say, most posting lists are no longer than a handful of Entity IDs. As a consequence, our data structures must be optimized to deal with a collection of many short lists.

The way we leverage this fact is by allowing data segments that are much shorter than the regular OS storage segment. More on these segments soon, but for a short introduction consider that a data segment is the smallest amount of space allocated to any data structure, and that for these short lists a regular-sized segment would likely be much larger than the entire data structure itself.

As these short lists are the common case, reducing the segment size yielded improvements as large as a 4x reduction in storage.

Moving segments to trash

Another advantage that arises from having one index per time slice is that we can easily remove old data from the system: S1Search simply has to remove files containing expired data. This is important to free space and let new hot data in, and it is also a requirement of SlicingDice’s pricing model.

This behavior is also a consequence of the motivations that led to the development of S1Search: it was designed to handle mostly hot data, that could be used for immediate insights and actions.

Implementation details on these inverted index files will be given later in another post. For the discussion that follows it should be enough to keep in mind that our Inverted Indexes provide a map of one of these types:

  • (value) → list of postings, each posting containing one Entity ID
  • (value, timestamp) → list of postings, each posting containing a pair of Entity ID and frequency

What if we need the real data value (materialized view)?

We now move to other important file types managed by S1Search. As we’ve mentioned, inverted indexes perform well for some questions, but a materialized view might be required for other queries. This materialized view of a column is what we keep for the so-called “stored columns” in instances of a data structure we call a “stored index”.

The reason the materialized view is also called “index” is that we are only able to read the materialization by reading its index (whose key is the Entity ID). Think of this as our way of keeping the table’s primary key column sorted while coping with the fact that new entries arrive out of order.

With stored indexes we achieve:

  1. Keys are kept sorted, allowing key retrieval in O(log n) time.
  2. The data structure is append-only, which allows fast insertion.

The model to keep in mind is that a stored index creates one of the following mappings (a minimal sketch follows the list):

  • Entity ID → value (as in the case of an overwritable column such as “Marital Status”)
  • Entity ID → list of values (as in the case of a non-overwritable column such as “Shopping List”)
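
Here is the promised sketch: a much simplified, in-memory stand-in for a stored index, where values are appended to a log while a sorted list of (Entity ID, offset) pairs gives O(log n) lookups. The real on-disk structure is certainly more involved; this only illustrates the mapping it provides for an overwritable column.

```python
import bisect

class StoredIndex:
    def __init__(self):
        self.log = []      # append-only list of value hashes (fast insertion)
        self.index = []    # sorted list of (entity_id, log_offset) pairs

    def put(self, entity_id, value_hash):
        offset = len(self.log)
        self.log.append(value_hash)              # append-only write
        bisect.insort(self.index, (entity_id, offset))

    def get(self, entity_id):
        # Binary search over the sorted (entity_id, offset) pairs.
        i = bisect.bisect_left(self.index, (entity_id, -1))
        if i < len(self.index) and self.index[i][0] == entity_id:
            return self.log[self.index[i][1]]
        return None

stored = StoredIndex()
stored.put(42, "hash-of-married")
print(stored.get(42))  # 'hash-of-married'
```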

The holy glory of the global dictionary

There is one detail that we’ve hidden so far and need to introduce in order to justify the next data structure: in all data structures mentioned so far, the data written to disk is not the value itself but rather its hash. Except for the dictionary data structures, everywhere else in S1Search we store just hashes.

The dictionary is used for translation

There are many reasons for writing a hash instead of a string, the main ones being that it uses less space and allows faster key comparison (although alphabetical order is lost). To produce human-readable query results, however, we must be able to translate a hash back to its originating string.

This is done by keeping, for every string column in the system, a global translation dictionary from hashes to the original strings. This dictionary is a binary search tree whose key is a hash and whose value is the original string (or, more precisely, information that allows us to retrieve the original string).
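A hedged illustration of the translation step follows; the hash function, hash width and dictionary layout are placeholders, not necessarily what S1Search uses.

```python
import hashlib

def value_hash(value: str) -> int:
    # 64-bit hash of the value; the real system may use a different function.
    return int.from_bytes(hashlib.blake2b(value.encode(), digest_size=8).digest(), "big")

# hash -> original string. In the real system this is an on-disk BST and the
# stored value is information that lets us retrieve the string.
translation_dictionary = {}

def register(value: str) -> int:
    h = value_hash(value)
    translation_dictionary[h] = value
    return h

h = register("SlicingDice")
print(translation_dictionary[h])  # 'SlicingDice'
```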

We’ll need to talk about yet another data structure before giving the complete picture of the translation process. But before moving to the final main file data structure, here are a few more notes on this translation dictionary file type.

Dictionary sharding and redundancy

Consider that different entities might have the same value in the same column. Also don’t forget that, because of our sharding scheme, these entities might not be stored on the same shard and node. If every node kept its own translation dictionary, we would end up with repeated string values in our cluster — something that we see at SlicingDice as a waste of space.

In order to avoid this, we decided to centralize the translation dictionary for a given column in one shard, hosted by one node. To decide which shard manages a given column’s dictionary, we map the column’s name to our shards using the Consistent Hashing technique.
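Below is a minimal consistent-hashing sketch of that mapping, assuming a simple ring with a few virtual points per shard; the shard names, the hash function and the number of points are illustrative.

```python
import bisect
import hashlib

def ring_hash(key: str) -> int:
    return int(hashlib.md5(key.encode()).hexdigest(), 16)

class ConsistentHashRing:
    def __init__(self, shards, points_per_shard=16):
        # Each shard owns several points on the ring to even out the load.
        self.ring = sorted(
            (ring_hash(f"{shard}:{i}"), shard)
            for shard in shards
            for i in range(points_per_shard)
        )
        self.keys = [h for h, _ in self.ring]

    def shard_for(self, column_name: str) -> str:
        # The column's dictionary lives on the shard owning the first
        # ring point clockwise from the hash of the column name.
        h = ring_hash(column_name)
        i = bisect.bisect(self.keys, h) % len(self.ring)
        return self.ring[i][1]

ring = ConsistentHashRing([f"shard-{n}" for n in range(32)])
print(ring.shard_for("country"))  # dictionary shard for the 'country' column
```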

Because we can’t rely on a given shard and node being up at all times, replication is applied and, in fact, multiple shards and nodes are responsible for storing a column’s dictionary. So when node A realizes a message contains data of a column whose dictionary belongs to a shard hosted on another node B, node A sends a message to node B in order to register the mapping between hash and string.

Important detail to keep in mind: node A does not contact node B directly, it actually sends a message to a Kafka topic that concentrates translation messages for a dictionary shard.

To make clear why node A can end up with a message M referring to a column whose dictionary is kept by another shard, hosted on node B, consider that:

  • M has the form (EntityID 10, ColumnName, ColumnValue).
  • Node A received this message because “EntityID 10” belongs to shard 1, that is currently hosted on node A;
  • However, when it comes to translation dictionary, “ColumnName” was mapped to shard 30, that is currently hosted on node B.

The dictionary is used to support top-k and “LIKE” queries

We are not done with the dictionary’s capabilities yet; there is one last, but not least important, dictionary function: supporting top-k and LIKE queries.

A use for such queries can be found in a Business Intelligence tool that needs a blazing-fast answer in order to suggest values in a search form. For instance, the tool could suggest the most commonly used values, i.e., the values with the highest frequency of occurrence in the dataset.

A way of doing this in S1Search would be to query the distributed Inverted Index, accumulate the total number of entities for every value of a column, and then rank values according to this frequency of occurrence. But, as you can imagine, this might not be fast enough, as it is quite computationally intensive.

A similar but much better way of computing the most frequent values is to have these frequencies of occurrence pre-calculated somewhere, so that S1Search only needs to read them and rank the values.

This is precisely what our last main data structure does: for every value existing in a column, it keeps track of the number of entities containing that value, and it allows top-k types of queries. We call this data structure the Frequency Dictionary since, like the translation dictionary, it is composed of a binary search tree whose key is a value (this time, the value string itself) and whose node value is the total number of occurrences of that value.

We decided to keep strings in this data structure in order to allow top-k queries with search criteria similar to a SQL LIKE statement, such as “contains”, “starts with” and “matches precisely”.
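A sketch of such a query, assuming the frequency dictionary has already been read into a plain map; the predicate argument stands in for the LIKE-style criteria mentioned above.

```python
import heapq

# value string -> number of entities containing that value
frequency_dictionary = {
    "blue": 1200, "black": 950, "blond": 80, "brown": 4300, "red": 300,
}

def top_k(k, predicate=lambda value: True):
    # Scan the whole dictionary, keep the k highest counts with a heap.
    matching = ((count, value) for value, count in frequency_dictionary.items()
                if predicate(value))
    return [(value, count) for count, value in heapq.nlargest(k, matching)]

print(top_k(2))                                           # most frequent overall
print(top_k(2, predicate=lambda v: v.startswith("bl")))   # LIKE 'bl%'
```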

Finally, the frequency dictionary of a column is distributed among S1Search shards together with its translation dictionary; therefore, the same node holds both dictionaries for a given column.

Making sense of all these data structures together

With these four data structures (inverted index, stored index, frequency dictionary, and translation dictionary) we are able to answer basically any question a traditional single-table database can answer, by querying every node in our system and aggregating the results in the client.

Putting it all together, here is the information kept within an S1Search cluster for a given column:

  • Inverted Index: Mapping of value hash → list of postings; each posting being either an (id) or a tuple (id, frequency).
  • Stored Index: mapping of ID → value hash.
  • Frequency Dictionary: map of value string → number of occurrences.
  • Translation Dictionary: map of value hash → address in the frequency dictionary where the string of the value itself can be read.

Going deeper — S1Search Segment Data Structure Overview

The next section is going to give some details on how supported queries are executed by S1Search. In order to cover this material, we first need a picture of the implementation of our file data structures.

The first thing to notice is that we’re talking about file data structures, in contrast to in-memory data structures where data resides entirely in RAM. Many issues arise when directly translating an in-memory implementation into a file implementation, most of them related to the fact that storage access time is a function of the current position and the target position.

The reason for keeping the data structure in a file is that the amount of data to be stored in the production environment does not fit entirely in memory, thus S1Search must use data structures and algorithms that load raw bytes from files and interpret them as needed. Furthermore, for performance reasons, selected parts of the data structure might be kept in-memory.

Without going too much into the bits and bytes of the process, you can imagine that our data structures consist of linking together segments of bytes in a file. A segment is simply a sequential address space in a file, such as the bytes at addresses 0 through 1023.

The linking part is done by writing somewhere in this segment an address to the next segment. Furthermore, a segment is usually broken into two parts:

  • A header containing the segment type, the segment’s owner and possibly the address of another segment.
  • A payload, holding part of the data to which this segment belongs. For example, if a given segment is used to hold data of a posting list, the payload might contain part of that posting list’s list of integers.

The reason the entire posting list is not allocated into one large enough segment is that the posting list grows with time and we don’t know a priori how many bytes it will take to store the entire posting list in its final state.

Additionally, before a posting list reaches its final state, we’ll likely have added another segment right after its first one holding part of a different posting list. In general, linked segments are a way of building data structures of arbitrarily growing size out of fixed-size byte arrays. All sorts of data structures can be built with linked segments; the most important ones for S1Search are binary search trees and sorted linked array lists.
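The toy model below shows the linked-segment idea with deliberately tiny segments; the header layout and segment size are made up for illustration.

```python
SEGMENT_PAYLOAD = 4  # entries per segment, tiny on purpose

class Segment:
    def __init__(self, owner):
        self.owner = owner   # e.g. (value, timestamp) of a posting list
        self.next = None     # "address" of the next segment, here a reference
        self.payload = []    # part of the posting list's entity IDs

def append_posting(head: Segment, entity_id) -> None:
    seg = head
    while seg.next is not None:      # walk to the last segment of this owner
        seg = seg.next
    if len(seg.payload) == SEGMENT_PAYLOAD:
        seg.next = Segment(seg.owner)  # segment full: link a new one
        seg = seg.next
    seg.payload.append(entity_id)

def read_posting_list(head: Segment):
    ids, seg = [], head
    while seg is not None:
        ids.extend(seg.payload)
        seg = seg.next
    return ids

head = Segment(owner=("V", "2016-01-01"))
for entity_id in range(10):
    append_posting(head, entity_id)
print(read_posting_list(head))  # the list spans three linked segments
```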

A challenge that must be taken into account is that we must be able to hold many instances of these data structures in one file. In fact, S1Search often has many instances of heterogeneous data structures in one file.

Here are some details on the data structures mentioned so far:

Inverted index

One inverted index object handles data for many time slices within a time period: one instance contains one logical inverted index per time slice in its domain. Depending on the time granularity of the instance, it might hold all seconds of an hour, all hours of a day, or simply the data of a single day.

One instance of an inverted index manages two files: one for posting lists and another for indexes.

Posting list files

  • Each segment in the file belongs to a posting list.
  • The header indicates the owner of the segment; that is, it indicates the value and the timestamp of the posting list to which the segment belongs.
  • All posting lists in this file contain timestamps within the same time period (which does not mean they share the same timestamp).
  • Locating a posting list consists of searching this file for all segments with a given owner, identified by the tuple (value V, timestamp T).

Index files

  • The index file is used to locate all segments of a given posting list. This is done in two steps.
  • First, a first-level index is searched to resolve the value V. The located node points to an address in the same file where a second-level index starts. All nodes in this second index (whose key is the timestamp T) correspond to the value V located in the first level.
  • The value of a node in this second index is an address in the same file where a skip-pointer list to the segments of the posting list of the located owner (value V, timestamp T) starts.
  • The skip-pointer list may itself span multiple segments, and its entries are of the type (address of a posting-list segment, maximum ID in that segment). The addresses contained in these entries refer to the posting lists file’s address space.
  • The order described is not unique. Depending on the type of column, the inverted index might resolve the timestamp first and only then the value.
  • As for the data structure of the index itself, there are a few options: a binary search tree, a sorted list or even an unsorted list.

Our two-file inverted index implementation is shown in the picture below. A sorted list is used as the first-level index and a binary search tree as the second-level index. Note also that this is a case in which we resolve the value first and then the timestamp (or date). The posting list file contains extra segments that were left blank: those belong to owners other than the target key.
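To tie the two files together, here is a very simplified stand-in for the lookup path just described, using plain dicts and lists instead of on-disk sorted lists, BSTs and skip-pointer segments; the access path (value → timestamp → skip pointers → posting-list segments) is the part that matters.

```python
# "Posting lists file": segment address -> segment contents (entity IDs).
posting_lists_file = {
    0: [1, 5, 9],
    1: [12, 17, 31],
    2: [40, 42],
}

# "Index file": value -> timestamp -> list of (segment_address, max_id) skip pointers.
index_file = {
    "V": {
        "2016-01-01": [(0, 9), (1, 31), (2, 42)],
    }
}

def read_posting_list(value, timestamp, min_id=None):
    skip_pointers = index_file.get(value, {}).get(timestamp, [])
    ids = []
    for address, max_id in skip_pointers:
        if min_id is not None and max_id < min_id:
            continue  # skip pointers let us skip whole segments
        ids.extend(posting_lists_file[address])
    return ids

print(read_posting_list("V", "2016-01-01"))             # full posting list
print(read_posting_list("V", "2016-01-01", min_id=20))  # skips the first segment
```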

Finally, the magic: how queries are performed

We are now able to describe how some of our queries are executed. Not all query types will be covered, but hopefully representative examples should introduce most of the concepts behind S1Search’s query processor engine.

SlicingDice mostly answers these types of queries:

  1. How many IDs satisfy a given condition?
  2. Which IDs satisfy a given condition?
  3. Rank entities according to a given condition.
  4. What is the value on column X of IDs that satisfy a given condition?
  5. What are the top k most frequent values on this column?
  6. Build an n-dimensional cube by crossing values of a list of columns.

We divide these queries into categories: 1–4 are called simple look-up queries, 5 is top-k, and 6 is called an aggregation query.

  • Simple queries are simple because the answer for them can be read almost directly from some inverted index, maybe requiring a few operations to join information from a few of them.
  • Top-k queries run on frequency dictionary.
  • Aggregation queries require full scan of potentially several posting list files to compute the final result.

Query type 1–4: Simple Index Look-up Queries

Note that types 1–4 are simply different types of computations done with a given condition. A condition is a predicate that is evaluated on an entity (a function from the entity domain to the set {true, false}).

Supported conditions are presented as follows. We first present a visualization of each condition and then the precise predicate that is evaluated. While the predicate is important for understanding implementation details, we hope the picture is easier to keep in mind.

In the following figure, each cell is a posting list and filled cells must be read from disk or from cache during query processing. Note that queries on non-time-series data are represented by loading the data of the row associated with timestamp 0 (zero).

Listed below is a description of the predicate evaluated for each condition.

  • Equals on a non-time-series column:
    “Entity has value V on column C”
  • Frequency-recency on a time-series column:
    “Entity occurred at least K times with value V on column C between dates S and E”
  • Group frequency-recency on a time-series column:
    “Entity occurred at least K times when considering value V1 on column C1 between dates S1 and E1, value V2 on column C2 between dates S2 and E2, …, value Vn on column Cn between dates Sn and En”
  • Range on values of a non-time-series numeric column:
    “Entity has a value in the interval [Vs, Ve) on column C”
  • Range both on values and on time on a numeric time-series column:
    “Entity occurred at least K times when considering values in the range [Vs, Ve) on date-times between [S, E)”
  • Belongs to:
    “Entity belongs to the table”, i.e., “Entity has some data on the table”
  • All predicates can be (recursively) combined with OR, AND and NOT operators to build yet another predicate.

Take the first condition and say we need to compute the list of IDs for which the condition holds true. In S1Search this is done by simply loading the posting list for value V on column C. Easy as that.

From the second condition on, though, S1Search has to load several posting lists and OR/AND/NOT them together. This is done by encoding every list as a bitmap and then applying bitwise operations.

Bitwise operations are fast, as they allow many entities to be processed in a single CPU instruction. Once the final bitmap is ready, we convert it back to a list of integers before returning the final result.

If S1Search needed to count the number of entities satisfying the condition, we would simply count the number of bits set in the bitmap.
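A small sketch of this bitmap algebra, using Python integers as arbitrary-length bitmaps; the real engine presumably uses compressed bitmap structures, but the boolean logic and the bit counting are the same.

```python
def to_bitmap(entity_ids):
    # Bit i is set if and only if entity i matches the condition.
    bitmap = 0
    for entity_id in entity_ids:
        bitmap |= 1 << entity_id
    return bitmap

def to_ids(bitmap):
    return [i for i in range(bitmap.bit_length()) if bitmap >> i & 1]

gender_female = to_bitmap([1, 2, 5, 8])   # posting list: gender = female
country_br    = to_bitmap([2, 3, 5, 9])   # posting list: country = BR

condition = gender_female & country_br    # AND of the two conditions
print(to_ids(condition))                  # [2, 5]  -> "which IDs" query
print(bin(condition).count("1"))          # 2       -> "how many IDs" query
```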

If S1Search needs to rank entities, it will first build the bitmap and then read posting list files again in order to process and accumulate the frequency of occurrence in every composite posting list it reads, but it does so only for entities for which it already knows the condition holds.

The same goes for retrieving values of a column of IDs holding a condition: the condition is evaluated to a bitmap and entities whose bits are set on the bitmap have their values retrieved from stored indexes.

Query type 5: Top-k

A top-k query on a column is simply a matter of reading the entire frequency dictionary while keeping track of values with the highest frequencies seen.

Note that the frequency dictionary is implemented as a binary search tree whose search key is the value string, not the frequency of occurrence; as a result, the full dictionary must be scanned for every top-k query.

For future comparison with aggregation queries, recall that the frequency dictionary for a column is kept by a single shard, in one node (disregarding replication), and thus this node is able to compute by itself the top-k terms for the entire system.

Query type 6: Aggregation

The goal of an aggregation query is to build an n-dimensional table (or cube) where each entry contains the number of entities satisfying the conditions on each of the axes.

Easier shown than described, here is a type of table that can be built with aggregation queries by crossing columns ‘gender’, ‘country’, ‘plan’ and ‘month of subscription’.

Tables like the one above are among the main outputs of analytics databases and can be used to uncover unknown trends among entities. There is a lot that can be extracted from these tables, and S1Search’s goal is to feed such a table with up-to-the-minute data.

The query processor that undertakes this task does so by reading data from posting list segment files. The process starts by reading the posting list file of the first column to be processed and materializing an in-memory view of the Inverted Index. Because we want to avoid materializing an in-memory copy of our entire data, only the most populated posting lists are kept. This is extremely important for the next steps, as you shall see.

The second step is to process the second column in the list, while ANDing together every processed posting list on this second column to every top posting list kept from the first step.

If there are C posting lists in the first level, and S values on the second level, there shall be C x S posting lists materialized at the end of the second step. Before moving to the next step we keep only the top populated posting lists again.

The process repeats until all columns have been processed with the output of one step being the input for the next one.

The importance of keeping only the top values between steps is that it reduces the amount of memory required by each step. This approach has limitations, of course, but it works as long as the application is interested mostly in frequent values, a common scenario for BI and analytics applications.
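Here is a toy version of that overlap process, with posting lists as plain sets of Entity IDs and a fixed number of cells kept between steps; it is only meant to show the cross-and-prune loop, not the on-disk mechanics.

```python
def overlap(columns, keep=2):
    # columns: list of dicts, each mapping value -> set of entity IDs.
    # Step 1: keep only the most populated posting lists of the first column.
    first = sorted(columns[0].items(), key=lambda kv: -len(kv[1]))[:keep]
    current = [((value,), ids) for value, ids in first]
    # Following steps: AND every kept cell with every value of the next
    # column, then prune back to the 'keep' most populated cells.
    for column in columns[1:]:
        crossed = [
            (path + (value,), ids & other_ids)
            for path, ids in current
            for value, other_ids in column.items()
        ]
        current = sorted(crossed, key=lambda kv: -len(kv[1]))[:keep]
    return [(path, len(ids)) for path, ids in current]

gender = {"F": {1, 2, 3, 5, 8}, "M": {4, 6, 7}}
country = {"BR": {1, 2, 4, 5}, "US": {3, 6, 8}}
print(overlap([gender, country], keep=2))
# [(('F', 'BR'), 3), (('F', 'US'), 2)]
```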

Note that the order of columns matters when computing this overlap aggregation. For instance, say we’re overlapping columns A and B. Then:

  • “A x B” means: “the top values of column B among the users that have the top values of column A”
  • “B x A” means: “the top values of column A among the users that have the top values of column B”

Another important issue (which might be seen as a limitation) regards the definition of top values. Recall that the process described earlier runs on top of posting list files (which belong to Inverted Index objects) and that the full Inverted Index is distributed among S1Search shards. As a result, a completely correct selection of top values for a given column could only be made by considering every node’s data, which we do not do.

S1Search’s overlap processor makes this decision locally. The assumption here is that top values are approximately the same among nodes and, to cope with this, we keep twice the configured number of values. Thus, if the customer selects a filter factor of k between steps of the overlap, we actually keep 2k values between steps.

The second assumption implied by this method is that, even though the top-k values might differ among shards, the true list of top-k values is present within the top-2k values on every node. Although not guaranteed, this assumption is reasonable since we evenly distribute new Entity IDs among nodes and there is no reason to believe a given popular value would have an exceptionally lower occurrence in one particular node when compared to the others.

Finally, the client must aggregate the partial results received from every node in order to produce the final answer. If you are thinking that offloading work to the client is a trend in S1Search, you’re correct.
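A sketch of that client-side merge, under the assumption that each node returns its local top-2k (value, count) pairs; function and variable names are illustrative.

```python
from collections import Counter
import heapq

def merge_partial_topk(partial_results, k):
    # Sum counts per value across nodes, then keep the global top-k.
    totals = Counter()
    for node_result in partial_results:       # one list of (value, count) per node
        for value, count in node_result:
            totals[value] += count
    return heapq.nlargest(k, totals.items(), key=lambda kv: kv[1])

node_a = [("premium", 120), ("free", 90), ("trial", 40), ("enterprise", 10)]
node_b = [("free", 150), ("premium", 80), ("enterprise", 60), ("trial", 20)]
print(merge_partial_topk([node_a, node_b], k=2))  # [('free', 240), ('premium', 200)]
```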

For extra use cases, the process can be further configured to use some sort of filter at every step. In the table shown in the example, one could fix the countries in which one is interested — for example, to consider only countries in America.

This way, the processing step handling the overlap with the “country” column would not need to materialize every posting list read from the file; instead, it would consider only the selected posting lists.

Possible filters include a list of values, a condition or some numerical expression in case of numerical columns.