This post is part of our ongoing series on streaming SQL engines and advanced join algorithms. In previous posts, we discussed techniques like Probabilistic Data Structures in Streaming: Count-Min Sketch, General-purpose Stream Joins via Pruning Symmetric Hash Joins, and Running Windowing Queries in Stream Processing that serve as building blocks. The concepts discussed in this series are implemented in Apache Arrow/DataFusion, which has a bunch of other cool features we haven’t talked about yet.
In a previous post, we discussed the Symmetric Hash Join (SHJ) algorithm and its implementation in Apache Arrow/DataFusion, an advanced feature aimed at enhancing the stream processing features of the DataFusion query engine. As part of that work, we introduced the concept of build-side pruning, a technique that boosts performance by removing irrelevant rows from memory early in the join operation. As we continue to strengthen the capabilities of our query processing engine in our effort to implement the Kappa architecture, we now turn our focus to another innovative incremental processing algorithm: the Sliding Window Hash Join (SWHJ).
The SWHJ is specifically designed for joining two possibly infinite streams while preserving the order of one of the input streams. The ordinary hash join algorithm has order-preserving properties but is stream-unfriendly, while the SHJ algorithm is stream-friendly but fails to preserve input ordering. The SWHJ algorithm offers a “best of both worlds” solution whenever a sliding window constraint is applicable in one’s use case. Basically, SWHJ improves on the ordinary hash join algorithm by incrementally building/maintaining its hash table instead of materializing/scanning its entire input, whenever possible.
Consider the data flow between various stages or operators in a pipelined query execution engine, where each stage/operator performs a specific operation, refining the data as it travels through the system.
In an ideal situation, the query planning step identifies the optimal execution plan for the given query and the dataset. However, in stream processing, not all operators support continuous data flows. Some might disrupt or “break” the pipeline, making the query inexecutable.
These “pipeline-breaking” operators may need to “see” the entire input, utilize excessively long lookaheads, or simply store impractically large intermediate results. All these behaviors slow down the entire process, or worse, halt the pipeline entirely. The goal, then, is to use only stream-friendly operators that maintain a smooth and uninterrupted flow of data while minimizing processing blocks or unnecessary storage.
While some operations, like sorting, are inherently incompatible with this goal, many scenarios permit solutions using only stream-friendly operators.
A hash join operation consists of two main phases, the build phase and the probe phase:
The order-preserving characteristics of the hash join algorithm depend on how we carry out the probe phase: The order in which we scan and process probe side rows is typically the order in which they appear in the original table, which is directly reflected in the result of the hash join.
Furthermore, if our hash table is an insertion-order-preserving hash table, each probe side row’s matching build side rows will follow their original ordering from the build side, implying a lexicographical ordering in our final join results.
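To make these two phases concrete, here is a minimal Python sketch (with hypothetical row layouts and key names, not DataFusion's actual implementation). Since Python dicts and lists preserve insertion order, the hash table here is insertion-order-preserving, so the output is ordered by probe row first and by build row second, as described above:

```python
# Hypothetical sketch of an order-preserving hash join: build a hash table
# on the left (build) input, then probe it in right-input order.

def hash_join(build_rows, probe_rows, build_key, probe_key):
    # Build phase: index every build-side row by its join key.
    # dict/list insertion order preserves the original build-side ordering.
    table = {}
    for row in build_rows:
        table.setdefault(row[build_key], []).append(row)

    # Probe phase: scan probe rows in their original order.
    results = []
    for row in probe_rows:
        for match in table.get(row[probe_key], []):
            results.append({**match, **row})
    return results

left = [{"id": 1, "l": "a"}, {"id": 2, "l": "b"}, {"id": 1, "l": "c"}]
right = [{"id": 1, "r": "x"}, {"id": 2, "r": "y"}]
print(hash_join(left, right, "id", "id"))
# -> matches for id=1 come first (probe order), in build order "a" then "c"
```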
As its name suggests, SWHJ is an adaptation/generalization of the ordinary hash join algorithm that can accommodate infinite streams when dealing with sliding windows. Unlike the SHJ algorithm that utilizes two hash tables and treats its two inputs on an equal footing, the SWHJ algorithm fixes one side of the join as the build side (left) and the other side as the probe side (right) — just like the ordinary hash join algorithm. However, unlike the ordinary hash join, SWHJ exploits the sliding window constraint to incrementally build its hash table. This way, it is able to maintain the order of the right input stream while efficiently processing incoming rows from both streams without consuming the entire left stream.
To see why one does not need to materialize the whole build side when executing a sliding window join, let’s examine a simple example query:
The essence of the SWHJ algorithm lies in the creation of a smaller, incrementally maintainable hash table that only contains data falling in this sliding window from the build side. This window's boundaries are calculated using interval arithmetic, a concept we've previously discussed in a blog post.
Each specific row, or batch of rows on the probe side, can only match with a finite-size sliding window on the build side; and we use this information to avoid reading the entire build side. This method becomes significantly advantageous when dealing with infinite streams, allowing us to manage relevant windows within these streams. Moreover, the assumption that both sides could be infinite doesn't hinder the execution of the algorithm anymore. Instead, we are able to designate a boundary for the hashable portion of the stream, all thanks to the join filter expression. In this process, we're essentially dealing with a bounded build side and harnessing the order-preserving attributes of the ordinary hash join algorithm.
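The bound computation can be sketched as follows. This is a hypothetical illustration, not the engine's interval-arithmetic machinery: we assume a filter of the form `left.ts >= right.ts - window AND left.ts < right.ts` with a made-up `window` of 10, and that the probe side's `ts` values arrive in ascending order (so rows below the lower bound can never match again):

```python
# Given the range of right.ts values in the current probe batch, interval
# arithmetic on the filter
#   left.ts >= right.ts - window AND left.ts < right.ts
# yields the only interval of left.ts values that can produce matches.

def build_side_bounds(right_ts_min, right_ts_max, window=10):
    # left.ts >= right.ts - window  =>  left.ts >= right_ts_min - window
    # left.ts <  right.ts           =>  left.ts <  right_ts_max
    return (right_ts_min - window, right_ts_max)

def prune(build_buffer, lower, upper):
    # Rows below `lower` can be dropped for good (assuming right.ts is
    # ascending); rows at or above `upper` need not be buffered yet.
    return [row for row in build_buffer if lower <= row["ts"] < upper]

buffer = [{"ts": t} for t in range(0, 50, 5)]
lo, hi = build_side_bounds(right_ts_min=30, right_ts_max=40)
print(prune(buffer, lo, hi))  # keeps only ts in [20, 40)
```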
Note that we have not used any special function(s) or syntax to describe these sliding windows. Simply by using the comparison operators (>= and <) within the join condition, we defined our sliding window boundaries, ensuring that only rows within these boundaries are considered for matching and joining. A naive query engine without the SWHJ algorithm would have to consider every row in both tables, which would be inefficient for large datasets and, worse still, inexecutable for continuous data streams.
Let’s say we want to match all events whose sequence numbers are “close”. Note that this relationship is symmetric, so it requires a non-causal join filter as in the query below:
The following table shows the outcome when the provided SQL query is executed on the given tables:
In a real use case, one often applies some aggregation logic to these matches, but we chose to leave that out so that we can show you how these joins work under the hood.
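The matching logic can be re-created in a few lines of Python over small, made-up tables. Here we take "close" to mean sequence numbers within 2 of each other; both the threshold and the data are hypothetical and may differ from the tables shown above, but the symmetric, non-causal shape of the filter is the same:

```python
# Hypothetical symmetric filter: |left.seq - right.seq| <= 2, which is
# equivalent to the non-causal pair of conditions
#   left.seq >= right.seq - 2 AND left.seq <= right.seq + 2

left = [{"seq": s} for s in (1, 4, 9)]
right = [{"seq": s} for s in (3, 8)]

matches = [
    (l["seq"], r["seq"])
    for r in right          # probe in right-input order
    for l in left           # scan build side in its original order
    if abs(l["seq"] - r["seq"]) <= 2
]
print(matches)  # -> [(1, 3), (4, 3), (9, 8)]
```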
Let’s start off by declaring our goals with the SWHJ algorithm. We aim to:
By focusing on these objectives, we can develop an efficient algorithm for joining infinite streams while maintaining the order of the right input stream and accommodating various join types.
We will discuss the SWHJ algorithm at two levels: First, we will give a high-level description of the SWHJ algorithm. Next, we will present pseudocode laying out the algorithm as a finite state machine.
The algorithm starts off by initializing the left (build) side buffer and a data structure to keep track of the visited rows from the left side. Throughout its operation, the algorithm will always maintain an interval that stores the left side buffer's boundaries. Recall that we will continuously process any incoming data from both left and right streams; we will not consume any one side exclusively until exhaustion.
The above list highlights the most important points, but we feel a state machine approach is more helpful for anyone who wants to implement the SWHJ for themselves:
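As a rough companion to that pseudocode, the loop below simulates the SWHJ in Python. This is a hypothetical sketch, not the actual state machine or the Rust implementation: it assumes a single ascending integer key `ts` on both sides and a filter `left.ts >= right.ts - window AND left.ts < right.ts`, and it omits the equi-key hash table (scanning a small deque directly) so that the incremental buffering and pruning logic stays visible:

```python
from collections import deque

def sliding_window_hash_join(left_iter, right_iter, window):
    buffer = deque()          # build-side rows currently inside the window
    results = []
    left_done = False

    for probe in right_iter:  # emit results in probe (right) order
        lower = probe["ts"] - window   # interval-arithmetic bounds for the
        upper = probe["ts"]            # build rows that can match `probe`

        # Grow the buffer only until the build side passes the window;
        # we never consume the (possibly infinite) left side exhaustively.
        while not left_done and (not buffer or buffer[-1]["ts"] < upper):
            try:
                buffer.append(next(left_iter))
            except StopIteration:
                left_done = True

        # Prune rows that can no longer match this or any later probe row.
        while buffer and buffer[0]["ts"] < lower:
            buffer.popleft()

        # Probe: scan the small windowed buffer in build order.
        for build in buffer:
            if lower <= build["ts"] < upper:
                results.append((build["ts"], probe["ts"]))
    return results

left = iter([{"ts": t} for t in range(10)])
right = iter([{"ts": t} for t in (3, 5, 9)])
print(sliding_window_hash_join(left, right, window=2))
# -> [(1, 3), (2, 3), (3, 5), (4, 5), (7, 9), (8, 9)]
```

A real implementation would additionally hash the buffered rows on the equi-join key and handle outer-join bookkeeping for unmatched build rows, but the window growth and pruning steps are the essence of the algorithm.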
There are various possible enhancements that can make our query engine even more performant and stream friendly. Some enhancements we plan to work on in the near future include:
Synnada helps you build interactive, collaborative, intelligent, real-time data applications for your mission-critical systems, within minutes and with ease — powered by SQL and online machine learning.
We have a simple goal: Unify data, make it real-time and accessible for AI-driven applications. We strive to empower data artisans of the future by providing them with elemental blocks and infrastructure components required to create, maintain and optimize intelligent, real-time, mission-critical data systems.