Implementing joins

Contents:
1. Background
2. Nested-loop joins
3. Two-phase multiway merge sort
4. Sort joins
5. Hash joins

1. Background

The following type of SQL query is quite common:

SELECT ______
FROM   R JOIN S ON R.a = S.b

This is called a join of R and S. Sometimes it's called an inner join to distinguish it from a left join and an outer join.

Inner joins are very common, but a straightforward implementation performs very badly in terms of how many disk accesses. So we'll examine several possible algorithms. To analyze the performance of each, we need to define some mathematical variables.

2. Nested-loop joins

Approach A: (This is the straightforward approach.)

for each tuple r in R:
    for each tuple s in S:
        if r.a == s.b:
            output rs

The number of disk accesses is T(R) + T(RT(S)

Approach B:

for each tuple r in R:
    for each block U in S:
        for each tuple s in U:
            if r.a == s.b:
                output rs

The number of disk accesses is T(R) + T(RB(S)

Approach C:

for each block T in R:
    for each block U in S:
        for each tuple r in T:
            for each tuple s in U:
                if r.a == s.b:
                    output rs

The number of disk accesses is B(R) + B(RB(S)

Approach D:

while R is not exhausted:
    load next M−1 blocks from R
    for each block U in S:
        for each tuple r in the M-1 blocks:
            for each tuple s in U:
                if r.a == s.b:
                    output rs

The number of disk accesses is B(R) + (B(R) / (M − 1)) B(S)

This is quite effective, and some simple DBMSs do very little more than this last approach. That said, it is problematic when the relations are quite large, since the amount of time is proportional to the product of what could be two very large numbers.

3. Two-phase multiway merge sort

Some of the other approaches to joining that we'll see will rely on sorting it according to the join attribute. If the relation is reasonably small, we can simply load the entire relation into memory and sort it. But if it is too large to fit into memory, we'll want to use an algorithm especially designed to minimize disk accesses.

Two-phase multiway merge sort (abbreviated 2PMMS) is designed for disk-based sorting.

  1. Phase 1: While R is not exhausted, load next M blocks from R into memory; we'll call each group of M blocks a “chunk.” Sort the chunk using an in-memory sort (such as quicksort), and then write the sorted chunk to disk.
  2. Phase 2: Load the first block from each chunk into memory. Commence merging these blocks into another memory block. When the destination block is filled, output it as a result of the sort and start with the block anew. When one of the source blocks is exhausted, load the next block from the corresponding chunk.

This only works if the number of chunks is at most M − 1, so that Phase 2 can fit the first block of each chunk into memory leaving a block left over for the result of the merge. Since each chunk has M blocks in it, the sort only works when the number of blocks overall is at most M² − M.

The number of disk accesses for a relation with B blocks will be 3 B: The first phase requires that we load each block once and then save each block, for a total of 2 B disk accesses. And the second phase will load B blocks; we don't count the output, since we may be able to avoid saving the result to disk.

4. Sorted joins

2PMMS leads to the following algorithm.

  1. Use 2PMMS to sort R, saving result to disk.
  2. Use 2PMMS to sort S.
  3. As the output of S comes out, “zip” it with the sorted R. That is, start from the beginning block of each relation, and repeatedly compare the current tuples in each relation. If a < b, then go to the next tuple of R; if a > b, then go to the next tuple of S; and if a = b, then find all such tuples with the same value for a or b and output all pairs of such tuples. Whenever we deplete the current block for either relation, we load the succeeding block.

This algorithm requires that each relation be small enough that 2PMMS would apply; thus, we need both B(R) and B(S) to be at most M² − M. Actually, B(S) would need to be at most M² − 2 M since we need a second block to be available for reading successive blocks from R.

Step 1 takes 4 B(R) disk accesses: 2PMMS requires 3 B(R) accesses, and then saving the result to disk takes another B(R) accesses. Similarly, step 2 takes 3 B(S) accesses. And step 3 reads through each block of R exactly once, for a further B(R) disk accesses. In sum, then, we have 5 B(R) + 3 B(S) disk accesses.

If R and S are a bit smaller, we can improve on this number of disk accesses essentially by doing combining Phase 2 of each 2PMMS with step 3 of the above algorithm.

  1. Perform first phase of 2PMMS on R, resulting in several sorted chunks saved to disk.
  2. Perform first phase of 2PMMS on S, resulting in several sorted chunks saved to disk.
  3. Load the first block of each of the chunks into memory, and “zip” all of these blocks together. That is, we determine which of the current tuples from each chunk has the least value for a or b; if there is only one such tuple with that value, then we essentially discard that tuple from further consideration, but if multiple ones with that value are found, we consider whether they are from different relations in which case we emit them as matched tuples for the join.

Step 1 takes 2 B(R) disk accesses, and step 2 takes 2 B(S) accesses. And step 3 reads through each block of each chunk of each relation exactly once, for B(R) + B(S) disk accesses. In sum, then, we have 3 B(R) + 3 B(S) disk accesses.

However, the constraints are stronger since now we can have at most M − 1 chunks across both relations together. So we need B(R) + B(S) to be at most M² − 1.

5. Hash join

  1. Load each successive block of R, hashing each tuple in the block into M − 1 in-memory buckets. When a bucket's block becomes full, flush it to disk and start on another in-memory block.
  2. Do the same for S, using the same hash function.
  3. For each of the M − 1 pairs of corresponding buckets from R and S, load the smaller of the two buckets into memory (we're assuming it fits into M − 1 blocks), sort it, and then go through each successive block from the corresponding bucket to join each tuple of the block with the appropriate tuples from the in-memory bucket.

Step 1 takes 2 B(R) disk accesses; step 2 takes 2 B(S) disk accesses; and step 3 takes B(R) + B(S) disk accesses. Overall, then, this takes 3 B(R) + 3 B(S) accesses.

This is the same as our faster sorted-join algorithm. But the necessary assumptions are weaker: Each pair of buckets needs just one of the two buckets to fit into memory. If R has (M − 1)² blocks and our hash function works well, then we'd expect the average bucket to have M − 1 blocks; of course, some would be larger, but if R is substantially smaller (say (M − 1)² / (2 ln² M)), then likely all of R's buckets would have at most M − 1 blocks. If this happens, then the join would work even if S is huge. (Obviously, the join would also work if R is huge but S meets this constraint.)