Elasticsearch vs Cassandra from a Data Engineering perspective

I am running Elasticsearch (a.k.a. ES) for an analytic system. Overall, it’s running fine but sometimes, when heavy aggregation queries are submitted, retrieval query latencies are longer than expected. This is because such queries consume huge amounts of memory and CPU power. Unless we split ES clusters into dedicated purposes for aggregations or retrievals, it’s hard to guarantee query latency SLA. Some users who were tired of monitoring high p99 query latency graphs recommended that I run the Cassandra (a.k.a C*) cluster for fast retrievals.

Problems

I haven’t used C* before and below I list all the indexing features that I’ve used with ES to apply my modeling know-how to C*.

  • time-based data retention management
  • partial document update
  • flexible filtering with fast retrieval
  • bulk indexing with Spark or Stream processing engine

Time based data retention management

Since we’re not so much interested in very old data, we need to delete outdated indices to free up the system’s resources. Both ES and C* support Time-To-Live (TTL) by setting up tombstones on data and purging them on index optimization. Since index optimization overhead is not trivial, ES experts recommend rotating indices in a timely manner (i.e., daily, weekly, or monthly) based on the documents’ timestamp and removing entire indices when they are outdated. How can we manage data retention on C*?

Partial document update

Suppose, for example, we are implementing food delivery order analytics. If the customer orders food, the order status is ‘order_created’. Then this order is dispatched to the restaurant and the order status is updated to ‘order_accepted_food_preparation.’ When the food is ready, the order status is changed to ‘waiting_for_pickup.’ Similarly, we have many common requirements for partially updating documents instead of storing an entire transaction history. Does C* support this in a high concurrency environment?

Flexible filtering with fast retrieval

ES is famous for its query and indexing flexibility. It supports arbitrary combination of boolean queries on indexed fields. It is also very easy to update index schema such as adding new fields, which can be done on the fly at any time. Can C* support this flexibility? If for performance reasons we cannot take advantage of the flexibility, how many features do we have to sacrifice with C* retrieval performance?

Bulk indexing with Spark or Stream processing engine

We can use es-hadoop connector for bulk indexing with Spark to load data from HDFS. Also, as mentioned in the previous blog post, implementing ES producer in Apache Samza is very straightforward with reasonable throughput if we use ES bulk indexing. What about C*?

Solutions

I tested the following features with C* version 2.2.5.

Time-based data retention management

There are many use cases of C* as a time series database powered with DateTieredCompactionStrategy (a.k.a DTCS). I understood that how DTCS works is similar to removing an entire index outdated in ES. With DTCS, C* creates SSTable, tiered by its created time and if all the rows in the SSTable are outdated by its TTL, C* will safely delete the entire SSTable instead of running an expensive merging process. I’ve seen a few negative blog posts including this one, to share their experiences of running DTCS in a real environment.

Partial document update

The recent version of C* supports Light-Weight-Transaction (a.k.a. LWT) to support partial document update. With LWT, we can execute INSERT INTO … IF NOT EXISTS to keep the initial record from overwriting and UPDATE SET … IF CONDITION to update under the specific condition only. The C* community recommends against using LWT for certain reasons. However, this feature was very important to me and I wrote a simple test program and ran it in my local environment to verify whether LWT works perfectly to clear any concerns. After a few tries, I decided not to use LWT because I could reproduce failed unit tests very easily with multi-threaded C* clients. When I tried a single thread C* client, it worked but I haven’t found any comments saying I should not use LWT where multiple clients can try to update data with the same primary key. It’s possible to prepare for the environment where only a single client will update the data with the same key after partitioning input data by the primary key. However, we need to measure engineering costs between input data prep work and data modeling without LWT.

Flexible filtering with fast retrieval

The C* indexing scheme has two components — primary keys and clustering columns. Primary keys are for partitioning data; clustering columns are used to sort them. Clustering columns also support the secondary index, but this is not recommended. Many people are saying that the secondary index should not be used for fields with high cardinalities. If we want to narrow down results based on other fields that are not primary keys or secondary indices, we can, in very limited situations, use ALLOW FILTERING. Many analytic systems support IN operator to support filtering with multiple values instead of long OR combined query. IN operator, however, cannot be used in primary keys with the following error messages

Bulk indexing with Spark or Stream processing engine

Spark-cassandra-connector is supported by DataStax. I was curious about how it would handle batch mutation and help save a great deal of frequent network round trips without causing any trouble. Indeed, there are many technical issues in batch mutations such as logged, unlogged and coordinator. Spark-cassandra-connector maintains the cache implemented with a HashMap and PriorityQueue hybrid. It caches all statements under the same primary key. If the size of the batch is over the threshold or if it needs to evict some records, the set of mutation statements will be pushed into C* as the logged batch. C* is well-known as high write throughput with low latency, yet I was disappointed at its write throughput where the input data were completely random, in other words, caching does not work efficiently and the frequency of batch mutations is really high.

Apache Samza does not support C* yet and we have to implement our own mutation batch using the DataStax C* driver or another open source library to take advantage of their error handling or connection pooling. Using DataStax C* Java driver was very straightforward but maintaining the cache for efficient batch execution was another problem. Neither did it look easy to re-use spark-cassandra-connector’s writer classes.

Conclusion

Finally, I stopped working on C*. I decided to add more machines to the ES cluster with modified shard allocation rules and thus make a set of new machines not impacted by any different data sources or queries. I am not an expert on C* and am not sure that I spent sufficient time to reach a conclusion that using C* offers greater benefits than expanding the ES cluster. Having said that, my overall impression of this initial foray into C* is that C* is too rigid for data engineering.

As far as minor things go, a noticeable difference was its documentation quality. When I searched questions on C* in Google, it seemed very likely that I would see stackoverflow Q&A page as the top ranked search result. For ES, though, the first answer was always from an ES official document. For example, to find the solution for the above error message “Select on indexed columns and with IN clause for the PRIMARY KEY are not supported”, Google’s search result was

cassandra_google_result

Stackoverflow was the first one  and datastax document was the second one(A week ago, datastax document was the third one). I am not underestimating stackoverflow Q&A but it can be a clue that C* documentation is not well organized because people are more often looking at stackoverflow pages than at C* official documents.

Samza vs Spark Streaming

Stream processing engines consume messages from data pipelines. They process data in various ways, ranging from extracting extra metrics or dimensions to performing partial aggregations. The engines also publish processed results in storage spaces like Elasticsearch or Cassandra. Stream processing engines consist of an event data consumer, a processing part, and a publishing part. It can thus be considered as a complement to a data pipeline and should be tightly integrated with the existing data pipeline. Currently, the most popular choices are Apache Samza and Spark Streaming.

What should be considered when choosing a stream processing engine

For many, the most important features to consider when choosing a streaming processing engine are the following:

  1. guaranteed delivery
  2. backpressure handling
  3. implementation flexibility

Guaranteed delivery

Guaranteed delivery sounds like a critical feature of a data pipeline but a stream processing engine is a consumer on a data pipeline. So we need to consider them together when choosing the stream processing framework. Apache Kafka can support guaranteed delivery from end to end. For the producer-side guaranteed delivery, we need to use synchronous producers (send() method does not return until all acks get back from the broker) with acks of one or more. Since the producer is asynchronous by default, somehow the producer can lose a few messages when the machine goes down before flushing all messages in the memory buffer.

If data is persisted once in the broker, the consumer can easily fetch messages at least once based on topics, partitions, and offsets.  However, checkpointing should be handled carefully. Checkpointing means bookkeeping the last message ID consumed and the consumer can resume processing from that last message ID without any duplicate data. What if we checkpointed in advance before processing all messages? For example, suppose a stream processing engine fetches 50 messages from message ID 1 to 50 and starts processing them sequentially. When it confirms processed message ID 27, it can write the number 27 to some durable storage such as Zookeeper, Cassandra, HDFS or Kafka and continue processing the next message. Then even though the stream processing engine goes down and gets restarted later, it can resume processing messages from ID 27, the last checkpointed one.

These sequences are all connected as input IO – processing – output IO. To remove the IO bottleneck except for the intermediate processing, we can make them non-blocking with Java BlockingQueue. This is good for overall processing performance but can make checkpointing complicated because it’s hard to catch the last processed message ID since a non-blocking IO does not handle messages sequentially. This makes error handling very complicated and almost impossible for guaranteed delivery processing on the consumer side. Also, a Kafka consumer of a streaming processing engine should not use automatic offset commit.

Robustness on backpressure

Backpressure means that the publishing endpoint of a stream processing engine cannot ingest data as fast as it can produce it. For example, we have a streaming processing engine that consumes messages from Kafka, doing many things, and then publishes the results to Elasticsearch. If the Elasticsearch cluster is degraded or has limited capacity, messages that are not drained to Elasticsearch will be accumulated inside Kafka or the stream processing engine. This can always happen  because usual business event traffic  shows a sine curve. Typically the difference in traffic between the peak and lowest dormant time can be more than double. If Elasticsearch is not scaled enough upon peak traffic, there will be backpressure everyday. A streaming processing engine should be able to handle backpressure without causing any trouble even if that situation lasts longer than expected. As the downstream side is scaled up, it should be able to catch up with backlogged data.

Implementation flexibility

Handling data in a streaming way is trickier than doing so in a batch way. This is the case mostly because state management and join operation are not straightforward in the stream processing. Using the example of a delivery campaign mentioned in a previous blog post, we want to aggregate the number of advertisements exposed. Whenever the advertisement is displayed to a potential customer, the event is logged to Kafka with the timestamp of when the advertisement was displayed and the amount of money charged from the media company. If we process and store these messages to analytics engines such as Elasticsearch or Druid, we can analyze how much money we’re spending on the marketing campaign. Writing data to analytics engines directly is not so difficult but what if we need to join other data to obtain information missing from the original event log? In a stream-processing world, data runs like water; stopping it is not easily done. In a batch world, however, we can pour all the water into a huge bucket and mix it together.

Another challenge is, how can we manage large distributed cache systems without external dependencies such as Redis or memcached? Let us suppose, for example, we need to monitor a user’s behavior using GPS tracker. The GPS tracker is polling the user’s location and status every second. Such a level of fine granularity is unnecessary; all that is needed is minute granularity. Then we can ignore all GPS tracking information except the very first event of each minute. I refer to this way of information gathering as “snapshotting.” If we can guarantee that messages are perfectly ordered based on their timestamps, snapshotting will be much easier if we simply take the first message of each minute and discard the others. But in real-world data pipeline, messages are not delivered in strict order. Then we can use cache to implement snapshotting if the key of the cache is user-ID and a user’s timestamp of a minute granularity. What then becomes critical is the flexibility with which we can implement the stream processing logic.

Samza vs Spark Streaming

I have been using Samza for two years, since its 0.7.0 version. Back in January of 2015, when I was reviewing Samza vs Spark Streaming,  I discovered a few critical features that Spark Streaming could not resolve.

Guaranteed delivery

The first one was Kafka integration. Samza built up its own Kafka consumer based on SimpleConsumer APIs. Spark Stream, in contrast, depended on having high level consumers. High level consumers work well in most cases. Indeed, it offers powerful features for fault tolerance and fully automated partition ownership management. What I observed, though, was quite often a Zookeeper session with high level of consumers was frozen. This meant its ownership of Kafka topic partitions was not released but had stopped consuming messages silently. To prevent this problem, we set up a monitor on the number of owned partitions of high level consumers. If this number did not match the actual number of partitions, an alarm was triggered. If we use Direct Kafka Stream, we are able to avoid the potential risks to high level consumers. The Samza Kafka consumer faces a slight drawback. Its consumer fetches topic metadata on an initial startup and coordinates tasks-partitions mapping. This meant that we had to restart an entire Samza job whenever Kafka topic partitions were expanded. While this was inconvenient, it was not a serious problem for us because topic expansion is rare and fully controllable. Actually, this problem was solved in 0.10.1.

Robustness on backpressure

Spark Streaming was vulnerable to backpressure handling. Because Samza works as single threaded and all blocking ways, its processing is blocked when the downstream endpoint is backed up and messages are safely kept in Kafka. If we scale up the downstream side within Kafka’s message retention hours, Samza will follow up slowly and steadily. Spark Streaming, however, had the problem that when DStreams were not drained its driver app would cause an OOM error. It had various features to slow down ingestion such as rate control but these weren’t perfect solutions. I am not sure how the latest version of Spark Streaming handles this problem.

Implementation flexibility

Spark Streaming programming can support only a restricted model of stream processing since its architecture is mainly based on Spark RDD processing. All intermediate objects should be serialized and deserialized in executors. If we want to use specific features on external libraries such as Guava to process RDDs, those objects should be serializable from a driver app to executors. What if we cannot control serialization/deserialization in external libraries? This seriously harms implementation flexibility. Also, Spark Streaming DStream batch duration does not allow itself to be controlled in such a way to make it happen in the specific time range such as “I want to make DStream from 2016-08-24T12:23:00Z every five minutes such as 2016-08-24T12:28:00Z, 2016-08-24T12:32:00Z”.

However, what has many advantages over Samza is Spark Streaming. Samza needs a long list of configuration properties for each job and multi-staging jobs should be pipelined with Kafka persistence. This can be extra overhead on Kafka cluster and many Kafka Ops want to avoid impacting the real production cluster due to stream processing. Configuring Spark Streaming is relatively easy and its micro batch can handle multi-staged shuffling without intermediate persisted IO on external storage systems.

The biggest advantage of Spark Streaming is its ability to share codes between real-time and batch processing. This advantage will not shine so much in complicated ETL cases. While we can pour all data in a huge bucket and slice and dice at once in a batch way, the streaming way of implementation is likely to handle data processing incrementally, which is not good for storage optimization.

Say we’re using Elasticsearch as the aggregation engine and we want to aggregate the number of event occurrences on an hourly basis and an additional geo-location dimension. Each event can be the following:

Then, we can use the following Elasticsearch API to index the counter for the time range from 2016-08-24T11:00:00Z to 2016-08-24T12:00:00Z on san_francisco_bay_area location dimension.

Later, another event comes in within the same time range and location, then we need to increment the counter using:

Since we cannot keep all data in an hour on the memory with Spark Streaming, we need to implement incremental processing, as in the example above. We have to write codes similar to what is shown below:

In a batch way, we’d better not use incremental processing because the example above is not idempotent. When Spark job retries same message processing due to network glitches between Spark executors and Elasticsearch nodes, counters will be incremented multiple times for the same data. In a batch way, if we assume all Kafka data are stored in HDFS or S3, we need to write the following Spark codes which are not incremental.

Luckily, we can share a few lines of codes with the example above but this incremental indexing will trigger many Elasticsearch update operations which will be extra overhead on Elasticsearch compared to the single write done by the batch job. Also, what happens if we want to join two streams whose join keys cannot fit into the Spark Streaming batch duration window? We need to implement upsert in Elasticsearch. In this case, the map function right before .saveToEsWithMeta should be implemented differently per data source in an incremental way. This will make code re-using tricky. In my experience, the portion of Spark batch job codes that can be re-used in Spark Streaming are not so significant to consider and I find the prospect of sharing codes to be unattractive.

Integration with external systems

Unfortunately, Samza does not officially support various connectors to external storage systems such as Cassandra and Elasticsearch , while Spark officially supports Cassandra and Elasticsearch connectors.

Elasticsearch support was newly added in Samza 0.10 but it depends on native Java TransportClient API, which is not good for Elasticsearch or JVM version compatibility. If we want to upgrade Elasticsearch major version, we cannot guarantee that current Samza jobs will successfully communicate with new Elasticsearch. Also, I saw that all Java TransportClient failed to join to Elasticsearch cluster when we updated the JVM version in the Elasticsearch cluster. This problem was caused by updated serialization format of some networking library in Oracle JVM. This is why the Elasticsearch-Spark connector implemented the internal Elasticsearch client using REST api.

It is not so difficult, however, to implement connectors to external systems inside Samza thanks to its flexibility. All connectors can handle messages one by one but Spark connectors look quite complicated when I looked at two Spark connectors of Cassandra and Elasticsearch, where a lot of advanced Scala language features were fully harnessed.

As minor things, Samza does not yet support Mesos and its stand-alone execution mode is still under development. However, Spark Streaming can support Mesos and the stand-alone mode.

Samza rocks, Spark Streaming optional

In summary, even though Spark Streaming has nicer features and provides code re-use, I prefer Samza. Samza is quite stable and guarantees an at-least-once message delivery with built-in checkpointing and single thread processing. I can implement various types of stream processing logic in Samza such as snapshotting armed with Guava Cache and Samza KeyValueStore. Configuring and deploying Samza jobs are a little messy but this is likely to be overcome by additional tooling, which, I believe, should be supported strongly by the community.

A Survey of Query Execution Engines (from Volcano to Vectorized Processing)

Open source analytical processing systems such as  Drill, Hive, Impala, Presto, Spark, and Tajo have recently gained popularity. These projects introduce themselves with various cutting-edge techniques that are not easy to understand at a glance. One of the most popular techniques is vectorized query processing. In this article, I’m going to give you comprehensive understanding about it. Rather than focusing on only vectorized query processing, I’ll explain various query execution engines for better understanding around query processing. In detail, I’ll present what query execution engine is, various processing models, and some examples of open source implementations. I hope this article would be helpful to understand even more analytical systems in Big Data world.

Query Execution Engine and Different Processing Models

Query execution engine (shortly called execution engine) in an analytical system is a core component that actually evaluates data and results in query answers. SQL engines like Hive, Impala as well as even general purpose engines like Spark commonly have execution engines. An execution engine dominantly affects the performance of data processing. For better performance, various models of the design and implementation have been introduced in database community. There are mainly three or four execution engine models. Most execution engines follow Volcano-style engine model [14].

In Volcano-style engine model, an execution engine takes an execution plan (also called evaluation plan), generates a tree of physical operators, evaluates one or more tables through physical operators, and finally results in a query answer. A physical operator is an evaluation algorithm that implements an operator of relational algebra (or logical plan). Physical operators are implemented though Volcano-style iterator interface, in which each operator implements an API consisting of open(), next(), and close() methods. open() initializes external resources such as memory and file open, and close() releases them. As shown in Figure 1, the next() call of each physical operator in an operator tree produces one new tuple recursively obtained from next() calls of child physical operators in the tree. This iteration interface is common in all models that I’ll describe later.

volcano_style

<Figure 1. A tree of physical operators and Volcano-style iteration>

Volcano-style (Tuple-at-a-time) Processing

Let me talk about the Volcano-style engine model in more detail. It was firstly introduced in Volcano system [14]. It is the most common execution engine model in real-world DBMSs, such as MySQL, SQLite, and earlier versions of Hive.

As I mentioned earlier, each next() in a physical operator recursively invokes next() of child operators, and then it finally produces a single tuple. It’s why this model is also called “tuple-at-a-time processing model”. While this model is very intuitive and easy to implement, it is not CPU friendly in terms of processing. There are many drawbacks to cause performance degradation [2, 13]. Here, I’m going to explain few significant points.

 <Figure 2. Latency Comparison Numbers [9]>

Firstly, next() is implemented via so-called virtual function call. In compilers, a virtual function call involves the lookup of a function pointer in the vtable, requiring additional CPU instructions. Also, a virtual function call is an indirect jump, usually causing a branch misprediction. A branch misprediction can cause many bubbles in CPU pipelines. This branch misprediction penalty is not trivial as shown in Figure 2. In Intel Skylake processors, this penalty is at least 16-17 cycles [5]. It significantly decreases instructions per CPU-cycle (IPC). Note that operators recursively call next() of child operators in order to return a single result tuple. Thus, a large function call overhead occurs for each tuple. The similar overhead also occurs when each expression in select list is evaluated. These overhead is called interpretation overhead. Figure 3 shows a typical aggregation query. Think about how many virtual functions are called only for a single tuple. According to [12], only 10% of the time for TPC-H Q1 in MySQL using Volcano-style iteration spent on actual evaluation.

<Figure 3. TPC-H Q1>

Second, accessing a row causes many cache misses. Volcano-style processing uses row-wise tuple representation called N-ary storage model (NSM). In this model, a tuple is stored as a sequence of bytes in a memory space as shown Figure 4. Each operator mainly reads some fields in each row. As shown in Figure 5, in order to load l_shipdate field in a row CPU will load more bytes that begin at the cache boundary into a cache, but most of the cached data will be irreverent. Subsequent accesses to fields will cause frequent cache misses, leading to the reference of memory. CPU will be stall while waiting for fetching one cache line from memory. As shown in Figure 2, a main memory reference takes roughly 100 ns, which may be a time for CPU to execute hundreds of instructions. Consider how much CPU performance would be degraded significantly if multiple cache misses occur to produce a single tuple.

nsm_and_dsm

<Figure 4. Relational table and its row representations in NSM and DSM (source: [2])>

cache_boundary

< Figure 5. Memory representation of rows>

Third, all routines of a tree of physical operators run tightly interleaved. The footprint of combined instructions is likely to be large to not fit in L1 instruction cache. It can frequently cause instruction cache misses. For example, in Figure 2 HashGroupby, HashJoin, and two SeqScan will run sequentially for each next() call. In real world queries, tens of operators of a single tree are usual. The latest Intel CPU Skylake just has 32KB L1 instruction cache per core [6]. Also, each operator maintains various states, such as cursors, hash tables, and partially aggregated values. For next() call, the execution engine should access most states of a physical operator tree. It can frequently cause data cache misses. As I mentioned earlier, both cache misses cause significant performance degradation.

Block-oriented Processing

Block-oriented processing [6] is based on the volcano-style iteration, but next() call returns a list of (100 – 1000) rows instead of a single tuple. Block-oriented processing amortizes the cost of virtual function calls by using tight loops in each operator. Thus, block-oriented processing reduces much of the interpretation overhead and increases instruction and data cache hit ratio than that of volcano-style processing. As I mentioned earlier, however, accessing fields in a row can still cause frequent data cache misses as shown in Figure 5. Block-oriented processing also has limitations for compilers to exploit modern CPU features like SIMD.

Column-at-a-time Processing

Column-at-a-time processing [10] is an early model for columnar execution. Column-at-a-time processing is based on block-oriented processing, but it uses vertically decomposed storage model (DSM) [1], where columns are dense-packed arrays. Figure 4 shows an example of rows represented in DSM. Besides, next() in each operator processes a full table and store intermediate data in a column-at-a-time fashion.

< Figure 6. Comparison primitive between an integer column and
a constant value (source [2])>

In this model, the building block operators are called primitives. They are simple functions that process arrays of values in a tight loop. This approach gives compilers more optimization opportunities to exploit modern CPU features. First of all, code footprints of loops can be easily fit into L1 instruction cache. Eliminated function call, loop unrolling (see Figure 7), less control and data  hazards can make branches even more predictable and maximize the depth of CPU pipelining. Auto-vectorization by compilers can use single instruction multiple data (SIMD) instructions, exploiting data-level parallelism in modern CPUs. Consequently, this model achieves higher CPU efficiency.

unrolled_loop

<Figure 7. An example of loop unrolling – a loop can be optimized to be an unrolled loop.>

See an example primitive of column-at-a-time model shown in Figure 6. uselect_bt_void_int_bat_int_const takes 4 parameters, where output is an array of output values, input is an input column, value is a constant value for comparison, and size is the number of input column values. This primitive performs a simple tight loop to check if i th column value is bigger than a given constant value.

DSM_cache_boundary

<Figure 8. Memory representation of rows in DSM>

Unlike the previous two models using row representation in NSM, the column-at-a-time processing uses DSM where values of each column are packed in a dense array. The access to values of rows is direct. As shown in Figure 6 and 8, a primitive consumes and produces arrays of values with a good cache locality. Thus, this model significantly improves CPU efficiency even more than the previous two models do.

However, column-at-a-time processing model has a drawback. Since this model processes a full table in a column-at-a-time, it can lead to a large amount of intermediate data to be materialized in memory, on disk, or both, causing data cache misses and I/O overheads.

Vectorized Query Processing

Vectorized query processing [12] is currently one of the most advanced columnar execution models. It was designed to solve the drawbacks of column-at-a-time processing [14]. Most parts of vectorized query processing are similar to those of column-at-time-time processing, but for each next() vectorized query processing model processes column chunks called vectors (see Figure 9) instead of columns of a full table. Vectors contain a batch of rows that are small enough to fit in CPU cache. Thus, each operator on vectors can do in-cache processing. Consequently, vectorized query processing dramatically reduces data cache misses while keeping the advantages of block-oriented and column-at-a-time processing models.

dsm_vs_vector<Figure 9. Row representations in DSM and Vector models>

Figure 10 shows TPC-H Q1 performance in a vectorized query processing engine (MonetDB/X100) with varying vector sizes. This figure shows how vector size affects cache performance. When the vector size is 1 and 6M, it works like volcano-style and column-at-a-time engines respectively. According to the figure, about 4k is the best vector size.

cache_effect_with_varying_vector_sizes

<Figure 10. TPC-H Q1 performance in MonetDB/X100
with varying vector sizes (source [12])>

Like column-at-a-time model, this model also uses simple functions called primitives that process column vectors in a tight loop. They usually perform arithmetic calculations, comparison operations, and logical operations. There are mainly two kinds of primitives: map and select. A map primitive is just a function transforming one or more input vectors into an output vector. A select primitive is a filter function, generating a selection vector according to a predicate (i.e., a boolean condition).

<Figure 11. add primitive for long and integer vectors>

<Figure 12. select primitive evaluating a boolean condition in long and int vectors>

Figure 11 shows an add primitive that adds corresponding row values of the two input vectors col1 and col2 and writes the sum of them into a result vector. A selection vector sel_vec is an optional parameter, and it can come from other selection vectors. A selection vector is an array containing offsets of the relevant rows. If sel_vec is given, primitives evaluate only rows corresponding to rows positions of the selection vector. If sel_vec is null, primitives evaluate all rows. Also, vec_num contains a vector size. This is a typical interface of primitives in the vectorized query processing.

Figure 12 shows a select primitive that evaluates a boolean condition on the two input vectors and writes relevant row positions into an output selection vector (res_sel_vec). Like add primitive, select primitives also can process input vectors with or without an input selection vector. If a select primitive takes a selection vector, it works as if the primitive evaluates a boolean condition as well as ANDing together. This is because with a selection vector, a select primitive evaluates only relevant rows already selected from other select primitives.

Implementations in Open Source Projects

So far, I discussed different query execution engine models. Now, let’s take a look at some real examples for the models.

Apache Impala is a distributed SQL engine on Hadoop. Impala was firstly introduced by Cloudera in 2008. As it was recently designed, Impala reflected the recent studies of the database community. It chose the block-iteration model and Just-in-time (JIT) query compilation using LLVM. With regard to their design choice, Impala looks similar to [4], but Impala uses push-based tuple iteration rather than pull-based iteration. You can find iteration interface and row block implementation at Impala source code as shown Figure 13 and 14. These source files also contain valuable comments to help us to figure out a real example of the block-oriented model. I recommend you read them.

<Figure 13. An execution Iteration interface in Impala>

<Figure 14. Row block implementation in Impala>

As I mentioned earlier, the block iteration model has some disadvantages such as data cache misses and less optimization opportunities by modern compilers, but Impala choose JIT query compilation techniques to mitigate them. At runtime, Impala generates codes for expressions, hash function, and others by using LLVM JIT. This approach can eliminate many branches causing misprediction and use intermediate data kept in CPU registers [4], gaining performance benefits. Cloudera also provides how Impala’s code generation improves the performance [8]. It’s worth to read it.

Apache Hive is a data warehouse solution in Hadoop. Before Stinger project, Hive used the tuple-at-a-time model. Since Hive adopted the vectorized query processing in 2014, Hive has provided two execution engines based on both query processing models respectively. Users can set a specific configuration to choose one of both engines. Hive has a highly abstract iteration interface. As you can see in Figure 15, Operator class is a typical Volcano-style iteration interface, and process()corresponds next() of Volcano-style model.

<Figure 15. Iterator interface in Hive>

Interestingly, this single interface supports both tuple-at-a-time and vectorized query processing. Depending on the engine mode, the first argument Object can be a row or VectorizedRowBatch, which contains a selection vector and a batch of rows organized with each column as a vector. As shown Figure 17, ColumnVector is an abstract implementation of a single vector, and it has concrete subclasses for various data types. Each concrete class uses a specific type array to represent a value vector.

<Figure 16. Vectorized rows in Hive>

<Figure 17. Column vector interface and long vector implementation in Hive>

Note that Hive is implemented in Java that does not provide SIMD primitives. Its vectorized query processing engine is not complete. Java’s superword optimization (SIMD) works implicitly when a sequence of routines satisfies with some specific conditions (e.g., aligned memory, direct memory access, and simple operation). In other words, developers cannot manually control using SIMD instructions, and they just let some routine satisfy with the conditions to trigger the superword optimization. Currently, Hive community is working on more superword optimization opportunities [7].

Columnar Storage vs. Columnar Execution

Lastly, I’d like to point out one more thing. In my experience, people are often confused into thinking that columnar storage and columnar execution are the same thing. Furthermore, people misunderstand that some system uses a columnar execution engine if it supports columnar file formats, such as Parquet and ORC. Actually, that’s not the case. Columnar storage is about how data are organized in a columnar fashion for I/O efficiency. Columnar execution mainly focuses on efficient computation. This confusion may be because many literatures assume that columnar executions implicitly work on columnar storages. In literatures, that assumption is reasonable because in database systems storage structures like block or pages are directly used as in-memory row structures. However, it is different in Hadoop ecosystem. In Hadoop ecosystem, analytical systems use their own in-memory row structures, and file formats (e.g., Avro, SequenceFile, Parquet, and ORC) are independently standardized. Thus, the systems read and serialize the storage representation into their own in-memory row structures. That is main difference between analytic systems in Hadoop ecosystem and studies in literatures.

Conclusion

I presented different query execution engine models and their pros and cons. After you read this article, it would be more fun to understand existing analytic processing engines. Besides, if you want to learn columnar execution and storage, I recommend reading Column-Oriented Database Systems (VLDB 2009 Tutorial). It would be helpful to figure out the overall studies and find good papers worth reading further.

References

[1] A decomposition storage model, ACM SIGMOD Conf., 1985.
[2] Balancing Vectorized Query Execution with Bandwidth-Optimized Storage
[3] Block Oriented Processing of Relational Database Operations in Modern Computer Architectures, IEEE ICDE Conf., 2001.
[4] Efficiently Compiling Efficient Query Plans for Modern Hardware, VLDB Conf., 2011.
[5] http://www.7-cpu.com/cpu/Skylake.html
[6] https://en.wikipedia.org/wiki/Skylake_(microarchitecture)
[7] https://issues.apache.org/jira/browse/HIVE-10179
[8] Inside Cloudera Impala: Runtime Code Generation
[9] Latency Numbers Every Programmer Should Know
[10] Monet: A Next-generation DBMS Kernel for Query-Intensive Applications
[11] MonetDB/X100: A DBMS In The CPU Cache, IEEE Data Engineering Bulletin, 2005.
[12] MonetDB/X100: Hyper-Pipelining Query Execution, CIDR 2005.
[13] The Design and Implementation of Modern Column-Oriented Database Systems
[14] Volcano-An Extensible and Parallel Query Evaluation System, IEEE TKDE, 1994.

Demystifying Different Roles in Data Team

One question I often get is how to build a data team in a company. This is mostly asked by people who are doing a startup or thinking about starting one seriously. This question is followed by another set of questions. Two common ones are:

  1. What does a data scientist or data analyst or data engineer do? How are all these positions different?
  2. If I have to hire one data person, should I hire a data scientist or a data engineer?

I will try to answer the very first question (how to build out the data team) and the next question (how the roles differ) in this blog. Instead of explaining different data positions one by one, I will describe them as I explain how a company’s data organization evolves by talking about the roles they are playing. This is obviously purely my personal opinion based on my own experience.

Since Data Engineering was explained by Jae in our previous blog, my focus will be more on the data analyst and data scientist side of things.

Beginning of Data Organization

When a company is small, you don’t really have sizable data to process. Actually worrying about daily survival is probably a more important priority. But as the company grows, there will be more and more data which you want to get some (actionable) insights from for your business.

Most likely in the beginning your production database (for example MySQL) will be used as the source of truth. But it is the production database so running some massive queries against it isn’t a great idea. Once in awhile this type of queries will freeze the database causing the service to grind to a halt. Production engineers will be mad at whoever ran the queries and those who ran the queries would be scared to run them again.

On top of that, you will likely want to add different data sources which are not readily available in the production database so that you can do one stop querying (joining of different data sources from a single database) as far as data analysis is concerned. Doing this requires engineering’s help and since the production database isn’t necessarily for this type of additional data, they are not going to be receptive to doing this type of work.

Introduction of Data Warehouse

Have you heard about “data warehouse”? Data warehouse is a term referring to a scalable database which contains all the data an organization needs for data analysis purpose. As your data grows and the demand to analyze it grows, you better set up a separate and scalable database which is your data warehouse. That’s the single source of truth and one-stop shopping center as far as data is concerned. That’s your data warehouse.

I would say the beginning of true data journey in an organization starts with its introduction of data warehouse. As mentioned earlier, when it is really small, your production database can be it. A popular choice of data warehouse these days is Redshift from AWS (explaining what it is itself will be another article) but I have seen companies using Hive as well.

Introduction of ETL processes

Once you have your data warehouse, the next thing is to load important data you want to analyze to it. This process is called ETL (Extract, Transform and Load):

  • Extract means extracting data from source (for example MySQL). Data sources can be MySQL, web access log, external data in FTP, AWS S3 and so on.
  • Transform refers to changing the data format to the one you want.
  • Load is about uploading it to the data warehouse.

Eventual output of an ETL process is a table (structured data) in the data warehouse. Still relational database knowledge (and the familiarity with SQL) is important in the era of big data.

You can use any language to implement this ETL but Python seems to be the most popular choice at this point (I might be a bit biased by the way). Simple ETL can be entirely written in Python but more massive large data ETL will require some big data processing technology such as Hive and Spark. ETL is mostly batch processing oriented as opposed to processing data in realtime at least in the beginning.

Managing ETL processes

When you have a handful of ETL processes, you can just use cronjob (or Jenkins) to schedule and manage them. Once you have a lot more processes with complex dependencies among them, you need some kind of scheduler or workflow engine which allows you to schedule jobs at certain times or based on data availability.

Airbnb’s Airflow is gaining a big momentum but there are lots of other options (Pinterest’s Pinball, LinkedIn’s Azkaban, Yahoo’s Oozie, Luigi, AWS Data Pipeline, Coursera’s Dataduct, …).

At Udemy (an online learning marketplace startup) we are currently using Pinball which is written in Python after evaluating a few different options mentioned above. In Pinball, you can visualize job dependencies like this:

pinball.png

Who manages data warehouse and ETL processes?

Who will be managing this data warehouse and implementing ETL processes? That’s the main job of data engineers. It is one of the hardest jobs in software engineering since many data sources are not in their control so on a given day some ETL processes will be always broken. These need to be fixed as soon as possible.

Also there will be a lot of requests from different teams to add more data to the data warehouse and worst of all the engineers really don’t know how the data is consumed in the end. Most of the time they are data code monkeys :). Having a big picture or some kind of feedback is very important for the engineers to be excited about their work and is an important responsibility of the data engineering management.

Learn from Data and Predict the Future

Once you have lots of ETL processes, you will end up having lots of tables in the data warehouse. It can be a few thousand tables with lots of data. Who would know all of these tables? It is not a good idea to open up access to all these detailed (more like raw) tables to employees of your company since they can’t know all of them and it will cause more confusions and can lead to very wrong analysis.

Creation of Summary tables

A better way of doing is to create summary tables around key entities of your business. In the case of Udemy, key entities are students, instructors and courses. Most data questions will be likely around one of these:

  • How many new students were enrolled in the last month? Breakdown by countries?
  • What’s the revenue breakdown of new users vs. existing users?
  • Who are the top instructors in terms of revenue or enrollment?

The summary table could be designed around certain actions such as search. For example, what’s the top search terms by revenue in the last one week?

Once you have a few of these summary tables, most users don’t need to know all the gory details of tables and their dependencies. They can just use them to get what they need. From these questions, you might get some ideas. This summary table building requires deep domain knowledge and understanding of your business and product.

Who creates the summary tables?

Creating and updating these summary tables itself is another ETL processes. They can be written by data engineers but it is better suited for data analysts through writing SQL (not necessarily by coding) mainly because this requires deep domain knowledge of business and product.

Data analysts generate internal reports to meet requests from different business units (sales, growth, product and so on) using SQL. To do this, they might need to join a lot of different tables and create new summary tables periodically so that querying can be simpler and faster. But some requests can’t be answered by summary tables and in such case one-off ad-hoc work is required. If there are similar data requests over and over, that indicates a need for another summary table.

Visualization

Not everyone can write SQL queries. Even if you can, writing SQL to get your insights is cumbersome. Visualization and easy-to-access dashboards are very important in terms of democratizing data throughout the company.

There are ranges of choices here: Tableau, Looker, ChartIO, AWS Quicksight and so on. There is some limitation but Airbnb’s Caravel seems very promising (it allows only one materialized view – in other words no support for JOIN). Most of these tools use the SQL interface.

Using data to predict the future

Once you have comprehensive summary tables, you can hopefully answer business units’ questions on data without much issue. The next step as a data organization is to use this data in your data warehouse to improve your actual service itself. You can think of this as providing a feedback loop (both positive and negative feedbacks). Essentially from user actions you learn something about users. It can range from some aggregated insights (what works the best in general) to individual preferences (personalization). This is the main responsibility of data scientist.

This work can be summarized as building a predictive model by learning from the past data. Using the past data as examples, you build a training set and then build a prediction model. Once the model is built the next step is to push this into production.

This sounds easy but building a model requires many iterations. If each iteration has to be done manually it will be slow and prone to errors. Depending on the size of the training set, you might need to use Hive and Spark. In short building a machine learned model needs a scalable data infrastructure.

Data Analyst vs. Data Scientist

I briefly touched upon different roles taken by data scientists and data analysts but let me summarize just to make a bit clearer (again this is purely my personal view):

  • Data analyst serves internal customers by writing reports or creating internal dashboards.
  • Data scientist is more external facing by writing a predictive model (rule based or machine learned). The data scientist’s primary contribution is to help the end users of the service. Examples are building search ranking models or recommendation models.

In terms of skillset, the underlying skills are quite similar. Some statistics knowledge (with R) and familiarity with SQL is a must. One big distinction in my opinion is whether you can code. In my definition a data scientist should be able to code so that prototyping can be done without anybody’s help. Here coding is mostly needed so that the scientist can clean up data and create features. This is in particular because the scale of data you have to deal with would be much larger.

In terms of career path, many data analysts want to be a data scientist at some point in their career. Because of this putting data analysts and data scientists under the same organization provides a better career path for data analysts. Also it can minimize potentially redundant work done by different data teams. But this discussion itself is worth another article.

Conclusion

I briefly shared my personal definition of data engineer, data analyst and data scientist. Just to make it more entertaining and practical, I explained how a data organization typically evolves together.

Screen Shot 2016-08-20 at 6.29.58 PM

Before closing, if you are wondering what my answer for the 2nd question in the opening (“If I have to hire one data person, should I hire a data scientist or a data engineer”) is, it is data engineer. You need data infrastructure to learn from your data. Frankly in the beginning of data organization journey data scientist isn’t really needed unless you already have sizable and clean data which is somewhat unlikely.

In the next blog, I will share my personal opinion on optimal team structure and collaboration model among data teams. I don’t think there is one-size-fits-all structure or model so I will just list different structures I have seen or heard about with pros and cons.

 

Life of a stock trader

Three months ago I decided to become a full-time stock trader. My decision may have surprised some people who didn’t know that I was into stocks, but in fact, I was thinking about it for quite some time. As I have been on my life experiment for a while, I thought it would be interesting to share my experience so far.

How it started

My first exposure to stocks goes back to the beginning of 2013 when I started working at Netflix. What is unique about Netflix’s stock options program is that it lets employees decide on their own how much of their salaries to be allocated into stock options between 0% and 50%. Back then, I had not traded even a single stock in my life and put every penny into a saving account. So it wasn’t a hard decision for me to allocate 0% into stock options. In hindsight, this was the worst investment decision of my life because the Netflix stock was just starting a massive rally that would last in the upcoming years. In 2013 alone, the Netflix stock rose by 350%! I still remember that every day everybody at work showed me off how much money they had made with their stock options while I had nothing to share with them. It was such a painful lesson.

Screen Shot 2016-08-11 at 3.41.41 PM.png
NFLX rallied 1300% from 2013 to 2015.

At the beginning of 2014, I made a bold decision, which turned out to be the best investment decision of my life. I put a significant portion of my salaries into stock options for the year. At first, it wasn’t as a smooth ride as I hoped; instead, the Netflix stock was more or less flat in volatile moves throughout the year. As a result, my stock options were worth nothing. Admittedly, I regretted my decision every fucking day!

But things turned quickly commencing in 2015. The 3rd season of House of Cards hit the air in February, and it was followed by another hit show Daredevil in April. On top of that, Netflix expanded into new countries in Europe as well as Australia and New Zealand. In response to the slew of good news, the Netflix stock moved up by 250% in 2015. Amid this impressive rally, I executed my stock options with 700% gains. It was such a life-changing event for me.

Beginner’s luck

There is a term called “beginner’s luck” in the trading world. This is something that every newbie stock trader experiences, and I was no exception. If you recall, the market was very bullish in late 2015. In particular, the FANG (Facebook, Amazon, Netflix, and Google) stocks were on fire making new highs every other day.

After cashing out Netflix stock options, I wanted to invest my money somewhere else. Since the FANG stocks were going nuts, I aggressively longed them via call options. At first, my strategy seemed to work beautifully. Over 3 months from September to November, I netted $150k gains by dip-buying FANG calls. After a series of success, I became too greedy to see that it was a luck. In December, I made my final move. I bought quarter million dollar worth AMZN and GOOGL calls in anticipation of a so-called Santa rally near the end of the year. (In case you’re not familiar with options, 1 contract of call option covers 100 shares of underlying stock. So this is a hell of a bet.) But then, a brutal market correction kicked in from out of nowhere at the new year, and all the major U.S. stock indices declined by as much as 15% in a month. In January 2016, I gave up most my gains that I had earned with FANG calls earlier. It was lucky that I didn’t lose my principal.

What’s interesting is that almost every new trader has a similar experience in their past. But if you think about it, it makes sense because people usually get interested in stocks when the markets are good. Since everything goes up in a bull market, everyone appears to be a good stock picker. But as soon as a correction hits, they give all their gains back to the markets. For most people, that would be the end of stock trading in their life time. According to a research, the average time for ordinary Americans to quit stock trading is 3 months. People start prudently but soon go out of control after some initial successes, and it all ends in tears. They usually never come back to stocks for life.

Co5orcWWAAA7sYw.jpg

This is sad because stock trading is really a survival game. As long as you stay long enough and disciplined, you can become a profitable trader. Sadly, too many people get blinded by the fantasy of overnight success and blow up their accounts.

Multiple sources of income

I occasionally run into people who say they’re not interested in stocks because their jobs pay them good enough. Although it may be true that you don’t need to worry about income now, you will need to find a way of making extra money if you want to retire sooner than later. Think about Warren Buffet’s quote below for a moment. I bet you don’t want to have to work until you die.

(Image: Warren Buffett quote.)

While I am a full-time stock trader, I still keep my engineering job on a part-time basis. This helps me psychologically because it saves me from panicking about short-term trading losses. I know I can still pay my bills even if I make no money in trading, so I can remain calm in bad times. Given that nothing in life is certain, why wouldn’t you diversify your income? On rainy days, it will surely help.

Speaking of income, there is also a common misunderstanding about the tax implications of stock trading. Many people believe that short-term trading, which I do for a living, cannot be profitable because of taxes. It is true that short-term trading is subject to higher tax rates than long-term investing, but tax rates for short-term trading are not higher than income tax rates. If I made $300k in short-term capital gains in a year, I would pay the same amount of tax as if I had earned it as salary. All in all, short-term capital gains are treated as part of ordinary income, and the same tax rates apply. There is therefore practically no difference between $300k in trading gains and $300k in salary.

Myth of stock picking

When I say I am a stock trader, most people ask me this question: “What are good stocks to buy now?” Unfortunately, this is the wrong question to ask. Success in trading doesn’t depend on how well you pick stocks. It depends more on how quickly you adjust to changes in the markets.


How does that make sense? Let’s say you have a 50% chance of being right on any trade. The real secret of being profitable is maximizing gains on winners while minimizing losses on losers. As long as you keep your gains bigger than your losses, your net gains remain positive. As a matter of fact, a lot of professional stock traders don’t have a winning rate higher than 60%, and I would even argue it’s almost impossible to sustain a winning rate above 80%.
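
To make the arithmetic concrete, here is a back-of-the-envelope sketch; the dollar amounts are made up purely for illustration. The point is that with a modest win rate, the edge comes entirely from keeping the average win larger than the average loss.

```python
# Hypothetical numbers, only to illustrate trade expectancy.
win_rate = 0.59
avg_win, avg_loss = 1200.0, 900.0   # made-up dollars per winning / losing trade

expectancy = win_rate * avg_win - (1 - win_rate) * avg_loss
print(expectancy)   # about $339 expected per trade, positive despite being right only 59% of the time
```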

This is not some random theory that I made up but something I experience in my own trading. For example, below is a summary report of my trades over the past three months in one of my trading accounts. As can be seen, I placed 144 trades in total, and my win percentage is only 59%. However, I still managed to pull off $70k in net gains (after commission fees) during that time, because my average win was larger than my average loss.

Win percentage is only 59%, but total net gains are still $70k.

So what does this all mean? Simply put, it means you must cut losses as soon as you realize you’re wrong. It’s a simple concept, yet ironically it is the hardest thing in the world to do because of human nature. When most people get caught in a wrong trade, they deny what they see with their own eyes; instead, they hope that things will somehow turn around.

The legendary stock trader Ed Seykota once said: “The elements of good trading are: 1, cutting losses. 2, cutting losses. And 3, cutting losses. If you can follow these three rules, you may have a chance.”

In the end, stock trading is a decision-making business. Don’t fool yourself into thinking that you need to pick stocks perfectly to succeed. As long as you can cut losses, profits will follow. Trust me.

Trading journal

As with anything else, analyzing data is the best way of improving your trading. In particular, visualizing your own performance gives you an objective view of how you’re really doing. For instance, below is a timeline chart that I drew from the trades mentioned above. Although it trends upward, you can see scary dips once in a while. What’s important is that risk signals are louder when visualized, which prompts you to take action quickly.

(Timeline chart of the trades above: upward trending, with occasional sharp dips.)

A trading journal is nothing but bookkeeping and analysis of your results. There are so many facts you can discover by analyzing the data, to mention just a few: “How many trades did I place this week?”, “What is my winning rate this week?”, “How much did I pay in commission fees this week?”, and so forth.
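
As a rough illustration of that bookkeeping, here is a small pandas sketch; the CSV file and its columns (date, symbol, pnl, commission) are hypothetical, not my actual journal format.

```python
# Minimal trading-journal analysis over a hypothetical trades.csv
# with columns: date, symbol, pnl, commission.
import pandas as pd

trades = pd.read_csv("trades.csv", parse_dates=["date"])
weekly = trades.set_index("date").resample("W")

print(weekly["pnl"].count())                          # how many trades did I place each week?
print(weekly["pnl"].apply(lambda p: (p > 0).mean()))  # weekly winning rate
print(weekly["commission"].sum())                     # weekly commission fees
```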

Prior to May, I didn’t keep a trading journal. Even though I knew I had made some gains overall, it wasn’t clear where they came from or how I could improve them. After crunching some numbers in Excel one day, I was surprised to find that I had spent $10k on commission fees in April alone by placing hundreds of trades! This immediately rang a bell in my head: the easiest way to improve my trading was simply to stop over-trading and save on commission fees. After that eureka moment, I started recording every trade. Since then, my monthly commission fees have never gone above $2k again.

Rise of new opportunities

It may sound crazy to live off of stock trading, but there are actually plenty of professional traders out there. Thanks to advances in technology and the spread of Wall Street’s once-internal knowledge, individual traders have a better chance than ever to play on a level field with big money. Although it is perhaps too early to draw any conclusions from my life experiment, I do believe that stock trading is a new career opportunity for tech-savvy engineers like myself. If you’re still in doubt, check out this Bloomberg article. It’s an amazing story of a Japanese day trader who earned 53 million USD (6 billion JPY) in a year. This is what’s possible in a computerized world!

 

Introduction to Data Engineering

Data engineering can be defined as essentially all engineering work around data: processing, analytics, visualization, and so on. Before going deeper, let’s first talk about why data engineering is important and what its scope is.

Data is the most important asset in a company because it’s the only ground truth with which the business can be optimized. If all the data is easily accessible to all teams, it can be leveraged in new and exciting ways. Everybody knows about Netflix’s recommendation and personalization engine, which is a perfect example of how a business can be optimized by utilizing customer data. We hardly need to reiterate the importance of data; just consider the number of Big Data companies such as Splunk, Elastic, Hortonworks, Cloudera, and Teradata.

Data science is the process of data-driven decision making, and its prerequisite is data engineering. Data scientists handle data in a statistical manner to solve various problems including, but not limited to, recommendation/personalization, e-mail targeting, and marketing efficiency optimization. Say a delivery company runs advertising campaigns and promotions to drive customers’ first purchases. The most important measure of its spending efficiency is then the cost per first purchase. That metric should be visualized as a graph with multiple dimensions, including location, properties of the targeted users, and time. Usually, data scientists focus more on statistical modeling and data analytics, while data engineers work to deliver data, extract information, and store it.

Now, how does data engineering work? With the delivery campaign example, we need to track how much money we spent on which media, where advertisements were published and shown to users. This event information should be pipelined to a data backend and kept somewhere; this is where data pipelines and storage systems shine. To build analytic models, the raw data has to be processed and restructured for efficient computation, which is the job of an Extract-Transform-Load (ETL) framework. After building analytic models, we need to iterate on the campaign targeting strategy: when user information is retrieved in the advertisement matching systems, that profile is matched against the updated targeting model, so we need an indexing and retrieval engine to fetch the user profile and match it with the model. Finally, if we want to measure the performance of the campaign strategy, we need to visualize a trend that shows whether its performance is getting better or worse.
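
As a rough sketch of what such an ETL step might look like, here is a minimal PySpark job; the paths, column names, and metrics are assumptions for illustration, not a real pipeline.

```python
# Minimal PySpark ETL sketch (hypothetical schema and paths): read raw ad-spend events,
# aggregate spend per day and media channel, and write a columnar copy for analysts.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("campaign-etl").getOrCreate()

raw = spark.read.json("hdfs:///raw/ad_events/")  # assumed JSON-lines input

daily_spend = (raw
    .withColumn("date", F.to_date(F.col("event_time")))
    .groupBy("date", "media_channel")
    .agg(F.sum("spend_usd").alias("total_spend"),
         F.countDistinct("user_id").alias("reached_users")))

daily_spend.write.mode("overwrite").parquet("hdfs:///warehouse/campaign_daily_spend")
```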

Data Pipeline

Let’s talk briefly about each data engineering component. First of all, the data pipeline is the entry point; it delivers data from “A” to “B”. “A” can be a business front end such as a mobile app or web site, or any intermediate data store. “B” should be persistent storage that makes the data durable. There are two communication models for a data pipeline: the “push” and “pull” models. With the “push” model, data is sent from the sender to the receiver. The receiver can forward data sequentially based on its configuration, but it does not expose any interface for random access to the data. Apache Flume, Fluentd, and Netflix Suro all follow this model. The “pull” model works similarly but exposes APIs to consume data at arbitrary positions. Apache Kafka and Amazon Web Services’ Simple Queue Service (SQS) provide APIs to consume messages by offset or with a visibility timeout, respectively.

The most popular project in the data pipeline area is Apache Kafka. It consists of three parts: the producer, the broker, and the consumer. The producer sends messages to the broker under specific topics, and the broker stores them in its local file system. A topic can have multiple partitions, which can be distributed among multiple machines for load balancing. The broker exposes an API to get data by topic, partition, and offset. The offset can be thought of as a simple message ID, starting from 0 and increasing monotonically. When the consumer sends a request to the broker, the broker responds with a chunk of data.
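
Here is a minimal sketch with the kafka-python client; the broker address and topic name are assumptions. The producer appends messages to a topic, and the consumer reads them back along with their partition and offset.

```python
# Hypothetical topic/broker; illustrates Kafka's topic / partition / offset model with kafka-python.
from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(bootstrap_servers="localhost:9092")
for i in range(3):
    producer.send("api-latency", value=b'{"endpoint": "/orders", "response_ms": 42}')
producer.flush()

consumer = KafkaConsumer("api-latency",
                         bootstrap_servers="localhost:9092",
                         auto_offset_reset="earliest",
                         consumer_timeout_ms=5000)
for msg in consumer:
    # each record carries its topic, partition, and a monotonically increasing offset
    print(msg.topic, msg.partition, msg.offset, msg.value)
```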

Compared to the simple “push” model, Kafka carries extra operational overhead, such as ZooKeeper for broker and consumer coordination. Also, the downstream side has to be implemented manually to deliver messages from the broker to other storage such as a database or HDFS. Why, then, is Kafka so popular?

Kafka is a simple distributed message queue that can work as a buffer against backpressure at the endpoints. Say we want to store data in Elasticsearch for analytics, but the Elasticsearch ingestion rate is degraded for some reason. If messages arrive faster than Elasticsearch can ingest them, the producer side would have to hold messages in memory or drop them to avoid blowing up its memory. Kafka can keep messages for a while, as long as it has enough disk space. Also, Kafka’s performance is among the best of the distributed message queues because of its contention-free architecture. As mentioned earlier, Kafka messages can be consumed by topic, partition, and offset, without any coordination among consumers. This is why Kafka can support broadcast consumption, where all consumers see all messages. In addition to broadcasting, if a topic has multiple partitions, consumption from those partitions can be distributed across multiple workers. For example, if we have four partitions in a topic, the number of workers can be anywhere from one up to four: with one worker, it consumes all messages from all four partitions, and with four workers, each worker consumes messages from a single partition, as sketched below.
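
The partition distribution can be sketched with a consumer group (again kafka-python; the topic and group names are assumptions): every process started with the same group id gets a share of the partitions, so one copy reads all four partitions and four copies read one partition each.

```python
# Run one or more copies of this script; workers sharing the group id split the topic's partitions.
from kafka import KafkaConsumer

consumer = KafkaConsumer("api-latency",
                         bootstrap_servers="localhost:9092",
                         group_id="latency-indexers",
                         auto_offset_reset="earliest")
for msg in consumer:
    print("assigned partition", msg.partition, "offset", msg.offset)
```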

Through the data pipeline, messages end up in storage such as NoSQL databases, aggregation engines like Elasticsearch, and cold file storage like HDFS or AWS S3. When storing data in files, the file format should be chosen carefully. To express structured data, it is common to use text files of JSON strings; for better performance, however, a columnar file format like Parquet or ORC is usually a better fit. If we could fully rely on the aggregation engines, we wouldn’t need to store data in a file system at all. Unfortunately, aggregation engines cannot do everything, so for more flexible analysis later, all data should also be kept in cold file storage.
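
As a small illustration (the file names are assumptions), converting JSON-lines text into Parquet with pandas and pyarrow lets later analysis read only the columns it needs:

```python
# Row-oriented JSON lines in, columnar Parquet out; reading back one column avoids full-row scans.
import pandas as pd

events = pd.read_json("events.jsonl", lines=True)
events.to_parquet("events.parquet", engine="pyarrow")

latencies = pd.read_parquet("events.parquet", columns=["response_ms"])
print(latencies.describe())
```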

Data Indexing

Let’s turn our attention now to indexing and retrieval. For traditional information retrieval powered by an inverted index, the most popular choice is Elasticsearch. An inverted index is a mapping from a keyword to the documents that contain it. If we have a bunch of documents, each document can be viewed as a bag of keywords under a specific document ID; if we “invert” this relationship, we get an index that maps keywords to documents, much like the index at the end of a textbook. Elasticsearch supports real-time indexing of stream events, a rich set of boolean queries, and aggregation features. It also has a great ecosystem, including Logstash (data pipeline) and Kibana (analytics UI for visualization), with strong open source community support.

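A toy sketch of the idea in plain Python (not how Lucene actually stores it): map each keyword to the set of document IDs containing it, then answer a boolean AND query by intersecting the postings.

```python
from collections import defaultdict

docs = {
    1: "kafka is a distributed message queue",
    2: "elasticsearch builds an inverted index",
    3: "kafka feeds elasticsearch through a pipeline",
}

# invert the document -> keywords relationship into keyword -> documents
inverted = defaultdict(set)
for doc_id, text in docs.items():
    for keyword in text.split():
        inverted[keyword].add(doc_id)

# boolean AND query: documents containing both "kafka" and "elasticsearch"
print(inverted["kafka"] & inverted["elasticsearch"])   # {3}
```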

Elasticsearch ingestion and aggregation performance are not perfect, though. Due to its indexing architecture of building an immutable index segment in memory, flushing it to disk, and merging segments, the process depends heavily on disk I/O. Also, Elasticsearch does not support pre-aggregated metrics; for example, it needs to scan the matching data at query time to compute metrics such as minimum, maximum, and average.
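
For example, a stats aggregation over a response-time field looks like the sketch below; the index and field names are assumptions, and the client call follows the 2.x-era elasticsearch-py API.

```python
# min/max/avg/sum are computed at query time by scanning the documents matching the query.
from elasticsearch import Elasticsearch

es = Elasticsearch(["localhost:9200"])
resp = es.search(index="api-latency-2016.08", body={
    "size": 0,
    "query": {"term": {"endpoint": "/orders"}},
    "aggs": {"latency_stats": {"stats": {"field": "response_ms"}}},
})
print(resp["aggregations"]["latency_stats"])   # count, min, max, avg, sum
```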

Metric Aggregation Engine

A nice workaround for these problems can be Druid (a “high-performance, column-oriented, distributed data store”). Druid can be configured with a time granularity and a list of defined metrics. For example, if our data is a stream of events recording the response times of specific REST APIs, we can configure Druid to ingest the data at one-minute granularity with metrics over the response time (typically a sum and a count, from which the average is derived). Druid then pre-aggregates the response time every minute, so there’s no need for further post-aggregation by scanning individual records.
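
The relevant pieces of a Druid ingestion spec might look roughly like the fragment below; the data source and field names are assumptions, and only the schema portion is shown. With a minute query granularity plus sum and count aggregators, the average is sum / count at query time without touching raw rows.

```python
# A fragment of a hypothetical Druid dataSchema, expressed as a Python dict for readability.
ingestion_spec_fragment = {
    "dataSchema": {
        "dataSource": "api_latency",
        "granularitySpec": {
            "segmentGranularity": "HOUR",
            "queryGranularity": "MINUTE",
        },
        "metricsSpec": [
            {"type": "count", "name": "events"},
            {"type": "doubleSum", "name": "response_ms_sum", "fieldName": "response_ms"},
        ],
    },
}
```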

However, there is no silver bullet. Even though Druid can, in some situations, perform much better than Elasticsearch, Elasticsearch has some advantages over Druid. Since Elasticsearch is a search engine with a document-based model, indexing itself is idempotent, and entity-centric indexing can be used to implement various metrics efficiently. Druid, on the other hand, stores each row without any document ID, so its ingestion is not idempotent. Also, since Druid has many moving pieces, including a heavy dependency on ZooKeeper for distributed coordination, its maintenance and operations are not as straightforward as Elasticsearch’s.
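
As a sketch of what entity-centric, idempotent indexing means in practice (the index, type, and document IDs here are assumptions), re-indexing a document under the same ID in Elasticsearch simply overwrites it, so replaying the same events does not create duplicates:

```python
from elasticsearch import Elasticsearch

es = Elasticsearch(["localhost:9200"])
doc = {"endpoint": "/orders", "p99_ms": 150, "window": "2016-08-11T14:00"}

# same document ID both times, so the second call overwrites rather than duplicates
es.index(index="api-metrics", doc_type="metric", id="orders-2016-08-11T14:00", body=doc)
es.index(index="api-metrics", doc_type="metric", id="orders-2016-08-11T14:00", body=doc)
```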

As described so far, data engineering technologies are largely driven by the open source community. Since each open source project has its own pros and cons, the choice should be made carefully based on the existing environment and the engineers’ preferences. In the next blog post, I will talk about stream processing.