
Do We Need Distributed Stream Processing?

In the era of big data and AI, many data-intensive applications exhibit requirements that cannot be satisfied by traditional batch processing models. Streaming applications, such as clickstream analytics, IoT data processing, network monitoring, or financial fraud detection, must support high processing rates (e.g. 500 million tweets/day, 1.45 billion active users on Facebook) yet consistently achieve sub-second processing latencies. In response, distributed stream processing systems, such as Spark Streaming or Apache Flink, exploit the resources of a compute cluster for streaming applications. Their aim is to benefit from the aggregated throughput of many processing nodes. As with any distributed system, this raises the question of how efficiently a distributed stream processing system utilises the available hardware resources on each node. The performance of a single processing node is crucial, as it determines the size of the compute cluster needed to meet the throughput and latency requirements of a given streaming application.

When it comes to single-node performance, a stream processing system must consider (a) what types of parallel processors the nodes offer (e.g. multi-core CPUs, GPUs, FPGAs) and (b) how to parallelise the streaming computation effectively. With highly parallel heterogeneous architectures becoming commonplace in data centres, stream processing systems can exploit previously unseen levels of parallelism even on a single node.

In this blog post, we compare the performance of an efficient stream processing engine designed for single servers, SABER, with that achieved by popular distributed stream processing systems, Apache Spark and Apache Flink. We also compare the results to those of StreamBox, another recently proposed single-server design that emphasises out-of-order processing of data. Based on our results, we argue that a single multi-core server can provide better throughput than a multi-node cluster for many streaming applications. This opens an opportunity to cut down system complexity and operational costs by replacing cluster-based stream processing systems with (potentially replicated) single-server deployments.

Experimental set-up

For our comparison, we use the Yahoo Streaming Benchmark as the workload. A limitation of this benchmark, also reported by others (Apache Flink, Apache Apex, Differential Dataflow), is that it fails to capture the rich semantics of sliding window computation in streaming applications. Sliding windows are arguably among the most challenging aspects of stream processing and have profound implications for how to parallelise computation efficiently. Despite this limitation, the benchmark has recently been used for system evaluation both in industry (Databricks, Data Artisans) and academia (Spark Streaming, Drizzle), which makes our results comparable to prior efforts.

Single Node Comparison
Figure 1: Single server throughput for Yahoo Streaming Benchmark

The Yahoo Streaming Benchmark was designed to emulate an advertisement analytics streaming application. It consists of a streaming query with four operators: filter, project, join (with relational data) and aggregate (a windowed count). In our implementation, input tuples are 128 bytes long and stored directly in a Java ByteBuffer.
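To make the workload concrete, the following is a minimal Java sketch of the query over such raw tuples. The field offsets, class name and campaign table are our own illustrative choices, not the exact benchmark schema; each system under test implements the equivalent pipeline in its own API.

    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the benchmark query over raw 128-byte tuples, with the four
    // operators fused into one pass: filter by event type, project the ad id,
    // join it against static relational data, and count per campaign per
    // 10-second window. Field offsets are assumptions for illustration.
    public class YahooQuerySketch {
        static final int TUPLE_SIZE = 128;                           // bytes per input tuple
        static final Map<Long, Long> adToCampaign = new HashMap<>(); // relational join table
        static final Map<Long, Map<Long, Long>> counts =
            new HashMap<>();                                         // window -> campaign -> count

        static void process(ByteBuffer batch) {
            while (batch.remaining() >= TUPLE_SIZE) {
                int base = batch.position();
                long timestamp = batch.getLong(base);                // assumed field offsets
                long adId      = batch.getLong(base + 8);
                int  eventType = batch.getInt(base + 16);
                if (eventType == 0) {                                // filter: keep "view" events
                    Long campaign = adToCampaign.get(adId);          // join with relational data
                    if (campaign != null) {
                        long window = timestamp / 10_000;            // 10 s tumbling window id
                        counts.computeIfAbsent(window, w -> new HashMap<>())
                              .merge(campaign, 1L, Long::sum);       // windowed count
                    }
                }
                batch.position(base + TUPLE_SIZE);                   // project: skip other fields
            }
        }
    }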

We perform the experiments on 6 servers (1 master and 5 workers), each with 2 Intel Xeon E5-2660 v3 2.60 GHz CPUs (20 physical CPU cores in total), a 25 MB last-level cache (LLC) and 32 GB of memory. The machines are connected with 10 Gbps Ethernet. We evaluate the query with SABER (without its GPU support), Spark 2.4.0, Flink 1.3.2 and the latest version of StreamBox. For the distributed experiments, we use only 8 cores per node, as we did not observe any significant change in throughput beyond that number.

We designed our experiments with the explicit intention of isolating the performance of the stream processing system from external influences. To that end, we conduct the experiments in the following manner:

  • For SABER, we initially generated the data on a separate machine. However, since a single CPU core suffices to saturate the 10 Gbps network connection (8.3 million tuples/sec), the network would cap SABER's measured throughput, so we instead generate data in memory.
  • For Spark and Flink, we follow the approach from previous blog posts (Apache Spark and Apache Flink).
  • For StreamBox, we extend the wordcount example and create our own source and operators. StreamBox represents windowed operations as session windows with watermarks that define the end of a window. We generate ordered data with at least one source thread per worker to avoid a bottleneck, and we emit a watermark every 10 seconds of event time to create 10-second tumbling windows, as sketched below.
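The following Java sketch shows the watermark logic of this source. The types and method names are illustrative (StreamBox's actual interfaces are in C++ and differ), but the protocol is the one we use: ordered tuples, plus a watermark at every 10-second event-time boundary that closes the open window.

    import java.util.List;

    // Minimal sketch of a watermark-driven source (illustrative names, not
    // StreamBox's API). Tuples arrive in timestamp order; a watermark at each
    // 10 s boundary closes the open session window, yielding tumbling windows.
    class WatermarkSource {
        static final long WINDOW_MS = 10_000;

        static class Tuple {
            final long timestamp; final long adId;
            Tuple(long timestamp, long adId) { this.timestamp = timestamp; this.adId = adId; }
        }

        interface Downstream {
            void onTuple(Tuple t);
            void onWatermark(long ts);  // promise: no tuple with timestamp < ts follows
        }

        void run(List<Tuple> orderedInput, Downstream out) {
            long nextWatermark = WINDOW_MS;
            for (Tuple t : orderedInput) {
                while (t.timestamp >= nextWatermark) {   // crossed a window boundary
                    out.onWatermark(nextWatermark);      // close the current window
                    nextWatermark += WINDOW_MS;
                }
                out.onTuple(t);
            }
        }
    }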

Throughput Comparison

Figures 1 and 2 show the scalability of the four systems in terms of throughput as we increase the number of available CPU cores. On a single node, Flink performs better than both Spark and StreamBox, achieving more than 1.9x their throughput. With 8 CPU cores, Spark and StreamBox reach comparable throughput of 12 million and 11 million tuples per second, respectively, while Flink achieves more than 22 million.

SABER exhibits nearly 7x, 3x and 7x better throughput than Spark, Flink and StreamBox, respectively, processing almost 79 million tuples per second with 8 CPU cores. It surpasses the best single-node throughput of the other systems with just two CPU cores. Beyond not exploiting the memory hierarchy or minimising data copying, both Flink and Spark also suffer from communication and serialisation overheads. This is expected: their distributed designs aim to exploit the aggregated performance of multiple nodes.

With such a simple query, our experiment mostly measures how fast the systems can move data, because the time spent on meaningful computation is small. SABER pins worker and generator threads to CPU cores to minimise memory accesses beyond the L2 cache. In addition, it maintains a 512 KB input buffer, which ensures that all active data stays in the LLC, and uses atomic operations to read and write parts of this buffer with negligible synchronisation cost.
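To illustrate the idea, here is a minimal sketch of such a buffer, assuming a single producer and a single consumer. It is not SABER's actual code, but it shows how two atomic counters suffice to coordinate reads and writes without locks, over a backing array small enough to stay in the LLC.

    import java.nio.ByteBuffer;
    import java.util.concurrent.atomic.AtomicLong;

    // Sketch of the buffering idea (not SABER's actual code): a single-producer/
    // single-consumer ring buffer whose 512 KB backing array fits comfortably in
    // the 25 MB LLC. Each side owns one atomic counter, so no locks are needed.
    class SpscRingBuffer {
        static final int CAPACITY = 512 * 1024;      // bytes; must be a power of two
        final ByteBuffer data = ByteBuffer.allocateDirect(CAPACITY);
        final AtomicLong head = new AtomicLong();    // next byte to read  (consumer-owned)
        final AtomicLong tail = new AtomicLong();    // next byte to write (producer-owned)

        // Producer: copy one tuple in; returns false if the buffer is full.
        boolean offer(byte[] tuple) {
            long t = tail.get();
            if (t + tuple.length - head.get() > CAPACITY) return false;
            for (int i = 0; i < tuple.length; i++)
                data.put((int) ((t + i) & (CAPACITY - 1)), tuple[i]);
            tail.set(t + tuple.length);              // volatile write publishes the data
            return true;
        }

        // Consumer: copy one tuple out; returns false if not enough data yet.
        boolean poll(byte[] tuple) {
            long h = head.get();
            if (tail.get() - h < tuple.length) return false;
            for (int i = 0; i < tuple.length; i++)
                tuple[i] = data.get((int) ((h + i) & (CAPACITY - 1)));
            head.set(h + tuple.length);              // volatile write frees the space
            return true;
        }
    }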

Cluster Throughput Comparison
Figure 2: Cluster throughput for Yahoo Streaming Benchmark

It is interesting to observe that, in contrast to recent results, Flink performs better than Spark even in the cluster deployment. We attribute this increase in Flink's throughput to the faster 10 Gbps network, which we believe was not used in prior work (see the Structured Streaming paper). With StreamBox, we observe fatal memory leaks when we increase the ingestion rate beyond what we report in the figure.

In summary, SABER with just 8 CPU cores achieves better performance than both Spark with 5 worker nodes (40 cores; 55 million tuples/sec) and Flink with 5 worker nodes (40 cores; 67 million tuples/sec). A carefully tuned single-server system can thus outperform a compute cluster while reducing the required resources by more than half.

What is the COST of distribution?

Following the previously proposed Configuration that Outperforms a Single Thread (COST) metric, we analyse the performance of the systems by comparing their single-core executions with a handwritten C++ program. The C++ implementation processes almost 23 million tuples per second on our testbed server, i.e. it is 2x faster than SABER. This result is broadly consistent with Frank McSherry's implementation (35 million tuples/sec), which ran on a laptop with a higher base clock speed.
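For illustration, a COST-style single-core measurement can be as simple as the following Java harness, which drives the query sketch shown earlier over one pre-generated in-memory batch on a single thread. The baseline we cite is handwritten C++, not this code.

    import java.nio.ByteBuffer;

    // Sketch of a single-core COST measurement: run the fused query loop over
    // an in-memory batch for a fixed duration and report tuples per second.
    public class SingleCoreCost {
        public static void main(String[] args) {
            ByteBuffer batch = ByteBuffer.allocate(64 * 1024 * 128); // 64K tuples of 128 B
            // ... fill batch with generated 128-byte tuples ...
            long tuples = 0;
            long start = System.nanoTime(), elapsed = 0;
            while ((elapsed = System.nanoTime() - start) < 10_000_000_000L) { // 10 s run
                YahooQuerySketch.process(batch.duplicate()); // fused filter/join/count
                tuples += batch.capacity() / 128;
            }
            System.out.printf("%.1f million tuples/sec%n",
                              tuples / (elapsed / 1e9) / 1e6);
        }
    }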

The following table compares our reported throughput results with the throughput of the single-threaded C++ implementation. The outcome is the same: there remains a large performance gap between the handwritten code (even though it is not fully optimised) and current stream processing systems.

                                    Spark   Flink   SABER   Handwritten C++
  Throughput (million tuples/sec)     2      4.8     11.8         23

Table 1: Single CPU core throughput for Yahoo Streaming Benchmark

It is important to understand the reasons behind this performance gap and use them to design hardware-conscious stream processing systems. We have begun designing highly efficient streaming operator implementations that exploit superscalar execution and SIMD parallelism. We are also working on compilation-based techniques that keep data in CPU registers for as long as possible, maximising data and code locality (as in HyPer). As the results above show, keeping data in the LLC already yields a major performance benefit. We also envision a set of hardware-oblivious primitives that account for the non-uniform memory access (NUMA) effects caused by multiple CPU sockets on modern scale-up architectures.
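As an example of what we mean by a SIMD-friendly operator, consider the following selection sketch (again illustrative, not SABER's code). It avoids data-dependent branches by writing output positions unconditionally and advancing the output cursor arithmetically on a match, a well-known selection-vector technique from columnar query engines that superscalar cores and auto-vectorising compilers handle well.

    // Branch-free selection over a column of event types (illustrative sketch).
    // outIndexes must have capacity >= eventTypes.length.
    static int selectViews(int[] eventTypes, int[] outIndexes) {
        int out = 0;
        for (int i = 0; i < eventTypes.length; i++) {
            outIndexes[out] = i;                  // write unconditionally
            out += (eventTypes[i] == 0) ? 1 : 0;  // advance only on a match
        }
        return out;                               // number of selected tuples
    }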

Conclusions

With the availability of large DRAM sizes, many CPU cores, and accelerators in data centres, the design of stream processing systems must focus on hardware-conscious techniques. In our version of the Yahoo Streaming Benchmark, SABER processes 79 million tuples per second with 8 CPU cores, outperforming Flink (3x), Spark Streaming (7x) and StreamBox (7x), and it exceeds the performance of a cluster-based deployment with 40 CPU cores. Our results also show that a performance gap remains even on a single node, and we argue that this constitutes an opportunity when designing the next generation of stream processing engines. Finally, regarding the prior critique of the Yahoo Streaming Benchmark, we agree that it does not capture the behaviour of real-world streaming applications, which are often compute-intensive.

All of our experimental scripts are available online, and you are welcome to try them yourself and reproduce our results: https://github.com/lsds/StreamBench

Date Posted
Sunday, June 10, 2018