# General-purpose Stream Joins via Pruning Symmetric Hash Joins

As the Lambda architecture, which separates batch and stream processing systems, gives way to the Kappa architecture, which argues for a unified approach, stream joins are once again drawing significant attention in the data space. At Synnada, we strongly support the ideas and motivations behind the Kappa architecture, and we have been working hard on readying Apache DataFusion for this transformation.

Today, we are excited to announce the addition of Symmetric Hash Join to Apache DataFusion. This new feature will greatly enhance the capabilities of our query processing engine and give users even more powerful tools for querying, analyzing, and managing large datasets in real time.

Symmetric Hash Join will integrate seamlessly into DataFusion's existing API and is fully compatible with DataFusion's existing static data sources and streams. Whether you have static data with cardinality constraints or streaming data, Symmetric Hash Join will help you process large datasets faster and more efficiently, enabling you to generate low-latency, data-driven results.

Our implementation of this feature includes an advanced optimization technique called build-side pruning, which allows the algorithm to detect early on when certain rows no longer matter to the join result and remove them from memory. This results in a significant performance boost and more efficient use of computational resources.

In this blog post, we'll take a closer look at the Symmetric Hash Join algorithm. We will explain how it works, its advantages, and the benefits of using it. We'll also show you how to use it in your DataFusion pipeline and provide tips and tricks for getting the most out of it.

We hope that this new addition will help you to achieve your big data goals more easily and with greater speed, and we welcome your feedback and questions.

# Symmetric Hash Join

## A brief explanation

To explain the Symmetric Hash Join (SHJ) algorithm, let’s first quickly remind ourselves how the ordinary Hash Join algorithm works. At a high level, the latter works in a two-step fashion: It first consumes one of its inputs (the build side) to build a hash table, and then consults this hash table while consuming its other input (the probe side) to determine matches. The SHJ algorithm eliminates this build/probe side asymmetry by maintaining a hash table for each input, and consults these tables continuously as it consumes both its inputs.
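
To make the symmetry concrete, here is a minimal sketch of the idea in Rust. It is a toy model and not the DataFusion operator: the `SymmetricHashJoin` type, the `Side` enum and the `push` method are hypothetical names, and rows are simplified to a key plus a string payload.

```rust
use std::collections::HashMap;

/// Which input a row arrived on.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Side {
    Left,
    Right,
}

/// Toy symmetric hash join over (key, payload) rows; one hash table per input.
#[derive(Default)]
struct SymmetricHashJoin {
    left: HashMap<i32, Vec<String>>,
    right: HashMap<i32, Vec<String>>,
}

impl SymmetricHashJoin {
    /// Inserts the row into its own side's table, then probes the other
    /// side's table and returns (left_payload, right_payload) matches.
    fn push(&mut self, side: Side, key: i32, payload: &str) -> Vec<(String, String)> {
        let (own, other) = match side {
            Side::Left => (&mut self.left, &self.right),
            Side::Right => (&mut self.right, &self.left),
        };
        own.entry(key).or_default().push(payload.to_string());
        other
            .get(&key)
            .into_iter()
            .flatten()
            .map(|matched| match side {
                Side::Left => (payload.to_string(), matched.clone()),
                Side::Right => (matched.clone(), payload.to_string()),
            })
            .collect()
    }
}
```

Calling `push` alternately with `Side::Left` and `Side::Right` rows produces matches as soon as both halves of a pair have arrived, regardless of which side came first. Note that nothing is ever evicted in this sketch, so both tables grow without bound.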

## When is it appropriate?

The SHJ algorithm is typically useful when quick response times are needed, or when inputs are unbounded. It can be used in various types of data processing tasks, such as:

• Real-time streaming: The SHJ algorithm can be used in real-time streaming scenarios to join data from multiple streams with low latency. For example, it can be used to join data from IoT devices with data from other sources, such as weather data.
• Large datasets with low cardinality: The SHJ algorithm can be used to join large datasets with low-cardinality keys by exploiting sliding window join properties. When low-cardinality columns are used as keys in sliding window joins, an ordinary Hash Join does a lot of unnecessary computational work because it checks key equality first and applies the window filter only afterwards, which makes the join operation inefficient. This can lead to long processing times and high resource usage, making it difficult to meet low-latency processing requirements.

## How does it work?

### Context: Query processing and streams

In a query processing engine, data flows in between different operators or stages within the engine, and each operator or stage performs a specific operation on the data. The query planning step is responsible for determining the optimal execution plan for the query, which simply means selecting the applicable operators, configuring them and connecting them in some optimal sense.

However, when processing streams, this is not as straightforward as it sounds, since not all operators are stream-friendly. Stream-unfriendly operators are often called pipeline-breaking operators, and we need to get the job done by utilizing only stream-friendly operators that allow our data streams to flow smoothly through the entire pipeline.

This means that none of our operators should block processing by waiting for a large amount of input data without producing any output, or by writing intermediate results to disk (if possible). While some operations (such as sorting) are inherently not compatible with this concept (and thus are pipeline-breaking), many use cases allow solutions involving only stream-friendly operators.

### Joins and streams

Are join operators pipeline-breaking? To answer this question, let’s imagine a scenario where we have static data on one side, and a stream on the other:

The current implementation of the ordinary Hash Join algorithm in DataFusion (and in many other engines) materializes the entire left side as the build side, constructing the hash table only once and allowing the right side to stream. The implementation also supports the converse case by simply swapping sides. Circling back to our question, we see that the answer is yes for the build side, and no for the probe side.

Obviously, materializing one side is not an option if both sides are infinite streams — enter the SHJ algorithm, which partially materializes both sides incrementally:

This makes SHJ stream-friendly for both sides. However, it still does not give us carte blanche to join two infinite streams in general. If we did that, the hash tables would simply grow over time and we would eventually run out of memory. Therefore, there needs to be a mechanism to limit the sizes of these tables if we are ever to use the SHJ algorithm on infinite streams.

Fortunately, a stream join is performed on a finite time window in many practical use cases, which gives us an avenue to limit sizes of these hash tables. We can employ SHJ so long as the inner batch buffer is flushed frequently in accordance with the finite sliding window, maintaining a compact hash table.

Obviously, if we have some other join filtering logic that limits table sizes instead of a sliding time window, we would need to flush according to that filtering logic. Therefore, to solve this task in a generic fashion, one needs a way to calculate bounds on arbitrary expressions when certain favorable conditions are present in the data stream (like increasing timestamps/column values).

⚠️ For more information about stream processing methods, see:
[1] Mironov, Eugene. "In-Stream Big Data Processing." Highly Scalable Blog, 20 Aug. 2013,
https://highlyscalable.wordpress.com/2013/08/20/in-stream-big-data-processing/


The figure above explains what happens when a new collection of rows (called a RecordBatch in DataFusion) arrives on one side:

1. We use the join keys to look up the corresponding row on the other side’s inner buffer (hash table). If a match is found, the rows are joined and recorded as visited.
2. We prune the rows that fall outside the range of possible join key values.

As long as we maintain a reasonably sized inner buffer for each side and there is some filtering that limits buffer sizes, infinite streams can be joined correctly just like static data.

### Existing Symmetric Hash Join implementations

In a typical stream processing library like Apache Flink or Apache Spark, the join operation can be performed using watermarks. Let's examine a query taken from the Apache Spark docstring:

SELECT * FROM left_table, right_table
WHERE
left_key = right_key AND
left_time > right_time - INTERVAL 8 MINUTES AND
left_time < right_time + INTERVAL 2 HOUR

In this query, say each join side has a time column, named "left_time" and "right_time", and there is a join condition "left_time > right_time - 8 min". While processing, say the watermark on the right input is "12:34". This means that, henceforth, only right input rows with "right_time > 12:34" will be processed, and any older rows will be considered "too late" and therefore dropped. Then, the left side buffer only needs to keep rows where "left_time > right_time - 8 min > 12:34 - 8 min = 12:26". That is, the left state watermark is 12:26, and any rows older than that can be dropped from the state. In other words, the operator will discard all state rows whose timestamp is below the state watermark.

Actually, this is part of the picture, not the whole. In theory, range-based pruning can be done with any sorted field (not just the watermark field) and with any arbitrary join filter condition that is amenable to this type of data pruning. However, Apache Spark overfits to timestamps and associates the pruning operation with a watermark. Let’s follow a different approach and examine the following query from a more general, first-principles perspective:

SELECT * FROM left_table, right_table
WHERE
left_key = right_key AND
left_sorted > right_sorted - 3 AND
left_sorted < right_sorted + 10


If sort orders of the two columns (left_sorted and right_sorted) are ascending, and the join condition is left_sorted > right_sorted - 3, and the latest value on the right input is 1234, then the left side buffer only has to keep rows where left_sorted > 1231 and any rows coming before this boundary can be dropped from the buffer. Note that this example is in no way specific; similar scenarios can manifest with a variety of orderings and join filter expressions.
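
The pruning step for this specific condition can be sketched as follows. This is a simplified illustration that assumes a single ascending integer column buffered in arrival order; `prune_left_buffer` is a hypothetical helper, not part of any library.

```rust
use std::collections::VecDeque;

/// Drops buffered left rows that can no longer satisfy
/// `left_sorted > right_sorted - offset`, assuming both columns arrive in
/// ascending order. Returns the number of pruned rows.
fn prune_left_buffer(buffer: &mut VecDeque<i64>, latest_right: i64, offset: i64) -> usize {
    // Every future right value is >= latest_right, so a left row that can
    // still join must satisfy left_sorted > latest_right - offset.
    let bound = latest_right - offset;
    let mut pruned = 0;
    while let Some(&oldest) = buffer.front() {
        if oldest > bound {
            break; // Ascending order: all remaining rows are still viable.
        }
        buffer.pop_front();
        pruned += 1;
    }
    pruned
}
```

With `latest_right = 1234` and `offset = 3`, the bound is 1231, so a buffer holding 1229, 1230, 1231, 1232 and 1240 loses its first three rows and keeps the last two.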

This becomes very powerful when we consider unified batch and stream processing as range conditions can be used in a wide variety of use cases. Let’s examine a slightly more complex join:

SELECT * FROM left_table, right_table
WHERE
left_key = right_key AND
left_1 + left_2 > right_sorted - 3 AND
left_1 + left_2 < right_sorted + 10


If sort orders of the two expressions (left_1 + left_2 and right_sorted) are ascending, we can still prune each side. However, calculating the expression/variable bounds might be challenging since the involved math is now getting more complex. In our work, we applied a well-known, systematic approach from the domain of mathematical optimization to this context, using interval arithmetic to calculate expression bounds and constraint propagation to refine variable bounds.

## Interval Arithmetic

Interval arithmetic is a numeric technique used by many successful global optimization and equation-solving algorithms. It extends ordinary arithmetic to interval values, similar to how ternary logic extends boolean logic. Following the rules of interval arithmetic, one can estimate the range of possible values an expression can take on in a single evaluation pass. Optimizers and solvers utilize this capability to eliminate infeasible regions of their search spaces early on so as to focus on viable regions. In the context of SHJ, we will use interval arithmetic to prune non-viable rows from our hash tables.

⚠️ For more information about interval arithmetic, see:
[2] Kabak, Mehmet Ozan. Analog Circuit Start-Up Behavior Analysis: An Interval Arithmetic Based Approach, Chapter 4. Stanford University, 2015.
[3] Moore, Ramon E. Interval analysis. Vol. 4. Englewood Cliffs: Prentice-Hall, 1966.


### Interval operators

Interval arithmetic provides a set of mathematical operators that can be used to perform calculations on intervals. These operators include basic arithmetic operations such as addition, subtraction, multiplication, and division, as well as more advanced operations such as intersection, union, and set difference. One can compose these operators to calculate the range of possible values for a function in a specific region of the search space, facilitating rapid identification and elimination of infeasible regions of the search space. The elementary arithmetic operations are defined as:

1. Addition: ${\displaystyle [x_{1},x_{2}]+[y_{1},y_{2}]=[x_{1}+y_{1},x_{2}+y_{2}]}.$ For example, if we had two intervals A = [1, 2] and B = [3, 4], the sum of these intervals would be A + B = [1 + 3, 2 + 4] = [4, 6].
2. Subtraction: ${\displaystyle [x_{1},x_{2}]-[y_{1},y_{2}]=[x_{1}-y_{2},x_{2}-y_{1}]}.$ For example, if we have two intervals A = [1, 2] and B = [3, 4], the difference of these intervals would be A - B = [1 - 4, 2 - 3] = [-3, -1].
3. Multiplication: ${\displaystyle [x_{1},x_{2}]\cdot [y_{1},y_{2}]=[\min\{x_{1}y_{1},x_{1}y_{2},x_{2}y_{1},x_{2}y_{2}\},\max\{x_{1}y_{1},x_{1}y_{2},x_{2}y_{1},x_{2}y_{2}\}]}$
4. Division: ${\displaystyle [x_{1},x_{2}]\div [y_{1},y_{2}]\equiv {\frac {[x_{1},x_{2}]}{[y_{1},y_{2}]}}=[x_{1},x_{2}]\cdot {\frac {1}{[y_{1},y_{2}]}},}$ where

$${\begin{aligned}{\frac {1}{[y_{1},y_{2}]}}&=\left[{\tfrac {1}{y_{2}}},{\tfrac {1}{y_{1}}}\right]&&{\textrm {if}}\;0\notin [y_{1},y_{2}]\\{\frac {1}{[y_{1},0]}}&=\left[-\infty ,{\tfrac {1}{y_{1}}}\right]\\{\frac {1}{[0,y_{2}]}}&=\left[{\tfrac {1}{y_{2}}},\infty \right]\\{\frac {1}{[y_{1},y_{2}]}}&=\left[-\infty ,{\tfrac {1}{y_{1}}}\right]\cup \left[{\tfrac {1}{y_{2}}},\infty \right]\subseteq [-\infty ,\infty ]&&{\textrm {if}}\;0\in (y_{1},y_{2})\end{aligned}}$$

It is also possible to define interval operators for more complex operations such as trigonometric functions, exponentials and logarithms, roots, etc.
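
The elementary operators translate almost directly into code. Below is a minimal sketch with a hypothetical `Interval` type over `f64`; it assumes finite bounds and only handles division when the divisor excludes zero, so it is an illustration rather than the library shipped with our PR.

```rust
/// Closed interval [lower, upper] with finite f64 bounds (toy type).
#[derive(Clone, Copy, Debug, PartialEq)]
struct Interval {
    lower: f64,
    upper: f64,
}

impl Interval {
    fn new(lower: f64, upper: f64) -> Self {
        assert!(lower <= upper, "lower bound must not exceed upper bound");
        Interval { lower, upper }
    }

    /// [x1, x2] + [y1, y2] = [x1 + y1, x2 + y2]
    fn add(self, rhs: Interval) -> Interval {
        Interval::new(self.lower + rhs.lower, self.upper + rhs.upper)
    }

    /// [x1, x2] - [y1, y2] = [x1 - y2, x2 - y1]
    fn sub(self, rhs: Interval) -> Interval {
        Interval::new(self.lower - rhs.upper, self.upper - rhs.lower)
    }

    /// Minimum and maximum over the four endpoint products.
    fn mul(self, rhs: Interval) -> Interval {
        let products = [
            self.lower * rhs.lower,
            self.lower * rhs.upper,
            self.upper * rhs.lower,
            self.upper * rhs.upper,
        ];
        let lower = products.iter().copied().fold(f64::INFINITY, f64::min);
        let upper = products.iter().copied().fold(f64::NEG_INFINITY, f64::max);
        Interval::new(lower, upper)
    }

    /// Division via the reciprocal; returns None if the divisor contains zero.
    fn div(self, rhs: Interval) -> Option<Interval> {
        if rhs.lower <= 0.0 && rhs.upper >= 0.0 {
            return None;
        }
        Some(self.mul(Interval::new(1.0 / rhs.upper, 1.0 / rhs.lower)))
    }
}
```

For A = [1, 2] and B = [3, 4], `A.add(B)` and `A.sub(B)` reproduce the two examples above, [4, 6] and [-3, -1].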

### Interval calculations

Interval arithmetic provides a structured approach to calculating the bounds of a complex expression by combining the bounds of its simpler components. This hierarchical computation is well-suited for a graph-based implementation, where an equation system is represented by a directed acyclic expression graph (DAEG). In this graph, each leaf node corresponds to a variable (or a constant), and each root node corresponds to an individual equation or constraint. The internal nodes represent intermediate terms in the equations/constraints and may be shared among multiple root nodes.

In our implementation, each node contains the physical expression itself and an interval [L, U]. Interval [L, U] indicates the computed bounds for the node. By using this structure, the desired hierarchical computation procedure can be achieved by simply performing a bottom-up evaluation of the graph, also known as a post-order traversal.
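
The bottom-up pass can be sketched with a small recursive evaluator. For brevity, the sketch below uses a plain expression tree with integer bounds rather than a graph with shared nodes; `Expr`, `Interval` and `evaluate` are hypothetical names chosen for illustration.

```rust
/// Closed interval [lower, upper] with finite i64 bounds (toy type).
#[derive(Clone, Copy, Debug, PartialEq)]
struct Interval {
    lower: i64,
    upper: i64,
}

/// A tiny expression tree; a real DAEG would additionally share common nodes.
enum Expr {
    Column(Interval), // leaf: a column with known bounds
    Literal(i64),     // leaf: a constant
    Add(Box<Expr>, Box<Expr>),
    Sub(Box<Expr>, Box<Expr>),
    Mul(Box<Expr>, Box<Expr>),
}

/// Post-order (bottom-up) evaluation: children first, then the parent.
fn evaluate(expr: &Expr) -> Interval {
    match expr {
        Expr::Column(bounds) => *bounds,
        Expr::Literal(v) => Interval { lower: *v, upper: *v },
        Expr::Add(l, r) => {
            let (a, b) = (evaluate(l), evaluate(r));
            Interval { lower: a.lower + b.lower, upper: a.upper + b.upper }
        }
        Expr::Sub(l, r) => {
            let (a, b) = (evaluate(l), evaluate(r));
            Interval { lower: a.lower - b.upper, upper: a.upper - b.lower }
        }
        Expr::Mul(l, r) => {
            let (a, b) = (evaluate(l), evaluate(r));
            let p = [
                a.lower * b.lower,
                a.lower * b.upper,
                a.upper * b.lower,
                a.upper * b.upper,
            ];
            Interval {
                lower: *p.iter().min().unwrap(),
                upper: *p.iter().max().unwrap(),
            }
        }
    }
}
```

Because each call evaluates its children before combining them, the bounds of the root are available after a single pass over the tree.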

### Constraint Propagation

After executing a bottom-up traversal on the expression graph, we obtain interval bounds for each node, with the root node ultimately corresponding to the full join filter condition. With these intervals in place, we can then propagate constraints from parent nodes to child nodes via a subsequent top-down traversal, which enables us to shrink expression intervals when possible. In this top-down traversal, we use the following propagation rules:

• z = x + y → x ∈ X ∩ (Z - Y) ∧ y ∈ Y ∩ (Z - X)
• z = x - y → x ∈ X ∩ (Z + Y) ∧ y ∈ Y ∩ (X - Z)
• z = x × y → x ∈ X ∩ (Z ÷ Y) ∧ y ∈ Y ∩ (Z ÷ X)
• z = x ÷ y → x ∈ X ∩ (Z × Y) ∧ y ∈ Y ∩ (X ÷ Z)

where expressions x and y are children of the expression z, with X, Y and Z respectively denoting the intervals of expressions x, y and z.

Basically, these rules enable us to propagate a constraint, placed on the result of an operation, to the arguments of the operation. Since we are in possession of the entire expression graph (i.e. the DAEG), we can use these rules to propagate each of our equality or inequality constraints (i.e. f(x) = 0, f(x) > 0 etc.) all the way down to the variables (i.e. columns).

To illustrate in the context of a concrete example, we revisit the example from Figure 4. Only this time, we analyze it from the constraint propagation perspective. Particularly, consider the equality condition x^2 + y = 4, where intervals of x and y are [1, 2] and [-3, 1] respectively. Assume that we have just finished our bottom-up evaluation of the DAEG. To propagate the mentioned equality constraint, we just perform an additional, top-down evaluation of the DAEG using the rules above. You can inspect the details of this procedure in Figure 5.
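
For readers who prefer to follow the arithmetic directly, the steps can be worked out by hand as below. We treat $x^{2}$ as a single child node of the sum; the last line, which takes the positive square root because $x \geq 1$, is our own assumption since the rules listed above cover only the four elementary operations. The bottom-up pass gives $x^{2} \in [1, 4]$ and $x^{2} + y \in [-2, 5]$, and the equality constraint fixes the root interval to $[4, 4]$. Applying the addition rule top-down then yields:

$$
\begin{aligned}
x^{2} &\in [1,4] \cap \left([4,4]-[-3,1]\right) = [1,4]\cap[3,7] = [3,4],\\
y &\in [-3,1] \cap \left([4,4]-[1,4]\right) = [-3,1]\cap[0,3] = [0,1],\\
x &\in [1,2] \cap \left[\sqrt{3},\,2\right] = \left[\sqrt{3},\,2\right].
\end{aligned}
$$

Both variable intervals shrink: $x$ from $[1, 2]$ to $[\sqrt{3}, 2]$ and $y$ from $[-3, 1]$ to $[0, 1]$.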

### Use case: Join filters

Having gone through the basics of interval calculations, let’s take a look at how we can make use of these techniques on join filters for the purposes of data pruning. Consider the following query:

SELECT * FROM left_t, right_t
WHERE
left_t_key = right_t_key AND
left_t.a > right_t.b - 3 AND
left_t.a < right_t.b + 10


This query will produce the following DAEG:

Every time we process a row (or a batch of rows depending on the batch size) within the SHJ algorithm, we need to compute viable ranges for a and b so that we can limit our buffer sizes. For this purpose, we will first evaluate the DAEG bottom-up, and then propagate the constraint that the logical conjunction is true from the root node top-down. Interested in even more details of how this happens? Read on.

# Buffer Pruning with Interval Arithmetic

Consider the following query:

SELECT * FROM left_t, right_t
WHERE
left_t_key = right_t_key AND
left_t.a > right_t.b - 3 AND
left_t.a < right_t.b + 10


When a new RecordBatch arrives at the right side, the condition a > b - 3 will possibly indicate a prunable range for the left side. Conversely, when a new RecordBatch arrives at the left side, the condition a < b + 10 will possibly indicate prunability for the right side. In general, every inequality condition will indicate prunability for one side of the join. As sliding window filters involve two inequalities with opposite directions, we can prune both sides as we consume the inputs. Note that a more stringent composite join condition works in favor of prunability; we get more opportunities for pruning if we have a conjunction of many sub-conditions.

Now, let’s go through a concrete example. Assuming that the left side is the build side (i.e. a new RecordBatch arrives at the right side), we can picture the process as follows:

How exactly do we use interval arithmetic for pruning here? We first create an interval for join filter values on the build side of the join, which spans [-∞, FV] or [FV, ∞] depending on the ordering (descending/ascending) of the filter expression. Here, FV denotes the first value on the build side. This range is then compared with the probe side interval, which spans either [-∞, LV] or [LV, ∞] depending on the ordering (descending/ascending) of the probe side. Here, LV denotes the last value on the probe side.

Going back to our concrete example,

• The interval for column a is [1, ∞] (as the first a value on the left is 1),
• The interval for column b is [6, ∞] (as the latest b value in the newly arriving batch is 6).

Then, we evaluate the join conditions using interval arithmetic, and propagate the constraint by traversing the expression graph top-down. In this specific example, only column a's range shrinks, and its new interval becomes [3, ∞] while the interval for column b does not change. Once these new intervals are computed, they are then used to determine which rows in the build side can be safely discarded without affecting the final join result. These rows are subsequently removed from the buffers since they can never match with any probe-side row in the future.
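
The numbers in this example can be reproduced with a few lines of code. The sketch below hard-codes the filter `a > b - 3 AND a < b + 10`, uses `i64::MIN` and `i64::MAX` as stand-ins for infinite bounds, and treats strict inequalities like closed bounds (so it is slightly conservative). All names are hypothetical, and a single propagation pass is assumed to be enough for this filter.

```rust
/// Interval with i64 bounds; i64::MIN and i64::MAX stand in for -∞ and +∞.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Interval {
    lower: i64,
    upper: i64,
}

/// Adds a constant to a bound while leaving "infinite" bounds untouched.
fn shift(bound: i64, c: i64) -> i64 {
    if bound == i64::MIN || bound == i64::MAX {
        bound
    } else {
        bound.saturating_add(c)
    }
}

/// Refines the intervals of `a` and `b` under `a > b - 3 AND a < b + 10`.
fn propagate(mut a: Interval, mut b: Interval) -> (Interval, Interval) {
    // a > b - 3: a cannot go below lower(b) - 3, b cannot exceed upper(a) + 3.
    a.lower = a.lower.max(shift(b.lower, -3));
    b.upper = b.upper.min(shift(a.upper, 3));
    // a < b + 10: a cannot exceed upper(b) + 10, b cannot go below lower(a) - 10.
    a.upper = a.upper.min(shift(b.upper, 10));
    b.lower = b.lower.max(shift(a.lower, -10));
    (a, b)
}

/// Counts the leading build-side rows whose `a` value fell out of range,
/// assuming the values are buffered in ascending order.
fn prunable_prefix_len(left_a_values: &[i64], a: Interval) -> usize {
    left_a_values.iter().take_while(|&&v| v < a.lower).count()
}
```

Starting from a ∈ [1, ∞] and b ∈ [6, ∞], `propagate` returns a ∈ [3, ∞] and leaves b unchanged, so build-side rows with a < 3 can be discarded.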

## Generating unmatched row results

### Context: Inner and Outer joins

Inner join is a type of join that returns only the records that have a match in both tables. This type of join is used when both tables have related data that needs to be matched between the two tables.

Left/right outer joins return all the records from one of the tables, even if there is no match in the other table. In other words, left (right) join returns all the records from the left (right) table and the matching records from the right (left) table (if any). There is also a full outer join, which returns all the records from both the tables, whether there is a match or not.

Consider two tables, Table1 and Table2, with the following data:

Table1:

| a | b |
|---|---|
| 1 | x |
| 2 | y |
| 3 | z |

Table2:

| a | b |
|---|---|
| 1 | a |
| 2 | b |
| 4 | c |

A query computing a left join of Table1 and Table2 on the a column looks like:

SELECT Table1.a, Table1.b, Table2.b
FROM Table1
LEFT JOIN Table2
ON Table1.a = Table2.a;


which would generate a result like this:

| a | Table1.b | Table2.b |
|---|----------|----------|
| 1 | x | a |
| 2 | y | b |
| 3 | z | NULL |

As you can see, the result includes all the rows from Table1 and the matching rows from Table2. Note that the last row has a NULL value for the column Table2.b, because there was no matching row for it in Table2. For more examples, see the Wikipedia article explaining outer joins.

### Outer joins and infinite streams

When (outer) joining infinite streams, special care has to be taken when dealing with unmatched rows. If we are executing vanilla SHJ on data without ordering guarantees, a currently-unmatched row can always have a matching result in the future as new data comes in. However, when operating on data that lends itself to pruning, we can gracefully handle unmatched rows incrementally: Whenever we decide that a part of the inner buffer is not joinable in the future, we can check the expiring rows to see if they were matched before. If not, we can generate the necessary output consisting of any unmatched rows right there and then, allowing incremental processing.
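
A minimal sketch of this idea for a left outer join is shown below. It assumes the buffer is ordered by an ascending sort column and that each buffered row carries a flag recording whether it has matched so far; `BufferedRow` and `prune_and_emit_unmatched` are hypothetical names, and `None` stands in for SQL NULL.

```rust
/// A buffered left-side row with a flag recording whether it ever matched.
#[derive(Debug, Clone, PartialEq)]
struct BufferedRow {
    sort_value: i64,
    payload: String,
    visited: bool,
}

/// Removes rows whose sort value fell below `lower_bound` and returns the
/// left-outer output for expiring rows that never matched.
fn prune_and_emit_unmatched(
    buffer: &mut Vec<BufferedRow>,
    lower_bound: i64,
) -> Vec<(String, Option<String>)> {
    // Ascending order: the expiring rows form a prefix of the buffer.
    let split = buffer.partition_point(|row| row.sort_value < lower_bound);
    buffer
        .drain(..split)
        .filter(|row| !row.visited)
        .map(|row| (row.payload, None))
        .collect()
}
```

Rows that were matched earlier are dropped silently, since their joined output has already been produced, while never-matched rows are emitted exactly once at the moment they expire.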

# Performance analysis

Whenever join conditions and data ordering lend themselves to pruning (e.g. sliding window filtering), we expect SHJ to exhibit higher performance than ordinary Hash Join, since it requires less memory. In many cases, SHJ can perform the join entirely in memory while ordinary Hash Join cannot. This is certainly true for unbounded streams; but it is also possible with large, static data if join conditions and data ordering are favorable.

An intelligent query planning algorithm should inspect data ordering, nature of the join conditions, and join key cardinality when choosing between SHJ and ordinary HJ. This decision can also be analyzed within a cost-based optimization framework, which is a topic we plan to discuss in the future.

## Experiment setup

When comparing the performance of SHJ and ordinary Hash Join algorithms in terms of execution time, there are three important parameters to consider: Batch size of the data, cardinality of the join key, and obviously the table size.

Batch size of the data refers to the number of rows we process in a single iteration of the join operation. A larger batch size can result in a more efficient join operation, as more data can be processed at once. However, a larger batch size can also decrease the number of prune operations, thus it may increase transient memory usage and reduce cache efficiency.

Cardinality of the join key refers to the number of unique values the join key can take. A lower cardinality can result in fewer hash table entries. However, since DataFusion’s current Hash Join implementation (called HashJoinExec) computes all key-equality matches before applying the join filter, the intermediate result can be huge at low cardinalities, increasing execution time and memory consumption dramatically.

Table size refers to the number of rows in each table involved in the join operation. A larger table size results in longer execution times, as there are more rows to process.

Considering these parameters in our study, we can get a relatively comprehensive view of algorithmic performance for both SHJ and ordinary Hash Join algorithms.

To highlight the importance of build-side pruning, we will be using a query with a sliding window filter, which reads:

l.sorted_ascending + 1 > r.sorted_ascending + 5 AND
l.sorted_ascending + 3 < r.sorted_ascending + 10
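
To see how narrow this window is, the two inequalities can be rearranged by hand. Writing $l$ and $r$ for `l.sorted_ascending` and `r.sorted_ascending` (shorthand introduced here for brevity):

$$
\begin{aligned}
l + 1 > r + 5 &\iff l - r > 4,\\
l + 3 < r + 10 &\iff l - r < 7.
\end{aligned}
$$

Together these give $4 < l - r < 7$. Since the columns are integer-valued (Int32 in the generation code), a pair of rows can only pass the filter when $l - r \in \{5, 6\}$, so each row has very few viable partners on the other side and the remaining buffered rows quickly become prunable.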


The input tables are created by the following code:

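use std::sync::Arc;
use arrow::array::Int32Array;
use rand::Rng;

// `rng`, `table_size`, `cardinality` and the `build_record_batch` helper are
// assumed to be defined in the surrounding benchmark code.
let initial_range = 0..table_size;
let left_batch = build_record_batch(vec![
    (
        // Join key drawn uniformly from 0..cardinality.
        "key",
        Arc::new(Int32Array::from_iter(
            initial_range.clone().map(|_| rng.gen_range(0..cardinality)).collect::<Vec<i32>>(),
        )),
    ),
    (
        // Non-decreasing column: each value is x or, with probability 0.3, x - 1.
        "sorted_ascending",
        Arc::new(Int32Array::from_iter(
            initial_range.clone().map(|x| {
                if rng.gen_bool(0.3) && x > 0 { x - 1 } else { x }
            }).collect::<Vec<i32>>(),
        )),
    ),
])?;
let right_batch = build_record_batch(vec![
    (
        // Join key drawn uniformly from 0..cardinality.
        "key",
        Arc::new(Int32Array::from_iter(
            initial_range.clone().map(|_| rng.gen_range(0..cardinality)).collect::<Vec<i32>>(),
        )),
    ),
    (
        // Non-decreasing column, generated the same way as on the left side.
        "sorted_ascending",
        Arc::new(Int32Array::from_iter(
            initial_range.clone().map(|x| {
                if rng.gen_bool(0.3) && x > 0 { x - 1 } else { x }
            }).collect::<Vec<i32>>(),
        )),
    ),
])?;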


The parameter set is selected as

• Join type: Inner join,
• Batch size: 128, 512, 2048, 4096,
• Cardinality: 8 (Low), 64 (Medium), 1024 (High),
• Table size: 8192, 16384, 32768, 65536.

## Low cardinality

In this case, we are expecting HashJoinExec to perform badly, since it is operating under very unfavorable conditions: it has to build a large intermediate batch first and can only filter it afterwards.

We can see that the efficient inner buffers of SymmetricHashJoinExec make intermediate batch building much faster. We maintain a nearly constant-size buffer for this use case.

## Medium cardinality

In this case, we are expecting HashJoinExec to perform relatively better than the previous setting, since the intermediate batch size gets smaller. However, the superlinear nature of the operation remains.

Again, we get much higher performance from SymmetricHashJoinExec due to low buffer sizes.

## High cardinality

As cardinality increases, we get better results in terms of execution time from both algorithms.

However, the superlinear nature of HashJoinExec does not change, and the performance advantage of SymmetricHashJoinExec persists.

Our initial implementation for Apache DataFusion does not include all of the potential benefits of the SHJ algorithm. The main features included in our initial PR are:

• An initial library for interval arithmetic, which includes basic arithmetic operations (addition and subtraction) and comparison operations (greater than and less than) for integer types, and supports the logical conjunction operator.
• An API for performing interval calculations, which can be used for other purposes, such as range pruning in Parquet. Within the context of this PR, we use this functionality to calculate filter expression bounds for pruning purposes.
• A constraint propagation module to construct expression DAGs from physical expression trees and update column bounds efficiently for data pruning purposes.
• An initial implementation of SHJ, which is limited to the partitioned mode and does not yet have full support for output order information.
• A new physical optimization sub-rule to utilize SHJ instead of ordinary Hash Join when joining two (unbounded) streams.

In order to have a PR with a manageable size, some features have been excluded for now, but will be added in the future. These include:

• Improved support for interval arithmetic, such as support for open/closed intervals, multiply/divide operations, additional comparison and logical operators, floating point numbers, and time intervals.
• Improved constraint propagation, including the ability to determine monotonicity properties of complex physical expression trees.
• An improved SHJ algorithm, including support for collect left/right/all modes, intermediate buffers for complex expressions, and an improved output ordering flag.

# Parting words

When we first contemplated writing this blog post, we had two goals: (1) Announcing a cool algorithmic contribution to Apache DataFusion, and (2) creating a good resource for anyone who has an interest in stream joins. For these purposes, we dug deep into the Symmetric Hash Join algorithm, explaining how it works in a relatively accessible format. We also discussed how optimizations such as build-side pruning work, and showed why/how these optimizations result in better performance and lower resource utilization.

We believe that this new feature will be a valuable addition to Apache DataFusion, as it will improve join performance on large datasets and create new opportunities when working with unbounded stream data. If you have any questions, comments, or ideas for further improvements to either SHJ or DataFusion in general, feel free to reach out to us.