Sliding Window Hash Join: Efficiently Joining Infinite Streams with Order Preservation

This post is part of our ongoing series on streaming SQL engines and advanced join algorithms. In previous posts, we discussed techniques like Probabilistic Data Structures in Streaming: Count-Min Sketch, General-purpose Stream Joins via Pruning Symmetric Hash Joins, and Running Windowing Queries in Stream Processing that serve as building blocks. The concepts discussed in this series are implemented in Apache Arrow/DataFusion, which has a bunch of other cool features we haven’t talked about yet.

In a previous post, we discussed the Symmetric Hash Join (SHJ) algorithm and its implementation in Apache Arrow/DataFusion, an advanced feature aimed at enhancing the stream processing capabilities of the DataFusion query engine. As part of that work, we introduced the concept of build-side pruning, a technique that boosts performance by removing irrelevant rows from memory early in the join operation. As we continue to strengthen the capabilities of our query processing engine in our effort to implement the Kappa architecture, we now turn our focus to another innovative incremental processing algorithm: the Sliding Window Hash Join (SWHJ).

The SWHJ is specifically designed for joining two possibly infinite streams while preserving the order of one of the input streams. The ordinary hash join algorithm has order-preserving properties but is stream-unfriendly, while the SHJ algorithm is stream-friendly but fails to preserve input ordering. The SWHJ algorithm offers a “best of both worlds” solution whenever a sliding window constraint is applicable in one’s use case. In essence, SWHJ improves on the ordinary hash join algorithm by incrementally building and maintaining its hash table instead of materializing and scanning its entire input, whenever possible.

Background

Context: Query processing and streams

Consider the data flow between various stages or operators in a pipelined query execution engine, where each stage/operator performs a specific operation, refining the data as it travels through the system.

In an ideal situation, the query planning step identifies the optimal execution plan for the given query and the dataset. However, in stream processing, not all operators support continuous data flows. Some might disrupt or “break” the pipeline, making the query inexecutable.

These “pipeline-breaking” operators may need to “see” the entire input, utilize excessively long lookaheads, or simply store impractically large intermediate results. All of these behaviors slow down the entire process or, worse, halt the pipeline entirely. The goal, then, is to use only stream-friendly operators that maintain a smooth and uninterrupted flow of data while minimizing processing blocks or unnecessary storage.

While some operations, like sorting, are inherently incompatible with this goal, many scenarios permit solutions using only stream-friendly operators.

Context: Hash Join order preserving characteristics

A hash join operation consists of two main phases, the build phase and the probe phase:

  • In the build phase, we construct a hash table using one of the input tables, commonly referred to as the build side. This hash table is built based on the join keys of the build side.
  • In the probe phase, we scan the other input table, commonly referred to as the probe side. For each probe side row, we consult the hash table created in the build phase to find any matching rows from the build side.

The order-preserving characteristics of the hash join algorithm depend on how we carry out the probe phase: the order in which we scan and process probe side rows is typically the order in which they appear in the original table, and this order is directly reflected in the result of the hash join.

Figure 1: Build and probe sides of the Hash Join algorithm.

Furthermore, if our hash table is an insertion-order-preserving hash table, each probe side row’s matching build side rows will follow their original ordering from the build side, implying a lexicographical ordering in our final join results.

In agreement with the above analysis, DataFusion maintains the right (i.e. probe) side order for Inner, RightSemi and RightAnti joins. Moreover, DataFusion’s hash tables preserve insertion ordering. Therefore, if the left side is sorted by a (in ascending order) and the right side is sorted by b (in ascending order), then inner join results will be sorted by b, a (in ascending order).
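
To make these order-preserving mechanics concrete, below is a minimal toy hash join in Rust. This is an illustrative sketch, not DataFusion’s actual implementation; the function hash_join and its row representation are ours. Per-key vectors keep the build side’s insertion order, and scanning the probe side in its original order carries that order into the output:

use std::collections::HashMap;

// A toy hash join over (key, payload) rows. The probe side's scan order
// dictates the output order; per-key Vecs keep the build side's insertion
// order, so the matches for each probe row appear in build-side order.
fn hash_join<'a>(
    build: &[(i64, &'a str)],
    probe: &[(i64, &'a str)],
) -> Vec<(&'a str, &'a str)> {
    // Build phase: map each join key to all build-side payloads carrying it.
    let mut table: HashMap<i64, Vec<&'a str>> = HashMap::new();
    for &(key, payload) in build {
        table.entry(key).or_default().push(payload);
    }

    // Probe phase: scan probe rows in their original order and emit all
    // matches for each row before moving on to the next one.
    let mut output = Vec::new();
    for &(key, probe_payload) in probe {
        if let Some(matches) = table.get(&key) {
            for &build_payload in matches {
                output.push((build_payload, probe_payload));
            }
        }
    }
    output
}

fn main() {
    let left = [(1, "l1"), (2, "l2"), (1, "l3")];
    let right = [(1, "r1"), (2, "r2")];
    // Prints [("l1", "r1"), ("l3", "r1"), ("l2", "r2")]: probe (right) order
    // is preserved, and l1 precedes l3 just as on the build side.
    println!("{:?}", hash_join(&left, &right));
}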

Sliding Window Hash Join: A brief explanation

As its name suggests, SWHJ is an adaptation/generalization of the ordinary hash join algorithm that can accommodate infinite streams when dealing with sliding windows. Unlike the SHJ algorithm that utilizes two hash tables and treats its two inputs on an equal footing, the SWHJ algorithm fixes one side of the join as the build side (left) and the other side as the probe side (right) — just like the ordinary hash join algorithm. However, unlike the ordinary hash join, SWHJ exploits the sliding window constraint to incrementally build its hash table. This way, it is able to maintain the order of the right input stream while efficiently processing incoming rows from both streams without consuming the entire left stream.

To see why one does not need to materialize the whole build side when executing a sliding window join, let’s examine a simple example query:

SELECT t1.*
FROM t1
JOIN t2
ON t1.id = t2.id AND
   t1.ts >= t2.ts AND
   t1.ts < t2.ts + INTERVAL '1 day'

where both tables are ordered with respect to the ts column. Now, the join rules are:

  1. The id in both tables must be the same.
  2. The timestamp (ts) in t1 is the same or later than the timestamp in t2.
  3. The timestamp in t1 is less than the timestamp in t2 plus one day.

So, it's like saying: "Show me everything from table t1 where the same id exists in table t2 and the time in t1 is between the time in t2 and that time plus one day."

Since t1 and t2 are both sorted according to ts and we have the sliding window constraint, it is not necessary to materialize every row on the build side for any particular probe row/batch; considering only a day's worth of data is enough to produce all viable matches. This presents a significant advantage for stream processing because it limits the required build-side information for a specific probe batch.

Figure 2: Viable matches for a given probe side batch.

The essence of the SWHJ algorithm lies in the creation of a smaller, incrementally maintainable hash table that only contains data falling in this sliding window from the build side. This window's boundaries are calculated using interval arithmetic, a concept we've previously discussed in a blog post.

Each specific row, or batch of rows, on the probe side can only match a finite-size sliding window on the build side, and we use this information to avoid reading the entire build side. This method becomes significantly advantageous when dealing with infinite streams, allowing us to manage relevant windows within these streams. Moreover, the assumption that both sides could be infinite no longer hinders the execution of the algorithm. Instead, we are able to designate a boundary for the hashable portion of the stream, all thanks to the join filter expression. In this process, we are essentially dealing with a bounded build side and harnessing the order-preserving attributes of the ordinary hash join algorithm.
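
To illustrate how these boundaries fall out of the join filter, consider the filter t1.ts >= t2.ts AND t1.ts < t2.ts + INTERVAL '1 day' from the example above. The following Rust sketch (the names build_window and BuildWindow are hypothetical, not DataFusion's API) derives the viable build-side range from the timestamp range of the current probe batch:

// Illustrative sketch: derive the viable build-side (t1) window for the
// filter `t1.ts >= t2.ts AND t1.ts < t2.ts + INTERVAL '1 day'`, given the
// ts range of the current probe (t2) batch. Timestamps are epoch millis.
const ONE_DAY_MS: i64 = 24 * 60 * 60 * 1000;

struct BuildWindow {
    // Build rows with ts below this can never match the current or any
    // future probe row (both streams are ascending in ts), so they are
    // prunable.
    lower: i64,
    // The build side must be buffered up to (but excluding) this bound
    // before the current probe batch can be fully joined.
    upper: i64,
}

fn build_window(probe_min_ts: i64, probe_max_ts: i64) -> BuildWindow {
    BuildWindow {
        lower: probe_min_ts,              // from t1.ts >= t2.ts
        upper: probe_max_ts + ONE_DAY_MS, // from t1.ts <  t2.ts + 1 day
    }
}

fn main() {
    // A probe batch covering one hour of events:
    let w = build_window(1_700_000_000_000, 1_700_003_600_000);
    println!("buffer t1 rows with ts in [{}, {})", w.lower, w.upper);
}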

Use cases: When is it appropriate?

1. Easy sliding windows via join filters

SELECT t1.*
FROM t1
JOIN t2
ON t1.id = t2.id AND
   t1.ts >= t2.ts AND
   t1.ts < t2.ts + INTERVAL '1 day'

Join filters defining time-based sliding windows are the most common and natural use cases for the SWHJ algorithm. Above, we reproduce the simple sliding window query from our previous example. The join conditions t1.ts >= t2.ts and t1.ts < t2.ts + INTERVAL '1 day' create a sliding time window of one day. For each row in t1, the query looks for matching rows in t2 that have the same id and a timestamp within the preceding day.

SELECT t1.*
FROM t1
JOIN t2
ON t1.id = t2.id AND
   t1.sn >= t2.sn AND
   t1.sn < t2.sn + 10

A slightly different variant of this query may use a count-based sliding window: Assuming that the column sn represents some sort of a sequence number for events, the join conditions t1.sn >= t2.sn and t1.sn < t2.sn + 10 describe a sliding window of ten events. For each row in t1, the query looks for matching rows in t2 that have the same id and a sequence number within the preceding ten numbers.

Note that we have not used any special function(s) or syntax to describe these sliding windows. Simply by using comparison operators (>= and <) within the join condition, we defined our sliding window boundaries, ensuring that only rows within these boundaries are considered for matching and joining. A naive query engine without the SWHJ algorithm would have to consider every row in both tables, which would be inefficient for large datasets and, worse, inexecutable for continuous data streams.
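
As a usage sketch, the count-based query above can be run through DataFusion's Rust API roughly as follows; the CSV file names and their contents are hypothetical, and any source that preserves the sn ordering works the same way:

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical sources; in a streaming setting these would be
    // continuously arriving, sn-ordered inputs.
    ctx.register_csv("t1", "t1.csv", CsvReadOptions::new()).await?;
    ctx.register_csv("t2", "t2.csv", CsvReadOptions::new()).await?;

    // The count-based sliding window join from above.
    let df = ctx
        .sql(
            "SELECT t1.* FROM t1 JOIN t2 \
             ON t1.id = t2.id AND t1.sn >= t2.sn AND t1.sn < t2.sn + 10",
        )
        .await?;
    df.show().await?;
    Ok(())
}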

2. Performing non-causal/symmetric stream joins

Consider the following tables where the attribute sn represents a sequence number for events:

Table t1:

id | sn
---+-----
 1 | 100
 1 | 105
 2 | 200
 2 | 205
 2 | 210
 3 | 300
 3 | 305
 3 | 310

Table t2:

id | sn
---+-----
 1 | 100
 1 | 105
 2 | 200
 2 | 205
 3 | 300
 3 | 305

Let’s say we want to match all events whose sequence numbers are “close”. Note that this relationship is symmetric, so it requires a non-causal join filter as in the query below:

SELECT t1.*, t2.*
FROM t1
JOIN t2
ON t1.id = t2.id AND
   t1.sn > t2.sn - 10 AND
   t1.sn < t2.sn + 10

This query gives us row pairs where t1 and t2 share the same id and their sequence numbers are strictly within 10 of each other; that is, t1.sn lies in the open range (t2.sn - 10, t2.sn + 10), and symmetrically, t2.sn lies in (t1.sn - 10, t1.sn + 10).

The following table shows the outcome when the provided SQL query is executed on the given tables:

t1.id | t1.sn | t2.id | t2.sn
------+-------+-------+------
    1 |   100 |     1 |   100
    1 |   105 |     1 |   100
    1 |   100 |     1 |   105
    1 |   105 |     1 |   105
    2 |   200 |     2 |   200
    2 |   205 |     2 |   200
    2 |   200 |     2 |   205
    2 |   205 |     2 |   205
    2 |   210 |     2 |   205
    3 |   300 |     3 |   300
    3 |   305 |     3 |   300
    3 |   300 |     3 |   305
    3 |   305 |     3 |   305
    3 |   310 |     3 |   305

For the row [2, 205] from table t2, the possible matches from table t1 (the build side) are:

id | sn
---+-----
 2 | 200
 2 | 205
 2 | 210

In a real use case, one often applies some aggregation logic to these matches, but we leave that out here so we can show how these joins work under the hood.
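
As a sanity check, the following small Rust snippet (purely illustrative; the helper matches is ours) applies the same non-causal filter to probe row [2, 205] and recovers exactly the three build-side rows above:

// Apply the non-causal filter `id equal AND |t1.sn - t2.sn| < 10` for a
// single probe row against a slice of build-side (id, sn) rows.
fn matches(probe: (i64, i64), build_rows: &[(i64, i64)]) -> Vec<(i64, i64)> {
    let (id, sn) = probe;
    build_rows
        .iter()
        .copied()
        .filter(|&(b_id, b_sn)| b_id == id && b_sn > sn - 10 && b_sn < sn + 10)
        .collect()
}

fn main() {
    let t1 = [(2, 200), (2, 205), (2, 210), (3, 300)];
    // Probe row [2, 205] from t2 matches t1 rows 200, 205 and 210.
    println!("{:?}", matches((2, 205), &t1));
}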

Digging deeper: Algorithmic details

Let’s start off by declaring our goals with the SWHJ algorithm. We aim to:

  1. Support all types of joins and achieve order preservation for all amenable join types.
  2. Keep buffers for both sides finite.
  3. Preserve the right side order, whenever possible, by exclusively probing the right side.

By focusing on these objectives, we can develop an efficient algorithm for joining infinite streams while maintaining the order of the right input stream and accommodating various join types.

The Algorithm

We will discuss the SWHJ algorithm at two levels: First, we will give a high-level description of the SWHJ algorithm. Next, we will present pseudocode laying out the algorithm as a finite state machine.

The algorithm starts off by initializing the left (build) side buffer and a data structure to keep track of the visited rows from the left side. Throughout its operation, the algorithm always maintains an interval that stores the left side buffer boundaries. Recall that it continuously processes incoming data from both the left and right streams; it does not consume either side exclusively until exhaustion.

  • Every time the algorithm fetches a RecordBatch from the right stream, it computes the interval storing the range of this batch. Then, it uses this interval to shrink the build side interval using constraint propagation. This shrunk range will determine the new build-side sliding window bounds.
Figure 3: Buffers for a query with join conditions l.d = r.b and r.a > l.c - 15 and r.a < l.c + 2.
  • If the latest element in the left buffer is not enough to cover the whole sliding window, the algorithm keeps fetching more data from the build side stream. Once there is enough data, it uses the build side buffer to perform a hash join between the right record batch and the left buffer. If the join type is Inner, Right, RightSemi or RightAnti, the algorithm can produce order-preserving results since the right side is the probe side.
  • For Left, LeftSemi, LeftAnti and Full join types, the algorithm records the visited rows from the left side.
  • With the lower and upper bounds of the sliding window in hand, the algorithm prunes the build side buffer using the lower bound (see the sketch after this list). If the join type is Left, Full, LeftSemi or LeftAnti, it also produces unmatched results from the pruned left rows.
Figure 4: Matching and pruning for a query with join conditions l.d = r.b and r.a > l.c - 15 and r.a < l.c + 2.
  • When there is no more data in the right stream, the algorithm continues to pull from the left stream and calculates any unmatched results if the join type is Left, LeftSemi, LeftAnti or Full. Otherwise, it simply finishes the execution.
  • If there is no more data in the left stream, the algorithm continues the calculation by reading the right stream batch by batch as usual, but it no longer polls the left stream. The remaining build side buffer is used to continue the execution.
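
Below is a simplified sketch of the pruning step mentioned in the list above. It is our own illustration rather than DataFusion's code: rows below the new lower bound are dropped from the buffer, and, for outer-style joins, pruned rows that never matched become final unmatched results:

// Illustrative build-side pruning sketch. `buffer` is sorted ascending on
// the sliding-window column; `visited` tracks build rows that matched at
// least one probe row. Returns row ids that are final unmatched results.
fn prune_build_side(
    buffer: &mut Vec<(i64, i64)>, // (window column value, row id)
    visited: &std::collections::HashSet<i64>,
    lower_bound: i64,
    emit_unmatched: bool, // true for Left / LeftSemi / LeftAnti / Full joins
) -> Vec<i64> {
    // Everything below the lower bound can never match any future probe row.
    let prune_len = buffer.partition_point(|&(v, _)| v < lower_bound);
    let pruned: Vec<(i64, i64)> = buffer.drain(..prune_len).collect();

    // For outer-style joins, pruned-but-never-matched rows can be emitted
    // immediately as unmatched results.
    if emit_unmatched {
        pruned
            .into_iter()
            .filter(|&(_, row_id)| !visited.contains(&row_id))
            .map(|(_, row_id)| row_id)
            .collect()
    } else {
        Vec::new()
    }
}

fn main() {
    use std::collections::HashSet;
    let mut buffer = vec![(100, 1), (105, 2), (110, 3)];
    let visited: HashSet<i64> = [2_i64].into_iter().collect();
    // Lower bound 106: rows 1 and 2 are pruned; row 1 never matched, so it
    // is emitted as an unmatched (outer) result.
    let unmatched = prune_build_side(&mut buffer, &visited, 106, true);
    println!("unmatched: {:?}, remaining: {:?}", unmatched, buffer);
}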

The above list highlights the most important points, but we feel a state machine approach is more helpful for anyone who wants to implement the SWHJ for themselves:

Repeat indefinitely:
  Check the state of the process:
      Case "PullProbe":
          Fetch next item from right_stream
          If an item is received:
              Update metrics
              Check if batch is suitable for further processing
              If not suitable, break the loop
          If no item received, break the loop
          If there is an error, return error status
          If no batches were collected, change state to "ProbeExhausted" and continue
          Process collected batches and calculate necessary parameters for the next state "PullBuild"
			
      Case "PullBuild":
          Fetch next item from left_stream
          If an item is received, process the item and update metrics and check conditions
          If no item received, update the state to "BuildExhausted"
          If there is an error, return error status

      Case "BuildExhausted":
          Update state to "Join"

      Case "ProbeExhausted":
          Fetch next item from left_stream
          If an item is received, process the item, update metrics and return result if exists
          If no item received, update state to "BothExhausted" with final result as false
          If there is an error, return error status

      Case "Join":
          Perform join operation and calculate necessary parameters
          Prune internal state if possible
          Combine results and update state to "PullProbe"
          Update metrics and memory usage
          If result exists, return the result

      Case "BothExhausted"with final result true:
          Return "None" indicating we are done

      Case "BothExhausted" with final result false:
          Create result from the current state
          Update state to "BothExhausted" with final result as true
          If result exists, update output metrics and return the result
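
For readers implementing this themselves, the states above map naturally onto an enum. The following Rust skeleton mirrors the pseudocode; it is a sketch, not DataFusion's exact types:

// Illustrative skeleton of the SWHJ driver's states, mirroring the
// pseudocode above (not DataFusion's exact implementation).
#[allow(dead_code)]
enum SwhjState {
    // Fetch right-side batches and compute the next sliding window bounds.
    PullProbe,
    // Fetch left-side batches until the buffer covers the window.
    PullBuild,
    // The left stream is exhausted; proceed to join with the buffered data.
    BuildExhausted,
    // The right stream is exhausted; drain the left stream for any remaining
    // unmatched results (Left / LeftSemi / LeftAnti / Full joins).
    ProbeExhausted,
    // Join the probe batch against the buffer, prune, and emit results.
    Join,
    // Both streams are exhausted; the flag records whether the final result
    // batch has already been emitted.
    BothExhausted { final_result: bool },
}

fn main() {
    // The driver loops roughly: PullProbe -> PullBuild -> BuildExhausted ->
    // Join -> PullProbe ... until an exhausted state terminates it.
    let _initial = SwhjState::PullProbe;
}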

Roadmap

There are various possible enhancements that can make our query engine even more performant and stream-friendly. Some enhancements we plan to work on in the near future include:

  • Improving the buffered hash table implementation for higher performance,
  • Using this plan in complex, order-preserving stream processing pipelines,
  • Developing even more pipeline-friendly join operators.

Inspirations

  1. The Dataflow Model: Balancing Correctness, Latency, and Cost in Stream Processing - Tyler Akidau, Alex Balikov, et al. - Exploring the practical approach to processing massive-scale, unbounded data with low-latency and cost-effectiveness.
  2. A Survey on Transactional Stream Processing - Shuhao Zhang, Juan Soto, Volker Markl - Transactional stream processing (TSP) strives to create a cohesive model that merges the advantages of both transactional and stream-oriented guarantees. This survey provides insights into diverse TSP requirements and methodologies and aims to inspire the design of groundbreaking TSP systems.
  3. Designing Data-Intensive Applications: Principles for Reliable, Scalable, and Maintainable Systems - Martin Kleppmann - Unveiling the big ideas behind building data-intensive applications, including stream processing and distributed systems.

About Synnada

Synnada helps you build interactive, collaborative, intelligent, real-time data applications for your mission-critical systems, within minutes and with ease — powered by SQL and online machine learning.

We have a simple goal: Unify data, make it real-time and accessible for AI-driven applications. We strive to empower data artisans of the future by providing them with elemental blocks and infrastructure components required to create, maintain and optimize intelligent, real-time, mission-critical data systems.

Metehan Yıldırım

Software Engineer @ Synnada
Mehmet Ozan Kabak

Co-founder and CEO @ Synnada
