This post is part of our ongoing series on streaming SQL engines and advanced join algorithms. In previous posts, we discussed techniques like Probabilistic Data Structures in Streaming: Count-Min Sketch, General-purpose Stream Joins via Pruning Symmetric Hash Joins, and The Sliding Window Hash Join Algorithm that serve as building blocks. The concepts discussed in this series are implemented in Apache Arrow/DataFusion, which has a bunch of other cool features we haven’t talked about yet.
Most common use cases in time-series data involve windowing the data in one way or another. Many data processing frameworks and languages, including standard SQL, define windowing operators to support such use cases. Running such operators efficiently is an interesting (and surprisingly non-trivial) area of engineering, especially when we consider it as a milestone towards achieving the Kappa architecture, where one utilizes a unified compute engine that can handle both batch and streaming datasets.
In this blog post, we take a first step into this topic by analyzing the circumstances under which window queries can run in streaming data applications without breaking the pipeline, i.e. in an online manner. In other words, if we can calculate the window function results incrementally without scanning the entire input, we say that the window function is streamable or pipeline-friendly.
Before running a query, a query execution engine builds a physical plan that breaks the query down into a sequence of low-level executors/operators that compose to compute what the query describes. As the query runs, data flows between these operators and the engine incrementally computes the final result in a pipelined fashion. If any of these operators breaks the pipeline (meaning it needs to scan all of its input before generating any result), the engine cannot run the query in streaming/long-running use cases involving unbounded data sources. Therefore, our aim is to generate streaming-friendly physical plans whenever it is theoretically possible to do so. This blog post analyzes streaming execution support for window functions from this perspective.
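To make the distinction concrete, here is a minimal Python sketch (not how any particular engine implements its operators) that contrasts a pipeline-friendly operator with a pipeline-breaking one. The names `filter_positive`, `sort_rows` and `unbounded_source` are hypothetical and exist only for illustration.

```python
from itertools import islice
from typing import Iterable, Iterator


def filter_positive(rows: Iterable[int]) -> Iterator[int]:
    """Pipeline-friendly: each qualifying row is emitted as soon as it is seen."""
    for row in rows:
        if row > 0:
            yield row


def sort_rows(rows: Iterable[int]) -> Iterator[int]:
    """Pipeline-breaking: all input must be consumed before the first output."""
    yield from sorted(rows)


def unbounded_source() -> Iterator[int]:
    """A hypothetical never-ending source: 1, -2, 3, -4, 5, ..."""
    n = 0
    while True:
        n += 1
        yield n if n % 2 else -n


# The filter produces results even though the source never ends.
first_three = list(islice(filter_positive(unbounded_source()), 3))
print(first_three)
```

Feeding `unbounded_source()` into `sort_rows` would never produce a row, because `sorted` has to wait for an end of input that never arrives. This is exactly the situation we want to avoid in a streaming plan.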
A window function is a function that uses values from one or more rows to return a value for each row. The rows used by the window function to generate its results are determined by the window frame clause, which has the following syntax in SQL:
Then, we would get a final result as if the following partition-less query was executed for each partition separately:
The final result would be the following table:
Typically, if a window query can generate its results incrementally as it scans its input, the output ordering of the window query will reflect the ordering of its input. However, this is not a strict requirement as far as implementations are concerned; a query execution engine may split the table into multiple partitions and then merge the results without considering their original order. Hence, the following result would also be valid:
In this section, we will analyze window specifications through the lens of streamability. In other words, we will go over different kinds of window specifications and discuss whether it is possible to execute queries involving such windows without breaking the data processing pipeline.
In all of our examples, we will assume that the input of the windowing operator in question is a table named source. Schema and example data for the table source can be found below.
In this case, we have window expressions of the following form:
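For a more in-depth syntactic specification as well as a general discussion of SQL window functions, see the PostgreSQL documentation on window function processing and the Wikipedia article on SQL window functions, both listed at the end of this post.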
Consider the following expression as an example:
Next, let’s turn our attention to the window frame itself. The table below shows which rows are necessary to compute the sum value for each row in our table.
As can be seen from this table, our specific example lends itself to computing/finalizing the window function result for any given row whenever data for the subsequent row is received. This motivating example leads us to pose our next condition for streamability:
Generalizing our learnings, we can pose the following streamability condition for windowing operations with lexicographical ordering requirements:
The leading ordering requirement (the first ordering requirement) and the actual leading ordering (the first expression specifying the existing input ordering) should match.
Note that this condition is actually a generalization/relaxation of our first condition rather than a new, independent condition.
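The idea of finalizing a row once later rows arrive can be sketched in Python as follows. This is only an illustration under stated assumptions: the input is ascending on a hypothetical `ts` column, the aggregate is `SUM`, and the frame is a hypothetical `RANGE BETWEEN preceding PRECEDING AND following FOLLOWING`. The function name `streaming_range_sum` is ours.

```python
from collections import deque
from typing import Deque, Iterable, Iterator, Tuple

Row = Tuple[int, int]  # (ts, value); rows are assumed to arrive in ascending ts order


def streaming_range_sum(
    rows: Iterable[Row], preceding: int, following: int
) -> Iterator[Row]:
    """Emit (ts, sum) for each row as soon as its frame can no longer grow."""
    buffer: Deque[Row] = deque()   # rows that some pending frame may still need
    pending: Deque[int] = deque()  # timestamps of rows without a final result

    def finalize() -> Row:
        cur_ts = pending.popleft()
        lo, hi = cur_ts - preceding, cur_ts + following
        # Rows before this frame are useless for later frames too, since
        # frame starts only move forward on ordered input. Prune them.
        while buffer and buffer[0][0] < lo:
            buffer.popleft()
        return cur_ts, sum(v for t, v in buffer if t <= hi)

    for ts, value in rows:
        buffer.append((ts, value))
        pending.append(ts)
        # Ordered input guarantees no future row has a timestamp below ts,
        # so every pending frame that ends before ts is complete.
        while pending and pending[0] + following < ts:
            yield finalize()

    # End of input (only reached for bounded sources): flush what is left.
    while pending:
        yield finalize()


rows = [(1, 10), (2, 20), (4, 40), (7, 70)]
print(list(streaming_range_sum(rows, preceding=1, following=1)))
```

Note that memory usage depends only on the frame width, not on the total number of rows seen, which is what makes the operator suitable for unbounded inputs. If the input were not ordered on `ts`, neither the finalization test nor the pruning step would be valid.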
In this case, we have window expressions of the following form:
Let’s start our analysis through the following windowing operation:
In our example, the table will be split into the partitions below:
and so on.
The key requirement for supporting streaming execution for this windowing operation is the ability to tell when a specific partition finalizes, because only then can we emit its associated results for window frames that “touch” the partition boundary.
Let’s first consider cases where the existing data (input) ordering helps us deal with partitions. In general, we can support streaming execution for a windowing operation involving partitions if the existing data ordering can be exploited to locate partition boundaries. We can formalize this observation as follows:
Interestingly, we have even more flexibility when we have composite partition keys. For example, consider the following window expression:
and so on.
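When the input ordering keeps rows with equal partition keys next to each other, a partition boundary is simply a change in the key. The Python sketch below illustrates this for a hypothetical whole-partition `SUM` with a composite key; `sum_per_sorted_partition` is our own illustrative name, and the example assumes (rather than verifies) that equal keys are contiguous in the input.

```python
from typing import Iterable, Iterator, List, Optional, Tuple

Key = Tuple[str, int]  # a hypothetical composite partition key, e.g. (a, b)


def sum_per_sorted_partition(
    rows: Iterable[Tuple[Key, int]]
) -> Iterator[Tuple[Key, int]]:
    """Emit (key, partition_sum) for every row once its partition is complete.

    Assumes rows with equal keys are contiguous, which holds whenever the
    input is ordered on the partition columns.
    """
    current_key: Optional[Key] = None
    values: List[int] = []

    for key, value in rows:
        if values and key != current_key:
            # The key changed, so the previous partition can never grow again.
            total = sum(values)
            for _ in values:
                yield current_key, total
            values = []
        current_key = key
        values.append(value)

    # End of input (bounded sources only): flush the last open partition.
    if values:
        total = sum(values)
        for _ in values:
            yield current_key, total


rows = [(("x", 1), 1), (("x", 1), 2), (("x", 2), 5), (("y", 1), 7)]
print(list(sum_per_sorted_partition(rows)))
```

Only one partition is buffered at a time. The sketch never compares keys for order, only for equality, so any input ordering that clusters equal keys together works equally well.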
The table below lists some streamable and non-streamable cases.
Even when the existing data (input) ordering doesn’t help us deal with partitions, we can still support streaming execution in certain cases by leveraging hashing. Specifically, consider the case where:
As an example, consider the window expression below:
and so on.
Now, observe that the window expression reduces to the following per-partition window expression:
Analyzed independently, this per-partition window expression satisfies our streamability conditions. This gives us a way out:
If our conditions for the partition-less case hold for every partition separately, then we can operate on, prune and emit results for each partition concurrently as new data arrives. However, since we will never know when a partition ends, we will have to keep track of each distinct partition throughout execution. In high cardinality cases, this version may use significant memory, but it is nevertheless pipeline-friendly.
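A minimal Python sketch of this hash-based approach is given below. It assumes a hypothetical per-partition running `SUM` (a frame from `UNBOUNDED PRECEDING` to `CURRENT ROW`) and assumes that rows of each partition arrive in the required order; `running_sum_by_partition` is an illustrative name, not an engine API.

```python
from typing import Dict, Hashable, Iterable, Iterator, Tuple


def running_sum_by_partition(
    rows: Iterable[Tuple[Hashable, int]]
) -> Iterator[Tuple[Hashable, int]]:
    """Emit a per-partition running sum for each row as soon as it arrives.

    Partitions may be interleaved arbitrarily in the input. State is kept in
    a hash map keyed by partition and is never dropped, because we cannot
    know whether a partition will receive more rows.
    """
    state: Dict[Hashable, int] = {}
    for key, value in rows:
        state[key] = state.get(key, 0) + value
        yield key, state[key]


rows = [("a", 1), ("b", 10), ("a", 2), ("b", 20), ("a", 3)]
print(list(running_sum_by_partition(rows)))
```

The hash map holds one entry per distinct partition key seen so far, which is the memory cost mentioned above: it grows with key cardinality, not with the number of rows.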
We are done with our case analysis, so let’s put together everything we have covered so far in one single statement: For a window expression to be streamable, or pipeline-friendly, we require:
For the sake of clarity, we also provide a truth table encoding the above statement:
When a query contains multiple window expressions, the query is streamable (as a whole) if all the constituent window expressions are individually streamable. As an example, consider the query below:
Next, let’s analyze a more complex example. Consider the query below:
Note that the ordering requirement is exactly the opposite of the original ordering requirement. After this re-write, our two window expressions now have compatible requirements amenable to streaming execution, making the whole query streamable.
The section above lays out the core idea for window expression reversals, but some details were left vague. For example, what exactly does “modifying/re-writing” the window frame clause mean? We explore such details in this section.
The algorithm for constructing/re-writing equivalent window frames is actually rather simple:
A Rust-like pseudocode implementation of this logic is as follows:
We can summarize our findings on window frame reversal as follows:
If the ordering requirement of a window expression and the existing data (input) ordering are exact opposites, the window expression is still streamable if the streamability conditions hold for the alternate (i.e. “reversed”) version of the expression.
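As a sanity check on this idea, the Python sketch below reverses a `ROWS` frame and verifies that evaluating the reversed frame over the reversed input yields the same sums. The encoding is our own assumption for illustration: a bound is a signed row offset (negative for `PRECEDING`, `0` for `CURRENT ROW`, positive for `FOLLOWING`), and `None` stands for `UNBOUNDED`. The helpers `reverse_frame` and `window_sum` are hypothetical names.

```python
from typing import List, Optional, Tuple

Bound = Optional[int]  # signed row offset; None means UNBOUNDED


def reverse_frame(start: Bound, end: Bound) -> Tuple[Bound, Bound]:
    """Swap the bounds and flip PRECEDING <-> FOLLOWING (i.e. negate offsets)."""
    new_start = None if end is None else -end
    new_end = None if start is None else -start
    return new_start, new_end


def window_sum(values: List[int], start: Bound, end: Bound) -> List[int]:
    """Reference (non-streaming) evaluation of SUM over a ROWS frame.

    An empty frame yields 0 here to keep the sketch short; SQL would
    return NULL instead.
    """
    n = len(values)
    out = []
    for i in range(n):
        lo = 0 if start is None else max(0, i + start)
        hi = n - 1 if end is None else min(n - 1, i + end)
        out.append(sum(values[lo:hi + 1]) if lo <= hi else 0)
    return out


values = [1, 2, 3, 4, 5]

# ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING over the original order ...
forward = window_sum(values, -2, 1)
# ... equals the reversed frame evaluated over the reversed order.
r_start, r_end = reverse_frame(-2, 1)
backward = window_sum(values[::-1], r_start, r_end)[::-1]
print(forward, backward, forward == backward)
```

`SUM` does not depend on the order of rows inside the frame, so reversing the frame is enough here. Order-sensitive functions would additionally need their own reversed counterpart, which this sketch does not cover.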
Fortunately, we can incrementally calculate/update results for most window functions without storing all the associated rows.
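One common way to achieve this is an accumulator that supports both adding a row that enters the frame and retracting a row that leaves it. The Python sketch below shows the idea for a sliding average; `AvgAccumulator` and `sliding_avg` are hypothetical names, and the frame is assumed to be `ROWS BETWEEN size - 1 PRECEDING AND CURRENT ROW`.

```python
from collections import deque
from typing import Deque, Iterable, Iterator, Optional


class AvgAccumulator:
    """Keeps only a running total and a count, regardless of frame contents."""

    def __init__(self) -> None:
        self.total = 0
        self.count = 0

    def update(self, value: int) -> None:
        """A row entered the frame."""
        self.total += value
        self.count += 1

    def retract(self, value: int) -> None:
        """A row left the frame."""
        self.total -= value
        self.count -= 1

    def value(self) -> Optional[float]:
        return self.total / self.count if self.count else None


def sliding_avg(values: Iterable[int], size: int) -> Iterator[Optional[float]]:
    """Average over the current row and the (size - 1) rows before it."""
    acc = AvgAccumulator()
    frame: Deque[int] = deque()  # only rows currently inside the frame
    for v in values:
        acc.update(v)
        frame.append(v)
        if len(frame) > size:
            acc.retract(frame.popleft())
        yield acc.value()


print(list(sliding_avg([2, 4, 6, 8], size=2)))
```

For a frame that starts at `UNBOUNDED PRECEDING`, nothing is ever retracted and the `frame` buffer can be dropped entirely, leaving constant-size state. Functions such as `MIN` and `MAX` cannot be retracted from a single stored value and need a richer state.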
 PostgreSQL documentation: Window function processing
 Wikipedia article on SQL window functions.
 When there are multiple equivalent input orderings, it is enough for this statement to be true for one such ordering.
 Streaming Systems by Tyler Akidau, Slava Chernyak and Reuven Lax. This is an excellent book for learning what is “under the hood” in stream processing systems. You will not just learn how to use a specific stream processing library or framework, but also digest general stream processing concepts and how one can implement such libraries or frameworks. In this sense, this book will always be relevant.
 Efficient Processing of Window Functions in Analytical SQL Queries by Viktor Leis, Alfons Kemper, Kan Kundhikanjana and Thomas Neumann is a great read for those who want to learn more about implementations of window functions.