As the Lambda architecture, which separates batch and stream processing systems, gives way to the Kappa architecture, which argues for a unified approach, a significant amount of attention is again focusing on stream joins in the data space. At Synnada, we strongly support the ideas and motivations behind the Kappa architecture, and we have been working hard on readying Apache DataFusion for this transformation.

Today, we are excited to announce the addition of Symmetric Hash Join to Apache DataFusion. This new feature will greatly enhance the capabilities of our query processing engine and give users even more powerful tools for querying, analyzing, and managing large datasets in real time.

Symmetric Hash Join integrates seamlessly into DataFusion's existing API and is fully compatible with DataFusion's existing static data sources and streams. Whether you have static data with cardinality constraints or streaming data, Symmetric Hash Join will help you process large datasets faster and more efficiently, enabling you to generate low-latency, data-driven results.

Our implementation of this feature includes an advanced optimization technique called build-side pruning, which allows the algorithm to detect early on when certain rows no longer matter to the join result and remove them from memory. This results in a significant performance boost and more efficient use of computational resources.

In this blog post, we'll take a closer look at the Symmetric Hash Join algorithm. We will explain how it works, its advantages, and the benefits of using it. We'll also show you how to use it in your DataFusion pipeline and provide tips and tricks for getting the most out of it.

We hope that this new addition will help you to achieve your big data goals more easily and with greater speed, and we welcome your feedback and questions.

To explain the Symmetric Hash Join (SHJ) algorithm, let’s first quickly remind ourselves how the ordinary Hash Join algorithm works. At a high level, the latter works in a two-step fashion: it first consumes one of its inputs (the build side) to build a hash table, and then consults this hash table while consuming its other input (the probe side) to determine matches. The SHJ algorithm eliminates this build/probe side asymmetry by maintaining a hash table for each input, and consults these tables continuously as it consumes both its inputs.
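To make the symmetry concrete, here is a minimal Python sketch of the idea (not DataFusion's actual implementation). It assumes a hypothetical input format where rows from both sides arrive interleaved as `(side, key, row)` tuples; every arriving row is first inserted into its own side's hash table and then probed against the other side's table.

```python
from collections import defaultdict

def symmetric_hash_join(events):
    """Join two interleaved inputs on equal keys.

    `events` yields (side, key, row) tuples, where side is "left" or "right".
    This input format is an assumption made for illustration only.
    """
    tables = {"left": defaultdict(list), "right": defaultdict(list)}
    for side, key, row in events:
        other = "right" if side == "left" else "left"
        # Build step: remember this row for future arrivals on the other side.
        tables[side][key].append(row)
        # Probe step: match against everything seen so far on the other side.
        for match in tables[other].get(key, []):
            yield (row, match) if side == "left" else (match, row)

events = [
    ("left", 1, "l1"),
    ("right", 1, "r1"),
    ("right", 2, "r2"),
    ("left", 2, "l2"),
    ("left", 1, "l1b"),
]
joined = list(symmetric_hash_join(events))
```

Note that each matching pair is produced exactly once, at the moment the later of its two rows arrives, so results can be emitted without waiting for either input to finish.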

The SHJ algorithm is typically useful when quick response times are needed, or when inputs are unbounded. It can be used in various types of data processing tasks, such as:

- **Real-time streaming**: The SHJ algorithm can be used in real-time streaming scenarios to join data from multiple streams with low latency. For example, it can be used to join data from IoT devices with data from other sources, such as weather data.
- **Large datasets with low cardinality**: The SHJ algorithm can be used to join large datasets with low cardinality by exploiting sliding window join properties. When the low cardinality columns are used as keys in sliding window joins, a lot of unnecessary computational work is done when we first check the equality, making the join operation inefficient. This can lead to long processing times and high resource usage, making it difficult to meet low-latency processing requirements.

In a query processing engine, data flows between different operators or stages within the engine, and each operator or stage performs a specific operation on the data. The query planning step is responsible for determining the optimal execution plan for the query, which simply means selecting the applicable operators, configuring them and connecting them in some optimal sense.

However, when processing streams, this is not as straightforward as it sounds, since not all operators are stream-friendly. Stream-unfriendly operators are often called *pipeline-breaking* operators, and we need to get the job done by utilizing only stream-friendly operators that allow our data streams to flow smoothly through the entire pipeline.

This means that none of our operators should block processing by waiting for a large amount of input data without producing any output, or by writing intermediate results to disk (if possible). While some operations (such as sorting) are inherently not compatible with this concept (and thus are pipeline-breaking), many use cases allow solutions involving only stream-friendly operators.

Are join operators pipeline-breaking? To answer this question, let’s imagine a scenario where we have static data on one side, and a stream on the other:

The current implementation of the ordinary Hash Join algorithm in DataFusion (and in many other engines) materializes the entire left side as the build side, constructing the hash table only once and allowing the right side to stream. The implementation also supports the converse case by simply swapping sides. Circling back to our question, we see that the answer is yes for the build side, and no for the probe side.
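The asymmetry is easy to see in a toy version of the ordinary Hash Join. The Python sketch below is illustrative only (the function and generator names are our own, not DataFusion APIs): the build side must be fully consumed before any output appears, while the probe side can be an endless generator.

```python
from collections import defaultdict
from itertools import count, islice

def hash_join(build_rows, probe_rows):
    """Ordinary hash join over (key, row) pairs."""
    table = defaultdict(list)
    # Pipeline-breaking part: the whole build side is materialized up front.
    for key, row in build_rows:
        table[key].append(row)
    # Stream-friendly part: probe rows are handled one at a time.
    for key, row in probe_rows:
        for match in table.get(key, []):
            yield (match, row)

def endless_probe():
    """A hypothetical unbounded probe stream with keys cycling 0, 1, 2."""
    for i in count():
        yield (i % 3, f"p{i}")

build = [(1, "a"), (2, "b")]
# We can take results lazily even though the probe side never ends.
first_two = list(islice(hash_join(build, endless_probe()), 2))
```

Passing an endless generator as `build_rows` instead would never yield anything, which is exactly the pipeline-breaking behavior described above.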

Obviously, materializing one side is not an option if both sides are infinite streams — enter the SHJ algorithm, which *partially* materializes both sides incrementally:

This makes SHJ stream-friendly for both sides. However, it still does not give us carte blanche to join two infinite streams in general. If we did that, the hash tables would simply grow over time and we would eventually run out of memory. Therefore, there needs to be a mechanism to limit the sizes of these tables if we were to ever use the SHJ algorithm on infinite streams.

Fortunately, a stream join is performed on a finite time window in many practical use cases, which gives us an avenue to limit sizes of these hash tables. We can employ SHJ so long as the inner batch buffer is flushed frequently in accordance with the finite sliding window, maintaining a compact hash table.

Obviously, if we have some other join filtering logic that limits table sizes instead of a sliding time window, we would need to flush according to that filtering logic. Therefore, to solve this task in a generic fashion, one needs a way to calculate bounds on arbitrary expressions when certain favorable conditions are present in the data stream (like increasing timestamps/column values).

- We use the join keys to look up the corresponding row on the other side’s inner buffer (hash table). If a match is found, the rows are joined and recorded as visited.
- We prune the rows that fall outside the range of possible join key values.

As long as we maintain a reasonably sized inner buffer for each side and there is some filtering that limits buffer sizes, infinite streams can be joined correctly just like static data.
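The probe-then-prune loop can be sketched in Python as follows. This is a simplified illustration under assumptions of our own: rows are `(side, key, ts)` tuples, each side's `ts` values arrive in non-decreasing order, and the join filter is a symmetric sliding window `|left.ts - right.ts| <= window`. The function name and return shape are hypothetical.

```python
from collections import defaultdict, deque

def pruning_shj(events, window):
    """Symmetric hash join with sliding-window pruning.

    Returns (results, peak), where results holds (key, left_ts, right_ts)
    tuples and peak is the largest number of rows buffered at any time.
    """
    tables = {"left": defaultdict(deque), "right": defaultdict(deque)}
    results, peak = [], 0
    for side, key, ts in events:
        other = "right" if side == "left" else "left"
        # Prune: later rows on this side have timestamps >= ts, so rows on the
        # other side older than ts - window can never match again.
        for bucket in tables[other].values():
            while bucket and bucket[0] < ts - window:
                bucket.popleft()
        # Probe: look up the join key in the other side's hash table.
        for other_ts in tables[other].get(key, ()):
            if abs(other_ts - ts) <= window:
                pair = (ts, other_ts) if side == "left" else (other_ts, ts)
                results.append((key, *pair))
        # Build: buffer this row for future arrivals on the other side.
        tables[side][key].append(ts)
        buffered = sum(len(b) for t in tables.values() for b in t.values())
        peak = max(peak, buffered)
    return results, peak

events = []
for t in range(100):
    events.append(("left", t % 2, t))
    events.append(("right", t % 2, t))
results, peak = pruning_shj(events, window=2)
```

With this input, the buffers hold only a handful of rows at any time even though 200 rows pass through the operator, which is what makes joining unbounded inputs feasible. Scanning every bucket on each arrival keeps the sketch short; a real implementation would track the ordering more efficiently.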

In a typical stream processing library like Apache Flink or Apache Spark, the join operation can be performed using watermarks. Let's examine a query taken from the Apache Spark docstring:

In this query, say each join side has a time column, named "left_time" and "right_time", and there is a join condition "left_time > right_time - 8 min". While processing, say the watermark on the right input is "12:34". This means that henceforth, only right input rows with "right_time > 12:34" will be processed, and any older rows will be considered "too late" and therefore dropped. Then, the left side buffer only needs to keep rows where "left_time > right_time - 8 min > 12:34 - 8 min = 12:26". That is, the left state watermark is 12:26, and any rows older than that can be dropped from the state. In other words, the operator will discard all states where the timestamp in state value (input rows) < state watermark.
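The arithmetic behind this watermark example can be written down in a few lines of Python. This is only a worked illustration of the numbers above, not Spark code; the helper names and the calendar date are arbitrary assumptions.

```python
from datetime import datetime, timedelta

def left_state_watermark(right_watermark, slack):
    """Bound implied by left_time > right_time - slack and
    right_time > right_watermark."""
    return right_watermark - slack

def evict(buffer, watermark):
    """Keep rows at or after the watermark; strictly older rows are dropped,
    following the wording of the example."""
    return [ts for ts in buffer if ts >= watermark]

day = datetime(2023, 1, 1)  # arbitrary date, only the time of day matters here
right_wm = day.replace(hour=12, minute=34)
left_wm = left_state_watermark(right_wm, timedelta(minutes=8))

left_buffer = [day.replace(hour=12, minute=m) for m in (20, 26, 30)]
kept = evict(left_buffer, left_wm)
```

Here `left_wm` comes out as 12:26, and the buffered row stamped 12:20 is evicted while the rows at 12:26 and 12:30 remain.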

Actually, this is part of the picture, not the whole. In theory, range-based pruning can be done with any sorted field (not just the watermark field) and with any arbitrary join filter condition that is amenable to this type of data pruning. However, Apache Spark overfits to timestamps and associates the pruning operation with a watermark. Let’s follow a different approach and examine the following query from a more general, first-principles perspective:

This becomes very powerful when we consider unified batch and stream processing as range conditions can be used in a wide variety of use cases. Let’s examine a slightly more complex join:

Interval arithmetic is a numeric technique used by many successful global optimization and equation-solving algorithms. It extends ordinary arithmetic to interval values, similar to how ternary logic extends boolean logic. Following the rules of interval arithmetic, one can estimate the range of possible values an expression can take on in a single evaluation pass. Optimizers and solvers utilize this capability to eliminate infeasible regions of their search spaces early on so as to focus on viable regions. In the context of SHJ, we will use interval arithmetic to prune non-viable rows from our hash tables.

Interval arithmetic provides a set of mathematical operators that can be used to perform calculations on intervals. These operators include basic arithmetic operations such as addition, subtraction, multiplication, and division, as well as more advanced operations such as intersection, union, and set difference. One can compose these operators to calculate the range of possible values for a function in a specific region of the search space, facilitating rapid identification and elimination of infeasible regions of the search space. The elementary arithmetic operations are defined as:

It is also possible to define interval operators for more complex operations such as trigonometric functions, exponentials and logarithms, roots, etc.
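As a quick illustration, the standard textbook definitions of the elementary interval operations fit in a small Python class. This is a sketch for closed intervals with finite endpoints, not the interval library in DataFusion; the class and method names are our own.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Interval:
    lo: float
    hi: float

    def __add__(self, other):
        return Interval(self.lo + other.lo, self.hi + other.hi)

    def __sub__(self, other):
        # The smallest difference pairs our lower bound with their upper bound.
        return Interval(self.lo - other.hi, self.hi - other.lo)

    def __mul__(self, other):
        # Signs may flip the ordering, so consider all endpoint products.
        products = [self.lo * other.lo, self.lo * other.hi,
                    self.hi * other.lo, self.hi * other.hi]
        return Interval(min(products), max(products))

    def __truediv__(self, other):
        if other.lo <= 0 <= other.hi:
            raise ZeroDivisionError("divisor interval contains zero")
        return self * Interval(1 / other.hi, 1 / other.lo)

    def intersect(self, other):
        lo, hi = max(self.lo, other.lo), min(self.hi, other.hi)
        return Interval(lo, hi) if lo <= hi else None  # None means empty

total = Interval(1, 2) + Interval(3, 5)
difference = Interval(1, 2) - Interval(3, 5)
product = Interval(-1, 2) * Interval(3, 5)
quotient = Interval(2, 4) / Interval(1, 2)
```

An empty intersection is the signal we care about for pruning: it means no combination of values from the two ranges can satisfy the constraint.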

Interval arithmetic provides a structured approach to calculating the bounds of a complex expression by combining the bounds of its simpler components. This hierarchical computation is well-suited for a graph-based implementation, where an equation system is represented by a directed acyclic expression graph (DAEG). In this graph, each leaf node corresponds to a variable (or a constant), and each root node corresponds to an individual equation or constraint. The internal nodes represent intermediate terms in the equations/constraints and may be shared among multiple root nodes.

In our implementation, each node contains the physical expression itself and an interval [L, U]. Interval [L, U] indicates the computed bounds for the node. By using this structure, the desired hierarchical computation procedure can be achieved by simply performing a bottom-up evaluation of the graph, also known as a post-order traversal.
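A bottom-up pass of this kind might look like the following Python sketch. The `Node` layout is a hypothetical stand-in for a physical expression paired with its interval, and only addition and subtraction are handled.

```python
from dataclasses import dataclass
from typing import Optional, Tuple

Bounds = Tuple[float, float]  # closed interval [L, U]

@dataclass
class Node:
    op: str                           # "col", "lit", "+" or "-"
    left: Optional["Node"] = None
    right: Optional["Node"] = None
    interval: Optional[Bounds] = None

def evaluate_bottom_up(node):
    """Post-order traversal: children first, then the node itself."""
    if node.op in ("col", "lit"):
        return node.interval          # leaves carry their bounds already
    l = evaluate_bottom_up(node.left)
    r = evaluate_bottom_up(node.right)
    if node.op == "+":
        node.interval = (l[0] + r[0], l[1] + r[1])
    elif node.op == "-":
        node.interval = (l[0] - r[1], l[1] - r[0])
    else:
        raise ValueError(f"unsupported operator: {node.op}")
    return node.interval

# (a + 3) - b, with a in [0, 10] and b in [5, 20]
a = Node("col", interval=(0, 10))
b = Node("col", interval=(5, 20))
three = Node("lit", interval=(3, 3))
a_plus_3 = Node("+", a, three)
root = Node("-", a_plus_3, b)
root_bounds = evaluate_bottom_up(root)
```

After the pass, every internal node holds its own bounds, so `a_plus_3.interval` is available for later use. In a true DAG, shared nodes would be visited more than once by this naive recursion; caching visited nodes avoids the repeated work.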

After executing a bottom-up traversal on the expression graph, we obtain interval bounds for each node, with the root node ultimately corresponding to the full join filter condition. With these intervals in place, we can then propagate constraints from parent nodes to child nodes via a subsequent top-down traversal, which enables us to shrink expression intervals when possible. In this top-down traversal, we use the following propagation rules:

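$$
\begin{aligned}
z = x + y \;&\rightarrow\; x \in X \cap (Z - Y) \;\wedge\; y \in Y \cap (Z - X) \\
z = x - y \;&\rightarrow\; x \in X \cap (Z + Y) \;\wedge\; y \in Y \cap (X - Z) \\
z = x \times y \;&\rightarrow\; x \in X \cap (Z \div Y) \;\wedge\; y \in Y \cap (Z \div X) \\
z = x \div y \;&\rightarrow\; x \in X \cap (Z \times Y) \;\wedge\; y \in Y \cap (X \div Z)
\end{aligned}
$$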

where expressions *x* and *y* are children of the expression *z*, with *X*, *Y* and *Z* denoting their respective intervals.

Basically, these rules enable us to propagate a constraint, placed on the result of an operation, to the arguments of the operation. Since we are in possession of the entire expression graph (i.e. the DAEG), we can use these rules to propagate each of our equality or inequality constraints (i.e. f(x) = 0, f(x) > 0 etc.) all the way down to the variables (i.e. columns).
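To see the propagation rules at work, here is a small Python sketch covering the addition and subtraction cases. Intervals are plain `(lo, hi)` tuples and all function names are our own; this is an illustration of the rules, not the constraint propagation module itself.

```python
import math

def add(a, b):
    return (a[0] + b[0], a[1] + b[1])

def sub(a, b):
    return (a[0] - b[1], a[1] - b[0])

def intersect(a, b):
    lo, hi = max(a[0], b[0]), min(a[1], b[1])
    return (lo, hi) if lo <= hi else None  # None means infeasible

def propagate_add(z, x, y):
    """z = x + y  ->  x in X ∩ (Z - Y),  y in Y ∩ (Z - X)"""
    return intersect(x, sub(z, y)), intersect(y, sub(z, x))

def propagate_sub(z, x, y):
    """z = x - y  ->  x in X ∩ (Z + Y),  y in Y ∩ (X - Z)"""
    return intersect(x, add(z, y)), intersect(y, sub(x, z))

# Constraint x - y >= 0, i.e. z = x - y must lie in [0, inf).
x, y = (0, 10), (5, 20)
new_x, new_y = propagate_sub((0, math.inf), x, y)

# Constraint 0 <= x + y <= 5 with x in [0, 10] and y in [2, 4].
add_x, add_y = propagate_add((0, 5), (0, 10), (2, 4))
```

In the first case, both columns shrink to [5, 10]: values of `x` below 5 or values of `y` above 10 cannot satisfy the constraint, so rows carrying them are candidates for pruning.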

Having gone through the basics of interval calculations, let’s take a look at how we can make use of these techniques on join filters for the purposes of data pruning. Consider the following query:

This query will produce the following DAEG:

Consider the following query:

How exactly do we use interval arithmetic for pruning here? This is done by first creating an interval for join filter values on the build side of the join, which spans [-∞, FV] or [FV, ∞] depending on the ordering (descending/ascending) of the filter expression. Here, FV denotes the first value on the build side. This range is then compared with the probe side interval, which either spans [-∞, LV] or [LV, ∞] depending on the ordering (ascending/descending) of the probe side. Here, LV denotes the last value on the probe side.

Going back to our concrete example,

Inner join is a type of join that returns only the records that have a match in both tables. This type of join is used when both tables have related data that needs to be matched between the two tables.

Left/right outer joins return all the records from one of the tables, even if there is no match in the other table. In other words, left (right) join returns all the records from the left (right) table and the matching records from the right (left) table (if any). There is also a full outer join, which returns all the records from both the tables, whether there is a match or not.

Table1:

Table2:

which would generate a result like this:

When (outer) joining infinite streams, special care has to be taken when dealing with unmatched rows. If we are executing vanilla SHJ on data without ordering guarantees, a currently-unmatched row can always have a matching result in the future as new data comes in. However, when operating on data that lends itself to pruning, we can gracefully handle unmatched rows incrementally: Whenever we decide that a part of the inner buffer is not joinable in the future, we can check the expiring rows to see if they were matched before. If not, we can generate the necessary output consisting of any unmatched rows right there and then, allowing incremental processing.
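The sketch below illustrates this incremental handling for a left outer join in Python. It rests on assumptions of our own: rows are `(side, key, ts)` tuples with non-decreasing timestamps per side, the join filter is a sliding window `|left.ts - right.ts| <= window`, and a linear scan of a small buffer stands in for the hash lookup to keep the example short.

```python
from collections import deque

def left_outer_pruning_join(events, window):
    """Emit (left_row, right_row) pairs, or (left_row, None) for left rows
    that expire without ever finding a match."""
    left, right = deque(), deque()   # left entries: [key, ts, matched]
    out = []
    for side, key, ts in events:
        if side == "right":
            # Expire left rows that no future right row can match, and emit
            # the ones that never matched right away.
            while left and left[0][1] < ts - window:
                lkey, lts, matched = left.popleft()
                if not matched:
                    out.append(((lkey, lts), None))
            for entry in left:
                if entry[0] == key and abs(entry[1] - ts) <= window:
                    entry[2] = True  # record the row as visited
                    out.append(((entry[0], entry[1]), (key, ts)))
            right.append((key, ts))
        else:
            while right and right[0][1] < ts - window:
                right.popleft()
            matched = False
            for rkey, rts in right:
                if rkey == key and abs(rts - ts) <= window:
                    matched = True
                    out.append(((key, ts), (rkey, rts)))
            left.append([key, ts, matched])
    # End of input (bounded case only): flush whatever is still unmatched.
    for lkey, lts, matched in left:
        if not matched:
            out.append(((lkey, lts), None))
    return out

events = [
    ("left", "a", 1),
    ("right", "a", 2),
    ("left", "b", 3),
    ("right", "a", 10),
    ("left", "c", 11),
]
output = left_outer_pruning_join(events, window=2)
```

The unmatched row `("b", 3)` is emitted as soon as the right side advances to timestamp 10, well before the input ends. Only `("c", 11)` has to wait for the final flush, which would never be reached on an unbounded stream.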

Whenever join conditions and data ordering lend themselves to pruning (e.g. sliding window filtering), we expect SHJ to exhibit higher performance than ordinary Hash Join, since it requires less memory. In many cases, SHJ can perform the join entirely in memory while ordinary Hash Join cannot. This is certainly true for unbounded streams; but it is also possible with large, static data if join conditions and data ordering are favorable.

An intelligent query planning algorithm should inspect data ordering, the nature of the join conditions, and join key cardinality when choosing between SHJ and ordinary Hash Join. This decision can also be analyzed within a cost-based optimization framework, which is a topic we plan to discuss in the future.

When comparing the performance of SHJ and ordinary Hash Join algorithms in terms of execution time, there are three important parameters to consider: Batch size of the data, cardinality of the join key, and obviously the table size.

Batch size of the data refers to the number of rows we process in a single iteration of the join operation. A larger batch size can result in a more efficient join operation, as more data can be processed at once. However, a larger batch size can also decrease the number of prune operations, thus it may increase transient memory usage and reduce cache efficiency.

Table size refers to the number of rows in each table involved in the join operation. A larger table size results in longer execution times, as there are more rows to process.

Considering these parameters in our study, we can get a relatively comprehensive view of algorithmic performance for both SHJ and ordinary Hash Join algorithms.

To highlight the importance of build-side pruning, we will be using a query with a sliding window filter, which reads:

The input tables are created by the following code:

The parameter set is selected as

- Join type: Inner join,
- Batch size: 128, 512, 2048, 4096,
- Cardinality: 8 (Low), 64 (Medium), 1024 (High),
- Table size: 8192, 16384, 32768, 65536.

As cardinality increases, we get better results in terms of execution time from both algorithms.

Our initial implementation for Apache DataFusion does not include all of the potential benefits of the SHJ algorithm. The main features included in our initial PR are:

- An initial library for interval arithmetic, which includes basic arithmetic operations (addition and subtraction) and comparison operations (greater than and less than) for integer types, and supports the logical conjunction operator.
- An API for performing interval calculations, which can be used for other purposes, such as range pruning in Parquet. Within the context of this PR, we use this functionality to calculate filter expression bounds for pruning purposes.
- A constraint propagation module to construct expression DAGs from physical expression trees and update column bounds efficiently for data pruning purposes.
- An initial implementation of SHJ, which is limited to the partitioned mode and does not yet have full support for output order information.
- A new physical optimization sub-rule to utilize SHJ instead of ordinary Hash Join when joining two (unbounded) streams.

In order to have a PR with a manageable size, some features have been excluded for now, but will be added in the future. These include:

- Improved support for interval arithmetic, such as support for open/closed intervals, multiply/divide operations, additional comparison and logical operators, floating point numbers, and time intervals.
- Improved constant propagation, including the ability to determine monotonicity properties of complex physical expression trees.
- An improved SHJ algorithm, including support for collect left/right/all modes, intermediate buffers for complex expressions, and an improved output ordering flag.

When we first contemplated writing this blog post, we had two goals: (1) announcing a cool algorithmic contribution to Apache DataFusion, and (2) creating a good resource for anyone who has an interest in stream joins. For these purposes, we dug deep into the Symmetric Hash Join algorithm, explaining how it works in a relatively accessible format. We also discussed how optimizations such as build-side pruning work, and showed why/how these optimizations result in better performance and lower resource utilization.

We believe that this new feature will be a valuable addition to Apache DataFusion, as it will improve join performance on large datasets and create new opportunities when working with unbounded stream data. If you have any questions, comments, or ideas for further improvements to either SHJ or DataFusion in general, feel free to reach out to us.