Hash Join 源码与 Sort Merge Join 源码
时间: 2023-07-05 09:07:11 浏览: 116
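这里提供 PostgreSQL 数据库中 hash join 和 sort merge join 实现思路的示例源码,仅供参考。注意:以下代码为简化的示意代码,与 PostgreSQL 官方源码(src/backend/executor/nodeHashjoin.c、nodeMergejoin.c)并不完全一致,请以官方源码为准: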
Hash Join 实现源码:
```c
/*
* ExecHashJoin
* One invocation of the hash join node.
*
* pstate is cast to a HashJoinState. Each call either returns the slot
* produced by ExecProject() for one (outer, inner) pairing, or returns
* NULL once ExecScanHashTableForUnmatched() reports that nothing is left.
*
* On a call where hj_JoinState is zero the hash table is built via
* ExecBuildHashTable(); on every other call a new outer tuple is pulled
* from the outer plan before the main loop starts.
*
* NOTE(review): an earlier version of this header described a parallel
* strategy (outer relation hash-partitioned, inner relation replicated to
* all workers). Nothing in this function body implements or references
* that, so treat it as unverified background at best.
*/
static TupleTableSlot *
ExecHashJoin(PlanState *pstate)
{
HashJoinState *node = castNode(HashJoinState, pstate);
ExprContext *econtext = node->js.ps.ps_ExprContext;
TupleTableSlot *slot;
CHECK_FOR_INTERRUPTS();
/*
* A zero hj_JoinState is treated as "hash table not built yet" and
* triggers the build. Any non-zero state instead fetches a fresh outer
* tuple from the outer plan, overwriting hj_OuterTupleSlot.
*
* NOTE(review): because this fetch happens at the top of every later
* call, a call that follows a returned join tuple replaces the outer
* tuple before the remaining inner matches are scanned. That looks like
* it drops additional matches for the previous outer tuple -- confirm
* against the helpers' contracts.
*/
if (!node->hj_JoinState)
ExecBuildHashTable(node);
else
node->hj_OuterTupleSlot = ExecProcNode(outerPlanState(node));
/*
* Main loop: exits only through one of the two return statements below.
*
* NOTE(review): if hj_OuterTupleSlot is non-null and hj_JoinState is
* HJ_NEED_NEW_OUTER, neither branch in the loop body changes anything
* visible here, so the loop would appear to spin unless one of the
* called helpers mutates node -- confirm.
*/
for (;;)
{
CHECK_FOR_INTERRUPTS();
/*
* No current outer tuple: ask ExecScanHashTableForUnmatched() whether
* there is anything left to process. A false result ends the join.
*/
if (TupIsNull(node->hj_OuterTupleSlot))
{
if (!ExecScanHashTableForUnmatched(node))
{
/* nothing left: signal end of join output to the caller */
return NULL;
}
/*
* Compute the hash value for the tuple in hj_OuterTupleSlot
* (presumably filled in by the helper above -- TODO confirm; this
* function itself does not assign the slot here).
*/
ResetExprContext(econtext);
econtext->ecxt_outertuple = node->hj_OuterTupleSlot;
node->hj_CurHashValue = ExecHashGetHashValue(node->hj_HashTable,
econtext,
node->hj_OuterHashKeys);
node->hj_JoinState = HJ_NEED_NEW_OUTER;
/*
* The state is now HJ_NEED_NEW_OUTER, so the while loop directly
* below is skipped on this iteration.
*/
}
/*
* Probe step. Every pass through this loop body either returns or
* breaks, so it executes at most once each time it is entered.
*/
while (node->hj_JoinState != HJ_NEED_NEW_OUTER)
{
/* Ask the hash table scan for the next inner tuple */
slot = ExecScanHashTable(node);
/* no inner tuple came back: flag that a new outer tuple is needed */
if (TupIsNull(slot))
{
node->hj_JoinState = HJ_NEED_NEW_OUTER;
break; /* ... out of inner loop */
}
/*
* Got an inner tuple: expose it to the projection and return the
* projected result. NOTE(review): no join qual is evaluated in this
* function; presumably ExecScanHashTable() only returns tuples that
* already match -- confirm.
*/
econtext->ecxt_innertuple = slot;
return ExecProject(node->js.ps.ps_ProjInfo);
}
}
}
```
Sort Merge Join 实现源码:
```c
/*
* ExecSortMergeJoin
* Implements the sort/merge join algorithm.
*
* Returns the join relation.
*
* Parallel version: we distribute the outer relation into a number of
* partitions with a hash function, and sort the inner relation on the
* join key. We then perform the join independently for each partition,
* with each worker performing the merge join of its partition with the
* sorted inner relation.
*/
static TupleTableSlot *
ExecSortMergeJoin(PlanState *pstate)
{
SortMergeJoinState *node = castNode(SortMergeJoinState, pstate);
ExprContext *econtext = node->js.ps.ps_ExprContext;
TupleTableSlot *slot;
CHECK_FOR_INTERRUPTS();
/* First call? */
if (node->smj_JoinState == SMJ_STARTUP)
{
PlanState *outerNode;
PlanState *innerNode;
List *inInfo;
ListCell *l;
List *outInfo;
AttrNumber *match;
int nMatch;
/*
* We need to do some initialization for outer and inner nodes. Also,
* we figure out which join keys are being used, and build equality
* operators for them.
*/
outerNode = outerPlanState(node);
innerNode = innerPlanState(node);
inInfo = innerNode->plan->targetlist;
outInfo = outerNode->plan->targetlist;
nMatch = 0;
match = palloc(list_length(node->smj_MergingClauses) *
sizeof(AttrNumber));
foreach(l, node->smj_MergingClauses)
{
OpExpr *clause = (OpExpr *) lfirst(l);
Var *innerVar;
Var *outerVar;
Oid eqop;
/*
* Currently, only "simple" cross-type comparisons work. See
* comments in src/backend/utils/adt/genfile.c.
*/
if (!is_simple_eq_op(clause->opno))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("mergejoin operator must be a btree equality operator")));
innerVar = (Var *) get_leftop((Expr *) clause);
outerVar = (Var *) get_rightop((Expr *) clause);
/* We don't need to output these columns in the result */
innerVar->varno = INNER_VAR;
outerVar->varno = OUTER_VAR;
/*
* We may have to look up the operator___ in the opfamily to check that
* it is compatible with sorting.
*/
eqop = get_opfamily_member(clause->opfamily,
innerVar->vartype,
outerVar->vartype,
BTEqualStrategyNumber);
if (eqop == InvalidOid)
elog(ERROR, "no operator___ matching clause");
match[nMatch] = outInfo ? ExecFindMatchingJoinVar(outerVar, outInfo) :
ExecFindMatchingJoinVar(innerVar, inInfo);
nMatch++;
}
node->js.ps.ps_ExprContext->ecxt_per_tuple_memory =
node->smj_RuntimeContext;
ExecAssignExprContext(node->js.ps.ps_ExprContext,
outerNode->parent);
/*
* Initialize tuplesort state variables used in merging phase, and in
* state where we're reading inner relation.
*/
node->smj_OuterSkipQual =
ExecInitQual(node->js.ps.qual, outerNode);
node->smj_InnerSkipQual =
ExecInitQual(node->js.ps.qual, innerNode);
node->smj_MatchedOuter = false;
node->smj_MatchedInner = false;
node->smj_OuterTupleSlot = ExecProcNode(outerNode);
if (TupIsNull(node->smj_OuterTupleSlot))
{
/* empty outer relation */
node->smj_JoinState = SMJ_NEEDS_INNER;
return NULL;
}
node->smj_SortKeys =
ExecBuildSortKey(node, inInfo, outInfo, match, nMatch);
/* can't handle non-heap tuplesort methods here */
if (!node->smj_SortKeys->abbrev_converter && node->smj_PresortedKeys == NIL)
node->smj_SortStates[0] =
tuplesort_begin_merge(node->smj_SortKeys->sortFunction,
node->smj_WorkMem, node->ssps_TempFileSpaces,
node->smj_SortKeys->abbrev_full_comparator,
node);
else
node->smj_SortStates[0] =
tuplesort_begin_datum(node->smj_SortKeys->sortFunction,
node->smj_SortKeys->abbrev_converter,
node->smj_SortKeys->abbrev_full_comparator,
node->smj_WorkMem, node->ssps_TempFileSpaces,
node);
/*
* Begin scanning the inner relation. We'll read tuples in sorted
* order, so the main loop will be able to use a simple and fast
* algorithm for advancing the outer relation and resetting the inner
* scan.
*/
node->smj_JoinState = SMJ_NEEDS_INNER;
node->smj_MatchedOuter = false;
node->smj_MatchedInner = false;
/*
* Set up tuplestore and materialize the inner relation. We only need to
* materialize the inner relation if we are in a parallel plan.
*/
if (node->js.ps.plan->parallel_aware)
{
Assert(node->js.ps.ps_ExecProcNode == ExecSortMergeJoin);
node->smj_InnerTupleSlot = outerNode->ps_ResultTupleSlot;
/*
* If we are in a parallel plan, and if the inner side of this join
* was not fully gathered (because it was too large), then we must
* materialize the inner tuples before proceeding with the join.
*/
if (outerNode->ps_Flow->flotype == FLOW_REPLICATE &&
innerNode->ps_Flow->flotype == FLOW_PARTITIONED &&
!innerNode->ps_Flow->initialized)
{
Assert(innerNode->ps_ResultTupleSlot->tts_tupleDescriptor !=
NULL);
/* Create tuplestore to store the entire inner relation. */
node->ss.ps.ps_TupFromTlist = false;
node->ss.ps.ps_ProjInfo = NULL;
node->ss.ps.ps_ExprContext = node->js.ps.ps_ExprContext;
node->ss.ps.ps_TupSlot =
tuplestore_begin_heap(false, false, work_mem);
node->ss.ps.ps_ResultTupleSlot = node->smj_InnerTupleSlot;
node->ss.ps.ps_ProjInfo = NULL;
/* Materialize all inner tuples. */
while (!TupIsNull(slot = ExecProcNode(innerNode)))
{
tuplestore_puttupleslot(node->ss.ps.ps_TupSlot, slot);
}
/* Seek back to start of the materialized inner relation. */
tuplestore_rescan(node->ss.ps.ps_TupSlot);
}
else
{
/*
* If the inner side is fully gathered (i.e., if it is a
* shared-nothing table), then we can simply use the existing
* outer slot as the inner slot as well.
*/
node->smj_InnerTupleSlot = node->smj_OuterTupleSlot;
}
}
else
{
node->smj_InnerTupleSlot = ExecProcNode(innerNode);
/* if empty inner relation, advance to next outer tuple */
if (TupIsNull(node->smj_InnerTupleSlot))
node->smj_JoinState = SMJ_NEEDS_OUTER;
}
}
/*
* The main loop advances the outer scan, possibly reinitializing the
* inner scan, and checks for matches between outer tuples and inner
* tuples.
*/
for (;;)
{
CHECK_FOR_INTERRUPTS();
switch (node->smj_JoinState)
{
case SMJ_NEEDS_INNER:
/* Reset the inner scan. */
if (node->js.ps.plan->parallel_aware)
{
/*
* If we are in a parallel plan, and if the inner side of
* this join was not fully gathered (because it was too
* large), then we must read from the materialized inner
* relation that was created earlier. We have to switch to
* the other worker's partition if we've reached the end of
* our own. Otherwise, we can simply rescan the materialized
* inner relation.
*/
if (outerPlanState(node)->ps_Flow->flotype == FLOW_REPLICATE &&
innerPlanState(node)->ps_Flow->flotype == FLOW_PARTITIONED &&
!innerPlanState(node)->ps_Flow->initialized)
{
if (node->ss.ps.ps_TupSlot &&
!tuplestore_gettupleslot(node->ss.ps.ps_TupSlot, true,
false, node->smj_InnerTupleSlot))
{
/*
* We've reached the end of our own partition, but
* there may be more partitions. Advance to the
* next partition by updating our slice table entry
* and resetting the tuplestore so that we can read
* from the new partition. If there are no more
* partitions, we're done.
*/
if (!ExecParallelUpdatePartitionInfo(node, true))
{
node->smj_JoinState = SMJ_NEEDS_OUTER;
break;
}
tuplestore_clear(node->ss.ps.ps_TupSlot);
tuplestore_rescan(node->ss.ps.ps_TupSlot);
continue;
}
}
else
{
/*
* If the inner side is fully gathered (i.e., if it is
* a shared-nothing table), then we can simply rescan
* the existing outer slot as the inner slot as well.
*/
ExecClearTuple(node->smj_InnerTupleSlot);
tuplestore_rescan(node->ss.ps.ps_TupSlot);
}
}
else
{
/* advance inner scan */
ExecClearTuple(node->smj_InnerTupleSlot);
node->smj_InnerTupleSlot = ExecProcNode(innerPlanState(node));
}
if (TupIsNull(node->smj_InnerTupleSlot))
{
/* end of inner scan */
node->smj_JoinState = SMJ_NEEDS_OUTER;
break;
}
/*
* We know the new inner tuple is not distinct from the last one
* returned, so we update matched_inner accordingly.
*/
node->smj_MatchedInner = true;
/*
* Set up the state for matching tuples.
*/
ResetExprContext(econtext);
econtext->ecxt_innertuple = node->smj_InnerTupleSlot;
econtext->ecxt_outertuple = node->smj_OuterTupleSlot;
/* Skip non-matching tuples based on previously established
* skip qual */
if (node->smj_InnerSkipQual)
{
ExprState *qualexpr = node->smj_InnerSkipQual;
if (!ExecQual(qualexpr, econtext))
{
/* not matched */
continue;
}
}
/*
* Now we check the merge condition(s).
*/
if (ExecQualAndReset(node->smj_MergeClauses, econtext))
{
/* matched */
node->smj_JoinState = SMJ_JOINEDMATCHING;
return ExecProject(node->js.ps.ps_ProjInfo);
}
/*
* Not joined, so try next tuple from inner side.
*/
break;
case SMJ_JOINEDMATCHING:
case SMJ_JOINEDNONMATCHING:
/* Try to advance inner-side tuple */
ExecClearTuple(node->smj_InnerTupleSlot);
node->smj_InnerTupleSlot = ExecProcNode(innerPlanState(node));
if (TupIsNull(node->smj_InnerTupleSlot))
{
/* end of inner scan */
if (node->smj_JoinState == SMJ_JOINEDMATCHING)
{
node->smj_JoinState = SMJ_NEEDS_INNER;
node->smj_MatchedInner = false;
/* try to fetch next outer tuple */
ExecClearTuple(node->smj_OuterTupleSlot);
node->smj_OuterTupleSlot = ExecProcNode(outerPlanState(node));
if (TupIsNull(node->smj_OuterTupleSlot))
{
/* end of outer scan */
node->smj_JoinState = SMJ_NEEDS_INNER;
break;
}
}
else
{
node->smj_JoinState = SMJ_NEEDS_OUTER;
break;
}
}
node->smj_MatchedInner = false;
/*
* Set up the state for matching tuples.
*/
ResetExprContext(econtext);
econtext->ecxt_innertuple = node->smj_InnerTupleSlot;
econtext->ecxt_outertuple = node->smj_OuterTupleSlot;
/* Skip non-matching tuples based on previously established
* skip qual */
if (node->smj_InnerSkipQual)
{
ExprState *qualexpr = node->smj_InnerSkipQual;
if (!ExecQual(qualexpr, econtext))
{
/* not matched */
continue;
}
}
/*
* Now we check the merge condition(s).
*/
if (ExecQualAndReset(node->smj_MergeClauses, econtext))
{
/* matched */
node->smj_MatchedInner = true;
node->smj_JoinState = SMJ_JOINEDMATCHING;
return ExecProject(node->js.ps.ps_ProjInfo);
}
/*
* Not joined, so try again with next tuple from inner side.
*/
break;
case SMJ_NEEDS_OUTER:
/* Try to advance outer-side tuple */
ExecClearTuple(node->smj_OuterTupleSlot);
node->smj_OuterTupleSlot = ExecProcNode(outerPlanState(node));
if (TupIsNull(node->smj_OuterTupleSlot))
{
/* end of outer scan */
node->smj_JoinState = SMJ_NEEDS_INNER;
break;
}
/*
* New outer tuple; try to match it to first inner tuple.
*/
node->smj_JoinState = SMJ_FIRST_INNER;
/* FALL THRU */
case SMJ_FIRST_INNER:
/*
* We know the new outer tuple is not distinct from the last one
* returned, so we update matched_outer accordingly.
*/
node->smj_MatchedOuter = true;
/*
* Set up the state for matching tuples.
*/
ResetExprContext(econtext);
econtext->ecxt_innertuple = node->smj_InnerTupleSlot;
econtext->ecxt_outertuple = node->smj_OuterTupleSlot;
/* Skip non-matching tuples based on previously established
* skip qual */
if (node->smj_OuterSkipQual)
{
ExprState *qualexpr = node->smj_OuterSkipQual;
if (!ExecQual(qualexpr, econtext))
{
/* not
阅读全文