hashjoin 源码
时间: 2023-07-27 16:06:29 浏览: 144
由于 hash join 算法实现涉及到具体的数据库系统,不同数据库系统的 hash join 实现也会有所不同。这里以 PostgreSQL 数据库为例,简单介绍一下其 hash join 的实现。
在 PostgreSQL 中,hash join 算法主要分为两个阶段:构建哈希表和使用哈希表进行连接操作。
在构建阶段,优化器通常根据代价估算,从参与连接的两个关系中选择较小的一个作为内部关系(inner relation)。执行器扫描内部关系,对每个元组的 join key 计算哈希值,并把哈希值映射到同一个桶的元组放在一起,这样内部关系中的所有元组就被分配到了不同的桶中。接着进入探测阶段:遍历外部关系中的元组,对于每个元组,计算其 join key 的哈希值,并在哈希表中查找该哈希值对应的桶。如果桶中有元组,则将该外部元组与桶中的所有元组逐一比较,找出符合连接条件的元组组合;如果没有匹配项,则对于内连接,该外部元组会被丢弃(外连接则需要补 NULL 后输出)。
构建好哈希表后,就可以使用哈希表进行连接操作了(即探测阶段)。遍历外部关系中的元组,对于每个元组,计算其 join key 的哈希值,并在哈希表中定位该哈希值对应的桶。如果桶非空,则将该元组与桶中的所有元组逐一比较(哈希值相同并不代表 join key 相等,仍需按连接条件判断),找到符合连接条件的元组组合。如果没有匹配项,说明该元组与内部关系中的所有元组都不匹配;对于内连接,它会被丢弃。
以上是 PostgreSQL 中哈希连接的基本流程(省略了分批处理、溢出到磁盘等细节),具体实现可以参考 PostgreSQL 源码中的 src/backend/executor/nodeHashjoin.c 和 src/backend/executor/nodeHash.c。
相关问题
hash join源码
Hash Join 是关系型数据库中常见的一种 Join 算法,用于将两个表按照连接条件进行连接。下面是一个简单的 Hash Join 源代码示例:
```c
/* One node of a bucket chain: a key/value pair plus the link to the next node. */
typedef struct hash_entry {
    int key;                  /* lookup key */
    int value;                /* payload stored for this key */
    struct hash_entry *next;  /* next node in the same bucket; NULL ends the chain */
} hash_entry;
/* Chained hash table: an array of bucket heads together with its length. */
typedef struct hash_table {
    int size;                   /* number of slots in table[] */
    struct hash_entry **table;  /* bucket array; each slot is the head of a chain */
} hash_table;
/*
 * Allocate a hash table whose buckets are all empty.
 *
 * size: requested number of buckets. A value below 1 is raised to 1 so the
 *       table always has at least one bucket and indexing by
 *       (key % table->size) never divides by zero.
 *
 * Returns the new table, or NULL if either allocation fails. The caller
 * owns the returned table, its bucket array and any entries added later.
 */
hash_table *create_hash_table(int size) {
    if (size < 1) {
        size = 1;
    }

    hash_table *table = malloc(sizeof *table);
    if (table == NULL) {
        return NULL;
    }

    table->size = size;
    /* calloc zero-fills the array, so every bucket starts as an empty chain. */
    table->table = calloc((size_t)size, sizeof *table->table);
    if (table->table == NULL) {
        free(table);
        return NULL;
    }

    return table;
}
/*
 * Map a key to a bucket index in the range [0, size).
 *
 * key:  value to hash; may be negative.
 * size: number of buckets; expected to be positive.
 *
 * Returns key modulo size, shifted into the non-negative range. In C the
 * remainder takes the sign of the dividend, so a plain (key % size) would be
 * negative for negative keys and index before the start of the bucket array.
 * For a non-positive size there is no valid bucket; 0 is returned instead of
 * dividing by zero.
 */
int hash(int key, int size) {
    if (size <= 0) {
        return 0;
    }

    int slot = key % size;
    if (slot < 0) {
        slot += size;
    }
    return slot;
}
/*
 * Store value under key in the table.
 *
 * table: table to modify; must be non-NULL with a valid bucket array.
 * key:   key to insert or update.
 * value: value to associate with key.
 *
 * If key is already present its value is overwritten, so the table keeps at
 * most one entry per key. Otherwise a new entry is pushed onto the front of
 * the key's bucket chain. If the entry cannot be allocated, a message is
 * written to stderr and the table is left unchanged.
 */
void hash_insert(hash_table *table, int key, int value) {
    int index = hash(key, table->size);

    /* Update in place when the key already exists in this bucket. */
    for (hash_entry *entry = table->table[index]; entry != NULL; entry = entry->next) {
        if (entry->key == key) {
            entry->value = value;
            return;
        }
    }

    hash_entry *new_entry = malloc(sizeof *new_entry);
    if (new_entry == NULL) {
        fprintf(stderr, "hash_insert: out of memory\n");
        return;
    }

    new_entry->key = key;
    new_entry->value = value;
    /* Link at the head of the chain. */
    new_entry->next = table->table[index];
    table->table[index] = new_entry;
}
/*
 * Find the value stored under key.
 *
 * Walks the chain of the bucket selected by hash(key, table->size) and
 * returns the value of the first entry whose key matches. Returns -1 when no
 * entry matches, so a stored value of -1 cannot be told apart from a miss.
 */
int hash_lookup(hash_table *table, int key) {
    hash_entry *node;

    for (node = table->table[hash(key, table->size)]; node != NULL; node = node->next) {
        if (node->key == key) {
            return node->value;
        }
    }

    return -1;
}
/* Hash Join 算法 */
void hash_join(int* R, int* S, int n, int m) {
/* 创建 hash 表 */
hash_table* table = create_hash_table(n);
/* 构建 R 表的 hash 表 */
for (int i = 0; i < n; i++) {
hash_insert(table, R[i], i);
}
/* 查询 S 表中与 R 表相同的元素 */
for (int i = 0; i < m; i++) {
int index = hash_lookup(table, S[i]);
if (index != -1) {
printf("(%d, %d)\n", index, i);
}
}
}
```
上述代码中,我们定义了一个 hash 表结构体和 hash 表操作函数,然后使用 hash 表实现了一个简单的 Hash Join 算法。其中,我们先创建 hash 表来存储 R 表的所有元素(键为元素值,值为该元素在 R 中的下标),然后遍历 S 表的所有元素,查找 hash 表中是否存在相同的元素,如果存在,就输出这两个元素的下标。需要注意的是,hash_insert 遇到相同的键时会覆盖原有的值,因此 R 中的重复元素只会保留最后一个下标,S 中的每个元素最多只输出一条匹配结果。
hashjoin源码 sortmergejoin源码
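下面给出两段仿照 PostgreSQL 执行器风格编写的 hash join 和 sort merge join 示意代码,仅供参考。它们并非 PostgreSQL 源码原文:PostgreSQL 中真实的实现分别位于 src/backend/executor/nodeHashjoin.c(ExecHashJoin)和 src/backend/executor/nodeMergejoin.c(ExecMergeJoin),细节与下面的代码有较大差异。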
Hash Join 实现源码:
```c
/*
 * ExecHashJoin
 * Implements the hashjoin algorithm, producing one joined tuple per call.
 *
 * pstate is the plan state node; it is cast to HashJoinState with castNode().
 *
 * Returns the slot produced by ExecProject() for the next outer/inner pair,
 * or NULL once ExecScanHashTableForUnmatched() reports that nothing is left.
 *
 * NOTE(review): the paragraph below is carried over from the original
 * header. No partitioning or worker logic is visible in this function, so
 * confirm it against the helpers before relying on it.
 *
 * Parallel version: we distribute the outer relation into a number of
 * partitions with a hash function, and process each partition
 * independently of the others. The inner relation is replicated to
 * all workers, so that each can perform the join independently.
 * This works best if the inner relation is smaller than the outer.
 */
static TupleTableSlot *
ExecHashJoin(PlanState *pstate)
{
HashJoinState *node = castNode(HashJoinState, pstate);
ExprContext *econtext = node->js.ps.ps_ExprContext;
TupleTableSlot *slot;
CHECK_FOR_INTERRUPTS();
/*
 * A zero hj_JoinState takes the build path (ExecBuildHashTable); any other
 * value pulls a tuple from the outer plan into hj_OuterTupleSlot.
 * NOTE(review): presumably zero is the initial "hash table not built yet"
 * state -- confirm against the HJ_* state definitions.
 */
if (!node->hj_JoinState)
ExecBuildHashTable(node);
else
node->hj_OuterTupleSlot = ExecProcNode(outerPlanState(node));
/*
 * Now loop, returning join tuples as we find them.
 */
for (;;)
{
CHECK_FOR_INTERRUPTS();
/*
 * No outer tuple in hand: ask ExecScanHashTableForUnmatched(). A false
 * result ends the join and NULL is returned to the caller.
 */
if (TupIsNull(node->hj_OuterTupleSlot))
{
if (!ExecScanHashTableForUnmatched(node))
{
/* no more unmatched tuples */
return NULL;
}
/* Found unmatched outer, so compute its hash value */
ResetExprContext(econtext);
econtext->ecxt_outertuple = node->hj_OuterTupleSlot;
node->hj_CurHashValue = ExecHashGetHashValue(node->hj_HashTable,
econtext,
node->hj_OuterHashKeys);
node->hj_JoinState = HJ_NEED_NEW_OUTER;
/*
 * Now we have an outer tuple and its hash value.
 * NOTE(review): hj_JoinState was just set to HJ_NEED_NEW_OUTER, so the
 * inner while-loop below is skipped on this iteration -- confirm that
 * this is intended.
 */
}
/*
 * inner loop over all matching inner tuples
 *
 * NOTE(review): once ExecScanHashTable() returns an empty slot the state
 * becomes HJ_NEED_NEW_OUTER, but nothing visible in this function fetches
 * another outer tuple or clears hj_OuterTupleSlot. Unless a callee changes
 * that state, the enclosing for-loop would keep iterating without making
 * progress -- verify against ExecScanHashTable().
 */
while (node->hj_JoinState != HJ_NEED_NEW_OUTER)
{
/* Fetch next tuple from inner side */
slot = ExecScanHashTable(node);
/* if there are no more inner tuples... */
if (TupIsNull(slot))
{
node->hj_JoinState = HJ_NEED_NEW_OUTER;
break; /* ... out of inner loop */
}
/* we have a new join tuple: expose it as the inner tuple and project */
econtext->ecxt_innertuple = slot;
return ExecProject(node->js.ps.ps_ProjInfo);
}
}
}
```
Sort Merge Join 实现源码:
```c
/*
* ExecSortMergeJoin
* Implements the sort/merge join algorithm.
*
* Returns the join relation.
*
* Parallel version: we distribute the outer relation into a number of
* partitions with a hash function, and sort the inner relation on the
* join key. We then perform the join independently for each partition,
* with each worker performing the merge join of its partition with the
* sorted inner relation.
*/
static TupleTableSlot *
ExecSortMergeJoin(PlanState *pstate)
{
SortMergeJoinState *node = castNode(SortMergeJoinState, pstate);
ExprContext *econtext = node->js.ps.ps_ExprContext;
TupleTableSlot *slot;
CHECK_FOR_INTERRUPTS();
/* First call? */
if (node->smj_JoinState == SMJ_STARTUP)
{
PlanState *outerNode;
PlanState *innerNode;
List *inInfo;
ListCell *l;
List *outInfo;
AttrNumber *match;
int nMatch;
/*
* We need to do some initialization for outer and inner nodes. Also,
* we figure out which join keys are being used, and build equality
* operators for them.
*/
outerNode = outerPlanState(node);
innerNode = innerPlanState(node);
inInfo = innerNode->plan->targetlist;
outInfo = outerNode->plan->targetlist;
nMatch = 0;
match = palloc(list_length(node->smj_MergingClauses) *
sizeof(AttrNumber));
foreach(l, node->smj_MergingClauses)
{
OpExpr *clause = (OpExpr *) lfirst(l);
Var *innerVar;
Var *outerVar;
Oid eqop;
/*
* Currently, only "simple" cross-type comparisons work. See
* comments in src/backend/utils/adt/genfile.c.
*/
if (!is_simple_eq_op(clause->opno))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("mergejoin operator must be a btree equality operator")));
innerVar = (Var *) get_leftop((Expr *) clause);
outerVar = (Var *) get_rightop((Expr *) clause);
/* We don't need to output these columns in the result */
innerVar->varno = INNER_VAR;
outerVar->varno = OUTER_VAR;
/*
* We may have to look up the operator___ in the opfamily to check that
* it is compatible with sorting.
*/
eqop = get_opfamily_member(clause->opfamily,
innerVar->vartype,
outerVar->vartype,
BTEqualStrategyNumber);
if (eqop == InvalidOid)
elog(ERROR, "no operator___ matching clause");
match[nMatch] = outInfo ? ExecFindMatchingJoinVar(outerVar, outInfo) :
ExecFindMatchingJoinVar(innerVar, inInfo);
nMatch++;
}
node->js.ps.ps_ExprContext->ecxt_per_tuple_memory =
node->smj_RuntimeContext;
ExecAssignExprContext(node->js.ps.ps_ExprContext,
outerNode->parent);
/*
* Initialize tuplesort state variables used in merging phase, and in
* state where we're reading inner relation.
*/
node->smj_OuterSkipQual =
ExecInitQual(node->js.ps.qual, outerNode);
node->smj_InnerSkipQual =
ExecInitQual(node->js.ps.qual, innerNode);
node->smj_MatchedOuter = false;
node->smj_MatchedInner = false;
node->smj_OuterTupleSlot = ExecProcNode(outerNode);
if (TupIsNull(node->smj_OuterTupleSlot))
{
/* empty outer relation */
node->smj_JoinState = SMJ_NEEDS_INNER;
return NULL;
}
node->smj_SortKeys =
ExecBuildSortKey(node, inInfo, outInfo, match, nMatch);
/* can't handle non-heap tuplesort methods here */
if (!node->smj_SortKeys->abbrev_converter && node->smj_PresortedKeys == NIL)
node->smj_SortStates[0] =
tuplesort_begin_merge(node->smj_SortKeys->sortFunction,
node->smj_WorkMem, node->ssps_TempFileSpaces,
node->smj_SortKeys->abbrev_full_comparator,
node);
else
node->smj_SortStates[0] =
tuplesort_begin_datum(node->smj_SortKeys->sortFunction,
node->smj_SortKeys->abbrev_converter,
node->smj_SortKeys->abbrev_full_comparator,
node->smj_WorkMem, node->ssps_TempFileSpaces,
node);
/*
* Begin scanning the inner relation. We'll read tuples in sorted
* order, so the main loop will be able to use a simple and fast
* algorithm for advancing the outer relation and resetting the inner
* scan.
*/
node->smj_JoinState = SMJ_NEEDS_INNER;
node->smj_MatchedOuter = false;
node->smj_MatchedInner = false;
/*
* Set up tuplestore and materialize the inner relation. We only need to
* materialize the inner relation if we are in a parallel plan.
*/
if (node->js.ps.plan->parallel_aware)
{
Assert(node->js.ps.ps_ExecProcNode == ExecSortMergeJoin);
node->smj_InnerTupleSlot = outerNode->ps_ResultTupleSlot;
/*
* If we are in a parallel plan, and if the inner side of this join
* was not fully gathered (because it was too large), then we must
* materialize the inner tuples before proceeding with the join.
*/
if (outerNode->ps_Flow->flotype == FLOW_REPLICATE &&
innerNode->ps_Flow->flotype == FLOW_PARTITIONED &&
!innerNode->ps_Flow->initialized)
{
Assert(innerNode->ps_ResultTupleSlot->tts_tupleDescriptor !=
NULL);
/* Create tuplestore to store the entire inner relation. */
node->ss.ps.ps_TupFromTlist = false;
node->ss.ps.ps_ProjInfo = NULL;
node->ss.ps.ps_ExprContext = node->js.ps.ps_ExprContext;
node->ss.ps.ps_TupSlot =
tuplestore_begin_heap(false, false, work_mem);
node->ss.ps.ps_ResultTupleSlot = node->smj_InnerTupleSlot;
node->ss.ps.ps_ProjInfo = NULL;
/* Materialize all inner tuples. */
while (!TupIsNull(slot = ExecProcNode(innerNode)))
{
tuplestore_puttupleslot(node->ss.ps.ps_TupSlot, slot);
}
/* Seek back to start of the materialized inner relation. */
tuplestore_rescan(node->ss.ps.ps_TupSlot);
}
else
{
/*
* If the inner side is fully gathered (i.e., if it is a
* shared-nothing table), then we can simply use the existing
* outer slot as the inner slot as well.
*/
node->smj_InnerTupleSlot = node->smj_OuterTupleSlot;
}
}
else
{
node->smj_InnerTupleSlot = ExecProcNode(innerNode);
/* if empty inner relation, advance to next outer tuple */
if (TupIsNull(node->smj_InnerTupleSlot))
node->smj_JoinState = SMJ_NEEDS_OUTER;
}
}
/*
* The main loop advances the outer scan, possibly reinitializing the
* inner scan, and checks for matches between outer tuples and inner
* tuples.
*/
for (;;)
{
CHECK_FOR_INTERRUPTS();
switch (node->smj_JoinState)
{
case SMJ_NEEDS_INNER:
/* Reset the inner scan. */
if (node->js.ps.plan->parallel_aware)
{
/*
* If we are in a parallel plan, and if the inner side of
* this join was not fully gathered (because it was too
* large), then we must read from the materialized inner
* relation that was created earlier. We have to switch to
* the other worker's partition if we've reached the end of
* our own. Otherwise, we can simply rescan the materialized
* inner relation.
*/
if (outerPlanState(node)->ps_Flow->flotype == FLOW_REPLICATE &&
innerPlanState(node)->ps_Flow->flotype == FLOW_PARTITIONED &&
!innerPlanState(node)->ps_Flow->initialized)
{
if (node->ss.ps.ps_TupSlot &&
!tuplestore_gettupleslot(node->ss.ps.ps_TupSlot, true,
false, node->smj_InnerTupleSlot))
{
/*
* We've reached the end of our own partition, but
* there may be more partitions. Advance to the
* next partition by updating our slice table entry
* and resetting the tuplestore so that we can read
* from the new partition. If there are no more
* partitions, we're done.
*/
if (!ExecParallelUpdatePartitionInfo(node, true))
{
node->smj_JoinState = SMJ_NEEDS_OUTER;
break;
}
tuplestore_clear(node->ss.ps.ps_TupSlot);
tuplestore_rescan(node->ss.ps.ps_TupSlot);
continue;
}
}
else
{
/*
* If the inner side is fully gathered (i.e., if it is
* a shared-nothing table), then we can simply rescan
* the existing outer slot as the inner slot as well.
*/
ExecClearTuple(node->smj_InnerTupleSlot);
tuplestore_rescan(node->ss.ps.ps_TupSlot);
}
}
else
{
/* advance inner scan */
ExecClearTuple(node->smj_InnerTupleSlot);
node->smj_InnerTupleSlot = ExecProcNode(innerPlanState(node));
}
if (TupIsNull(node->smj_InnerTupleSlot))
{
/* end of inner scan */
node->smj_JoinState = SMJ_NEEDS_OUTER;
break;
}
/*
* We know the new inner tuple is not distinct from the last one
* returned, so we update matched_inner accordingly.
*/
node->smj_MatchedInner = true;
/*
* Set up the state for matching tuples.
*/
ResetExprContext(econtext);
econtext->ecxt_innertuple = node->smj_InnerTupleSlot;
econtext->ecxt_outertuple = node->smj_OuterTupleSlot;
/* Skip non-matching tuples based on previously established
* skip qual */
if (node->smj_InnerSkipQual)
{
ExprState *qualexpr = node->smj_InnerSkipQual;
if (!ExecQual(qualexpr, econtext))
{
/* not matched */
continue;
}
}
/*
* Now we check the merge condition(s).
*/
if (ExecQualAndReset(node->smj_MergeClauses, econtext))
{
/* matched */
node->smj_JoinState = SMJ_JOINEDMATCHING;
return ExecProject(node->js.ps.ps_ProjInfo);
}
/*
* Not joined, so try next tuple from inner side.
*/
break;
case SMJ_JOINEDMATCHING:
case SMJ_JOINEDNONMATCHING:
/* Try to advance inner-side tuple */
ExecClearTuple(node->smj_InnerTupleSlot);
node->smj_InnerTupleSlot = ExecProcNode(innerPlanState(node));
if (TupIsNull(node->smj_InnerTupleSlot))
{
/* end of inner scan */
if (node->smj_JoinState == SMJ_JOINEDMATCHING)
{
node->smj_JoinState = SMJ_NEEDS_INNER;
node->smj_MatchedInner = false;
/* try to fetch next outer tuple */
ExecClearTuple(node->smj_OuterTupleSlot);
node->smj_OuterTupleSlot = ExecProcNode(outerPlanState(node));
if (TupIsNull(node->smj_OuterTupleSlot))
{
/* end of outer scan */
node->smj_JoinState = SMJ_NEEDS_INNER;
break;
}
}
else
{
node->smj_JoinState = SMJ_NEEDS_OUTER;
break;
}
}
node->smj_MatchedInner = false;
/*
* Set up the state for matching tuples.
*/
ResetExprContext(econtext);
econtext->ecxt_innertuple = node->smj_InnerTupleSlot;
econtext->ecxt_outertuple = node->smj_OuterTupleSlot;
/* Skip non-matching tuples based on previously established
* skip qual */
if (node->smj_InnerSkipQual)
{
ExprState *qualexpr = node->smj_InnerSkipQual;
if (!ExecQual(qualexpr, econtext))
{
/* not matched */
continue;
}
}
/*
* Now we check the merge condition(s).
*/
if (ExecQualAndReset(node->smj_MergeClauses, econtext))
{
/* matched */
node->smj_MatchedInner = true;
node->smj_JoinState = SMJ_JOINEDMATCHING;
return ExecProject(node->js.ps.ps_ProjInfo);
}
/*
* Not joined, so try again with next tuple from inner side.
*/
break;
case SMJ_NEEDS_OUTER:
/* Try to advance outer-side tuple */
ExecClearTuple(node->smj_OuterTupleSlot);
node->smj_OuterTupleSlot = ExecProcNode(outerPlanState(node));
if (TupIsNull(node->smj_OuterTupleSlot))
{
/* end of outer scan */
node->smj_JoinState = SMJ_NEEDS_INNER;
break;
}
/*
* New outer tuple; try to match it to first inner tuple.
*/
node->smj_JoinState = SMJ_FIRST_INNER;
/* FALL THRU */
case SMJ_FIRST_INNER:
/*
* We know the new outer tuple is not distinct from the last one
* returned, so we update matched_outer accordingly.
*/
node->smj_MatchedOuter = true;
/*
* Set up the state for matching tuples.
*/
ResetExprContext(econtext);
econtext->ecxt_innertuple = node->smj_InnerTupleSlot;
econtext->ecxt_outertuple = node->smj_OuterTupleSlot;
/* Skip non-matching tuples based on previously established
* skip qual */
if (node->smj_OuterSkipQual)
{
ExprState *qualexpr = node->smj_OuterSkipQual;
if (!ExecQual(qualexpr, econtext))
{
/* not
阅读全文