SlicingDice Uncovered — Part 2 (S1Search)

Posted by SlicingDice on Dec 3, 2018 8:31:54 PM

In Data Warehouse, Slicingdice

This is the second post of the SlicingDice Uncovered series, where we intend to give a thorough, in-depth overview of everything that powers SlicingDice. The first post described our infrastructure details, so in this post we will dig deeper into our in-house developed database: S1Search. The next posts of the SlicingDice Uncovered series will talk about how we handle integrity and scalability tests and more. (Update: there is also an unexpected third post describing S1Search internals in depth.)

As you might have learned from part 1 of the Uncovered series, SlicingDice runs on top of a database called S1Search. This post introduces the design principles behind S1Search that are important for understanding many of SlicingDice’s characteristics. If you haven’t read about the S1Search development history, below is a quick recap for the completeness of this post.


Technically speaking, what is S1Search?

S1Search is a Java-based Data Warehouse and Analytics Database — a special type of database optimized to store data such that analytical queries can be executed really fast, even when dataset size is huge.

Furthermore, S1Search is a distributed non-ACID eventually consistent non-relational database:

  1. Distributed: Data is spread among several machines for greater performance and reliability.
  2. Non-ACID eventually consistent: Data is eventually written to whichever machine it needs to be written to.
  3. Non-relational database: It was not built to support all guarantees and constraints of relational models.

S1Search was strongly inspired by the data insertion techniques and search engine concepts of Apache Lucene (and ElasticSearch), such as the inverted index, and it also combines well-known concepts from other successful high-performance databases, like column-store data modeling.


So why did we build S1Search?


S1Search was built with the intent to store data as cheaply as possible while being able to query data in sub-second time. To achieve this, S1Search favors economy in both storage and query time over the strong guarantees usually granted by relational databases.

This is possible due to the fact that S1Search was built for applications where a fast but possibly out-of-date response is preferable to a completely updated but potentially slower answer. Besides, S1Search was designed to be scalable and run on cheap commodity hardware. These principles are behind many of the design choices made during the development of S1Search: from architecture to consistency protocol, we want to make S1Search light, cheap and fast.


Moving to bits and bytes

Now that you know what we expected S1Search to achieve, we move on to describing how we got there.


We start with a high level overview of S1Search internals and processing flows, passing through its architecture, data model and distribution, and finally covering the most important flows: data insertion and querying. By the end of this post we will also cover a little bit on some optimizations we implemented on S1Search in order to fulfill our “light, cheap and fast” goal.

At the end of this post there is a list of many references we used to learn about databases and consequently implement things on S1Search.

That being said, let’s rock.

What does S1Search look like?

S1Search implements a client-server architecture: an application instantiates a client which submits queries to be processed by the S1Search cluster — the machines that actually store data. On the server side, nodes are as independent from each other as possible following the principles of a shared nothing architecture. As for the client, it is a Python class that handles communication to the cluster and coordinates requests.

To further explain how the client coordinates things, we need to understand what it coordinates in the first place. Hence we now present the data model supported by S1Search and the scheme used to partition data among cluster nodes.

S1Search’s Data Model

Databases are usually classified according to the abstraction they provide to the system using them. Many traditional databases follow a so-called relational model, which is backed by sound mathematical theory. Although that model is broadly adopted, alternatives to it have been growing consistently from the 2000’s on, as a multitude of databases were created with new models that allow cheaper management of big data.

The so-called non-relational databases span a list of categories that include: key-value store, document store, graph database, wide-column store and others. With so many competing models in the database ecosystem, it should be clear that a definitive solution has not yet been found and that there is still room for innovation. This is the case of S1Search: although similarities can be traced, none of the known models exactly describes what S1Search implements.

S1Search was not developed with any model in mind and, as a result, it has its own representation of data. However, you may follow the model explained in this section to organize your data when using S1Search as your data warehouse solution.

First things first: S1Search stores data about entities, so every piece of data must be associated with an entity. An entity is an instance of whatever the application wants to store data about in S1Search, and it is identified by a numerical value. You may think of it as the primary key of a record, although this equivalence is not exact. But what is data?

S1Search natively supports two types of data: time-series and non-time-series data.

A time-series data entry is a triplet (Column Name, Column Value, Timestamp) and a non-time-series data entry is a tuple (Column Name, Column Value), where:

  1. Column Name is a string that must have been registered beforehand as a column.
  2. Column Value is any string or number depending on the type of the column being added.
  3. Timestamp is a formatted date string such as “2017–03–14T15:05:43Z”.

Note that these tuples/triplets are added independently: if you ever think you are adding many rows of data to S1Search at once, this is simply syntactic sugar. Internally, this data is split into distinct tuples or triplets. Therefore, a complete data record in S1Search associates either a tuple or a triplet with one entity, and many tuples/triplets can be associated with a single entity.

With all that in mind, a good picture of S1Search’s data model is a table whose sorting key is the Entity Identifier (Entity ID) and whose content is a list of tuples and triplets. This data visualization is called a materialized view.

In addition to the materialized view, S1Search keeps data in a format called the inverted index view. Hence S1Search always provides two views of these tuples and triplets: a materialized view and an inverted index view. The materialized view lets one query the value of an entity on a column, while the inverted index view lets one query for all entities with a given value on a column.
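To make the difference between the two views concrete, here is a minimal sketch in Python. The in-memory dictionaries, the sample entries and the function name build_views are assumptions for illustration only; S1Search's real on-disk structures are different and more involved.

```python
from collections import defaultdict

# Hypothetical non-time-series entries: (entity_id, column, value).
entries = [
    (10, "name", "Rick"),
    (10, "job", "Developer"),
    (11, "name", "Morty"),
    (11, "job", "Developer"),
]

def build_views(entries):
    """Build a materialized view and an inverted index view from the same entries."""
    materialized = defaultdict(dict)                   # entity -> column -> value
    inverted = defaultdict(lambda: defaultdict(list))  # column -> value -> [entities]
    for entity_id, column, value in entries:
        materialized[entity_id][column] = value
        inverted[column][value].append(entity_id)
    return materialized, inverted

materialized, inverted = build_views(entries)
job_of_entity_10 = materialized[10]["job"]      # value of an entity on a column
developers = inverted["job"]["Developer"]       # all entities with a value on a column
```

The same entry is written twice, once per view, which is the price paid for answering both kinds of question without scanning everything.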

Example of Materialized Data Structure


Example of Inverted Index Data Structure


How S1Search Partitions its Data

The problem with any big table is that it eventually gets too big for a single machine to manage. By “too big” we mean that it gets too slow to query for entries in the table.

A common solution is to partition a big table into smaller ones and give each partition to an independent machine. The usual way of splitting the table is by cutting it horizontally: for instance, call rows 0 to 1,000,000 the first partition of the table and set a machine to handle it. This technique is called sharding and is broadly used by S1Search.

The way data is distributed among S1Search nodes is by following a mapping rule from the Entity ID (a numerical value) to a set of partition IDs (called a shard) to a node responsible for storing this shard.

A simple sharding scheme consists of keeping in the same shard a quantity of sequential Entity IDs, such as having IDs from 0 to 999,999 in the first shard, IDs from 1,000,000 to 1,999,999 in the second shard and so on. A shard is then identified by its index in this sequence — the first shard has ID 1, the second has ID 2 and so on — and they are distributed evenly among nodes.
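The simple range-based scheme described above can be sketched in a few lines of Python. The shard size of 1,000,000 comes from the example in the text; the function names and the round-robin assignment of shards to nodes are assumptions made for illustration.

```python
SHARD_SIZE = 1_000_000  # entities per shard, as in the example above

def shard_for_entity(entity_id: int) -> int:
    """Return the 1-based shard ID that holds the given Entity ID."""
    if entity_id < 0:
        raise ValueError("Entity IDs are non-negative")
    return entity_id // SHARD_SIZE + 1

def node_for_shard(shard_id: int, nodes: list) -> str:
    """Spread shards evenly over nodes (round-robin is an assumption here)."""
    return nodes[(shard_id - 1) % len(nodes)]

nodes = ["node-a", "node-b"]  # hypothetical node names
home_of_entity_10 = node_for_shard(shard_for_entity(10), nodes)
```

Because the mapping is pure arithmetic, any client can compute where an entity lives without asking the cluster.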

For redundancy reasons, there might be more than one node responsible for a shard, in which case every responsible node must independently store the same set of data. The advantage of this design is that shards might be added as the system grows and might be moved between nodes if a node is to be shut down. As a result, sharding is a technique applied for the sake of horizontal scalability of the system.

Summing up, every piece of data inserted into S1Search must belong to some Entity ID, and Entity IDs are divided among the existing shards. All data belonging to an Entity ID will therefore end up stored in the same shard, instead of being distributed among all the shards.

The perfect match — S1Search and Apache Kafka

If you haven’t heard of Kafka so far, here is a quick overview:

Kafka is a publish-subscribe mechanism that detaches data production from data consumption. Messages sent to Kafka are organized into topics, to which multiple clients write and from which multiple server nodes read. Each topic is organized into partitions, and within each partition total ordering is guaranteed. Because of this ordering, Kafka can be used as a commit log system, and this is its second function for S1Search. Furthermore, Kafka is a distributed system running on multiple servers, with redundant storage for reliability.

For more information on Apache Kafka, this book could be a good start.

Apache Kafka has two main roles in S1Search’s pipeline:

1. It serves as a buffer of messages to be processed.

2. It also works as a tool for failure recovery.

Buffering messages is important in case the system is under heavy load and cannot process all messages immediately. In this scenario messages are held on Kafka until the cluster is available to process them.

As for failure recovery, consumed Kafka messages are not immediately removed from its logs: they can be reprocessed in case a node fails or needs to reboot.

One important note regarding the use of Kafka is that when an application sends a message using the S1Search client, the client returns as soon as the message is added to the in-memory message buffer of the Kafka client, from which it will be asynchronously sent to a Kafka topic, packed with other messages in a batch. Therefore, a query executed immediately after inserting this data won’t see the effect of the data entry, since the entry has probably not yet been processed by a node. It will eventually be processed, but in the meantime that data entry will not be returned by queries.

Relating S1Search shards to Kafka: for every S1Search shard there is a Kafka topic from which nodes read messages. This, however, does not get rid of all communication between client and server during data addition. In order to build a complete data entry message, the client must first have contacted servers to fetch available Entity IDs. This is one of the topics covered in the next section.

Preparing the message — Entity and Record IDs

Given that in S1Search every data entry is associated with an Entity ID, every data entry message must contain one Entity ID. This Entity ID must come from somewhere, and in order to get a new Entity ID, the client must first connect directly to a cluster node.

The node which the client connects to is responsible for managing a set of shards and keeping track of the used Entity IDs that are allocated to these shards. This node answers the request with a pool of IDs that the client might use in the future, whenever the need for a new Entity ID arises.

The decision of which node to connect to is made by the client, which picks nodes in a round-robin fashion as a way to achieve load balancing and fair data distribution among nodes. This is, as in many other scenarios, a case in which the client performs important management logic.

Besides the Entity ID to which the data in the message belongs, there is one more piece of data that must be added to a message because of S1Search’s consistency protocol: a Record Identifier (Record ID) generated by the client. This Record ID is later used to verify which messages were effectively processed and added to S1Search and which were not.
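As a rough illustration of a client-side pool refilled from nodes in round-robin order, consider the sketch below. FakeNode, allocate_ids, EntityIdPool and the batch size are all hypothetical names and values invented for this example; the real client and node interfaces are not described in this post.

```python
from itertools import cycle

class FakeNode:
    """Stand-in for a cluster node that hands out blocks of unused Entity IDs."""
    def __init__(self, name, first_id):
        self.name = name
        self.next_id = first_id

    def allocate_ids(self, count):
        ids = list(range(self.next_id, self.next_id + count))
        self.next_id += count
        return ids

class EntityIdPool:
    """Client-side pool of Entity IDs, refilled from nodes in round-robin order."""
    def __init__(self, nodes, batch_size=100):
        self._nodes = cycle(nodes)       # endless round-robin iterator over nodes
        self._batch_size = batch_size
        self._pool = []

    def next_entity_id(self):
        if not self._pool:               # pool exhausted: ask the next node
            node = next(self._nodes)
            self._pool = node.allocate_ids(self._batch_size)
        return self._pool.pop(0)

nodes = [FakeNode("node-a", 0), FakeNode("node-b", 1_000_000)]
pool = EntityIdPool(nodes, batch_size=2)
ids = [pool.next_entity_id() for _ in range(5)]
```

Fetching IDs in batches keeps the client from contacting a node on every insertion, and alternating between nodes spreads new entities across shards.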

For the sake of understanding the need for the Record ID, recall that a node might fail. When the node reboots, it reads Kafka topics to reload any message it could not completely process before crashing. In this process the node might end up reading again a message that has in fact been processed before the failure.

S1Search operations are not idempotent, and reapplying a command might produce wrong results on future queries. Record IDs are thus used to verify whether a given message was already processed. An S1Search node keeps track of processed Record IDs and skips a message if its Record ID has already been processed. As for the Record ID itself, it is a string containing a unique identifier of the client that generated the record and a sequential number generated by the client itself.
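The role of the Record ID during replay can be sketched as follows. The "client:sequence" string format, the class names and the use of a Python set are assumptions for illustration; the post only says that the Record ID combines a client identifier with a sequential number, and that nodes keep track of the IDs they have processed.

```python
from itertools import count

class RecordIdGenerator:
    """Builds Record IDs from a client identifier plus a sequential number."""
    def __init__(self, client_id):
        self._client_id = client_id
        self._sequence = count(1)

    def next_id(self):
        return f"{self._client_id}:{next(self._sequence)}"

class Node:
    """Toy node that applies a non-idempotent operation at most once per Record ID."""
    def __init__(self):
        self.processed = set()   # simplification of the node's record-tracking structure
        self.counters = {}

    def apply(self, message):
        record_id = message["record_id"]
        if record_id in self.processed:
            return False         # already applied before the crash: skip on replay
        column = message["column"]
        self.counters[column] = self.counters.get(column, 0) + 1  # not idempotent
        self.processed.add(record_id)
        return True

generator = RecordIdGenerator("client-a")
message = {"record_id": generator.next_id(), "column": "visits"}
node = Node()
first_attempt = node.apply(message)
replayed_attempt = node.apply(message)   # same message read again from the log
```

Without the check, the replayed message would bump the counter twice and later queries would return a wrong count.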


Okay, time to start putting all pieces together…

Putting it all together, an overview of the path S1Search takes to effectively add a data entry is presented below:

— Path of Data Insertion

Say the application wants to add data describing the following user:

{"name": "Rick", "age": 30, "job": "Developer"}

First thing to note is that this data is broken into three columns and each one is an independent column: name, age and job.

If these columns don’t yet exist in S1Search, they must be declared. In this case, a column declaration command is sent to any of the nodes in the cluster, which is then responsible for setting up the new column on Zookeeper.

This introduces another element in our pipeline: Zookeeper is a distributed system used for reliably managing shared configurations. In S1Search, Zookeeper is used for keeping metadata on column definitions.

For more information about Zookeeper, take a look at this book.

Once Zookeeper has updated its data, it sends notifications to every node in the cluster and eventually they all register the new column.

Assuming columns have been declared, the client must now associate the data being added with an Entity ID. If the client does not yet have its own pool of available Entity IDs, it must fetch one from any S1Search node.

The complete message built by the client contains the original data, an Entity ID and an attached Record ID.

The message will look like this:

{"customer_id": 109, "database_id": 1277, "entity_id": 10, "column": "ColumnName", "value": "ColumnValue"}

Using the example we gave at the beginning of this section, the insertion message will look like this:

{"customer_id": 109, "database_id": 1277, "entity_id": 10, "column": "Name", "value": "Rick", "column": "Age", "value": "30", "column": "Job", "value": "Developer"}

The message is then sent to a Kafka topic and is eventually processed by the S1Search node that hosts the shard responsible for Entity ID 10. In this example it will be shard 1, as this shard covers Entity IDs 0 to 999,999.

When processing the message, the node checks the Record ID to determine whether it is a new record or one that can be skipped. If the record is to be processed, the node doesn’t immediately write to its files. Instead, it buffers new entries in an in-memory buffer.

This is done to allow S1Search to sort write commands in a way that optimizes the overall process. When this buffer of messages reaches a given size, or when messages have been buffered for long enough, the node writes to all necessary files. Think of the node writing the entry data to both the materialized and inverted index views, but keep in mind that there are several files to be modified.
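Since data entries are stored as independent tuples, one way to picture the insertion of the user above is as one entry per column. The sketch below does that split in Python; the one-message-per-column layout and the function name build_messages are assumptions for illustration, not S1Search's actual wire format, and the Record ID field is left out for brevity.

```python
def build_messages(record, entity_id, customer_id, database_id):
    """Return one message per (column, value) pair of the record."""
    return [
        {
            "customer_id": customer_id,
            "database_id": database_id,
            "entity_id": entity_id,
            "column": column,
            "value": value,
        }
        for column, value in record.items()
    ]

user = {"name": "Rick", "age": 30, "job": "Developer"}
messages = build_messages(user, entity_id=10, customer_id=109, database_id=1277)
```

Each resulting dictionary has exactly one "column" and one "value" key, which avoids the ambiguity of repeating keys inside a single JSON object.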

Modified files include the following data structures: index and posting list associated with an inverted index, a bitmap of known IDs in the system, a bitmap of Record IDs from the client source and search trees associated with translation of string hashes and frequency of occurrence of values.

Don’t worry, these files are described later, in a chapter covering the in-depth details. Once every bit and byte has been set, the data entry is ready to be returned on queries, which is the subject of the next section.

— Path of Query

We move on to an overview of what happens on S1Search during query processing. As every node in our cluster contains all columns of a set of entities, all nodes must be queried in almost every query.

The orchestration of this process is done by the client. Not too much of a trouble for the client though: as nodes are independent from one another, orchestration is roughly a matter of sending the same query command to every node in parallel and then aggregating the answers. Three other important tasks are taken by the client during query processing:

  • Sometimes, the client expects the answer to contain the column value stored in S1Search. However, since S1Search uses a dictionary encoding data compression technique, the data must first be translated from the encoded format to the original one. It is thus the job of the client to query nodes for translation.
  • On queries that might produce a large result message, this result is paginated, i.e., broken into smaller pages made available to the application as they are ready. It is the client’s job to keep track of produced pages and continuously query nodes for the next page until the complete result has been aggregated on the client’s side.
  • In case a node is down and fails to return its corresponding part of the answer, the client might produce an approximation of the answer of that node. This is possible on some types of queries due to the fact that entities are approximately equally distributed among nodes.
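The orchestration described above, including the approximation for a missing node, could look roughly like the sketch below for a count query. Nodes are modeled as plain Python callables, and estimating a failed node's share as the average of the nodes that answered is an assumption; the post does not say how the client computes its approximation.

```python
from concurrent.futures import ThreadPoolExecutor

def distributed_count(nodes, query):
    """Send the same count query to every node in parallel and sum the answers.

    Returns (total, approximated). If some nodes fail, their share is estimated
    as the average of the nodes that did answer.
    """
    def safe_call(node):
        try:
            return node(query)
        except ConnectionError:
            return None                      # node is down

    with ThreadPoolExecutor(max_workers=len(nodes)) as pool:
        partials = list(pool.map(safe_call, nodes))

    answered = [p for p in partials if p is not None]
    if not answered:
        raise RuntimeError("no node answered the query")
    missing = len(partials) - len(answered)
    estimate = round(sum(answered) / len(answered) * missing)
    return sum(answered) + estimate, missing > 0

def healthy(count):
    return lambda query: count               # hypothetical node returning a fixed count

def down(query):
    raise ConnectionError("node unreachable")

exact = distributed_count([healthy(10), healthy(20), healthy(30)], "job = Developer")
approximate = distributed_count([healthy(10), down, healthy(30)], "job = Developer")
```

The estimate only makes sense because entities are spread roughly evenly over nodes; for queries where that does not hold, an approximation like this would be misleading.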

As for the server, the query process consists of parsing the query command into a sequence of operations and then taking the appropriate steps, such as:

  1. Reading posting lists and indexes data directly from SSD or in-memory segments.
  2. Uncompressing raw data if needed.
  3. Computing set operations on posting lists.
  4. Building the JSON answer expected by the client.
  5. Updating caches and storing information for the automatic query planner.
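Step 3 is where most of the query logic happens: an AND between two conditions becomes an intersection of posting lists, and an OR becomes a union. Below is a minimal sketch over sorted lists of Entity IDs; the function names and sample lists are illustrative, and a production engine would run these operations over compressed structures rather than plain Python lists.

```python
def intersect(a, b):
    """Entity IDs present in both sorted posting lists (logical AND)."""
    i = j = 0
    out = []
    while i < len(a) and j < len(b):
        if a[i] == b[j]:
            out.append(a[i])
            i += 1
            j += 1
        elif a[i] < b[j]:
            i += 1
        else:
            j += 1
    return out

def union(a, b):
    """Entity IDs present in either sorted posting list (logical OR)."""
    i = j = 0
    out = []
    while i < len(a) and j < len(b):
        if a[i] == b[j]:
            out.append(a[i])
            i += 1
            j += 1
        elif a[i] < b[j]:
            out.append(a[i])
            i += 1
        else:
            out.append(b[j])
            j += 1
    out.extend(a[i:])    # at most one of these two tails is non-empty
    out.extend(b[j:])
    return out

developers = [10, 11, 15, 42]   # hypothetical posting list for job = "Developer"
aged_30 = [10, 15, 99]          # hypothetical posting list for age = 30
both = intersect(developers, aged_30)
either = union(developers, aged_30)
```

Both functions walk each list once, so the cost grows linearly with the length of the posting lists.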

S1Search handles several types of queries (count, result, score, top-values and aggregations), and more details on the internals of these queries will be given in a later post.

Understanding these internals requires knowing more details of the underlying data structures, which are out of the scope of this introductory post. There are, however, some other features related to important optimizations that were only briefly mentioned so far and that deserve more attention.

The next section elaborates on these concepts.


Some S1Search optimizations

The two paths presented above introduced a few features that were added to S1Search after many rounds of profiling and that greatly improved read and write performance. These features are: compressing data, avoiding unnecessary repetition of strings, and bulk write/read operations.

In Data Compression We Trust!


A known fact regarding primary memory and storage devices is that access to the former is around 1 million times faster than to the latter when considering hard drives and 10 thousand times faster when considering modern SSDs. Therefore, efforts to reduce the amount of data transferred to and from disks are usually worth the price — and this is precisely the point of compressing data. At the cost of adding CPU cycles to compress and uncompress data, huge gains on the size of transferred data can be achieved by setting a proper compression scheme. But one scheme won’t fit them all. Since every type of data has its own characteristics, different compression protocols might be required to achieve optimal compression rates, and we’ve played with a few of them.

Currently implemented compression schemes include: delta encoding, run-length encoding, Simple 9 encoding and bitmap encoding. We also mix these compression schemes when the compression rate is worth the extra effort. Furthermore, whenever relevant, we reduce the number of bytes needed to represent a numerical value in a file to a minimum, as long as we know an upper bound for that column. For example, by limiting file sizes to roughly 17 GB (2^34 bytes) we can use 34 bits instead of 64 when representing file addresses.

A point where S1Search seems to differ from other databases is that we make no distinction between hot and cold data when it comes to compression. Everything gets compressed in the same way.

Data compression is one of the areas where our developers have put in a huge amount of time and effort. Because of that, it’s common for us to see data stored in S1Search take between 1/10 and 1/30 of its original size.
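To give a feel for how two of these schemes combine, here is a minimal sketch of delta encoding followed by run-length encoding on a sorted list of IDs. The function names and sample data are illustrative and the code says nothing about S1Search's actual file formats; Simple 9 and bitmap encoding are not shown.

```python
def delta_encode(values):
    """Keep the first value, then store differences between neighbours."""
    if not values:
        return []
    return [values[0]] + [b - a for a, b in zip(values, values[1:])]

def delta_decode(deltas):
    """Rebuild the original values by accumulating the differences."""
    out = []
    total = 0
    for delta in deltas:
        total += delta
        out.append(total)
    return out

def rle_encode(values):
    """Collapse consecutive repeats into (value, run_length) pairs."""
    runs = []
    for value in values:
        if runs and runs[-1][0] == value:
            runs[-1][1] += 1
        else:
            runs.append([value, 1])
    return [tuple(run) for run in runs]

def rle_decode(runs):
    """Expand (value, run_length) pairs back into the full sequence."""
    return [value for value, length in runs for _ in range(length)]

entity_ids = [100, 101, 102, 103, 110]   # hypothetical sorted posting list
deltas = delta_encode(entity_ids)        # small numbers, many of them equal
runs = rle_encode(deltas)                # repeated deltas collapse into runs
restored = delta_decode(rle_decode(runs))
```

Sorted IDs turn into small, repetitive deltas, which is exactly the kind of input run-length encoding handles well. The file address example in the text follows the same spirit: 2^34 = 17,179,869,184, so any offset inside a file capped at that size fits in 34 bits.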

Avoiding unnecessary repetition of Strings

Simply put: we avoid repeating strings in our files. The full string of a value is present in only one file in our system and is replaced by a hash value everywhere else, which is why some query results require translation. This kind of technique is commonly known as dictionary encoding and will be explained in detail in future in-depth posts.
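A minimal sketch of dictionary encoding is shown below. It assigns sequential integer codes instead of the hash values mentioned above, which is a simplification made for readability, and the class name StringDictionary is hypothetical.

```python
class StringDictionary:
    """Stores each distinct string once and hands out a compact integer code."""
    def __init__(self):
        self._code_of = {}       # string -> code
        self._string_of = []     # code -> string (the only place full strings live)

    def encode(self, text):
        code = self._code_of.get(text)
        if code is None:         # first time this string is seen
            code = len(self._string_of)
            self._code_of[text] = code
            self._string_of.append(text)
        return code

    def decode(self, code):
        """The 'translation' step a client needs to show original values."""
        return self._string_of[code]

dictionary = StringDictionary()
jobs = ["Developer", "Designer", "Developer", "Developer"]
encoded = [dictionary.encode(job) for job in jobs]
decoded = [dictionary.decode(code) for code in encoded]
```

Index files then only need to carry the small codes, and the full strings are looked up once, at the end of a query, for the values that actually appear in the answer.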

Batch Write/Read operations

Recall we mentioned that S1Search buffers data entries before writing to disk only to sort them and write later? This is something we try to apply wherever possible: buffer data, sort them and apply modifications in bulk.

When writing data to storage, the goal is to minimize the cost of seek time, and this is achieved by writing everything we need to write at a given position at once, instead of moving back and forth on the storage device.

Another place where this principle is put into action is when adding data to S1Search’s indexes. For example, instead of adding 100 entries one at a time to a balanced binary tree, we’d rather add them all at once to a non-balanced binary tree. Of course, when buffering data and postponing write operations, the node might crash before flushing the buffer. This is why keeping messages on Kafka and being able to correctly reprocess them is so important to S1Search, hence the imperative need for Record IDs.
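The buffer, sort and bulk-write idea described in this section can be sketched as follows. The class name WriteBuffer, its thresholds and the flush callback are assumptions for illustration; note also that this toy version only checks the age of the buffer when a new entry arrives, whereas a real node would likely use a timer as well.

```python
import time

class WriteBuffer:
    """Buffers entries in memory and hands them to `flush` sorted, in one batch."""
    def __init__(self, flush, max_entries=1000, max_age_seconds=5.0,
                 clock=time.monotonic):
        self._flush = flush              # callback that performs the bulk write
        self._max_entries = max_entries
        self._max_age = max_age_seconds
        self._clock = clock
        self._entries = []
        self._oldest = None              # arrival time of the oldest buffered entry

    def add(self, key, value):
        if not self._entries:
            self._oldest = self._clock()
        self._entries.append((key, value))
        too_many = len(self._entries) >= self._max_entries
        too_old = self._clock() - self._oldest >= self._max_age
        if too_many or too_old:
            self.flush()

    def flush(self):
        if self._entries:
            batch = sorted(self._entries, key=lambda entry: entry[0])
            self._entries = []
            self._flush(batch)           # one sequential write instead of many seeks

written_batches = []
buffer = WriteBuffer(written_batches.append, max_entries=3, max_age_seconds=3600)
buffer.add(30, "c")
buffer.add(10, "a")
pending_before_flush = list(written_batches)   # nothing written yet
buffer.add(20, "b")                            # third entry triggers the flush
```

Everything still sitting in the buffer is lost if the process dies, which is why the messages must remain replayable from the log until they are flushed.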

Conclusion

This post was the first step in a journey of getting to know S1Search. An overview of its properties was given, its architecture was described and many important features were introduced while covering two of its main processing paths.

We hope that after reading this post you’ve come to the following conclusions:

  • S1Search is a client-server, shared-nothing, analytics-focused database that provides eventual, non-ACID consistency.
  • It makes extensive use of search engine approaches along with well-known high-performance database storage modeling, like column stores.
  • It implements a handful of data compression techniques.
  • S1Search’s goal is to be light, cheap and fast.

If you are interested in learning more about S1Search, make sure to check this blog regularly to get more details as we release them.

Some References

Below is a list of many concepts we use in S1Search that were mentioned in this blog post. Use them as a starting point in case you want to learn more about these concepts.

— Architecture concepts

— Database-related concepts

— Database compression concepts/techniques

Below are some books, courses and papers that helped us learn when developing S1Search. They aren’t ranked nor ordered in any kind of importance or priority.

— Courses

— Books

— Database related Videos

— Conferences

— Some Interesting Database Papers

— Some Interesting Data Compression Papers


— Other databases to look for inspiration