/*-------------------------------------------------------------------------
 *
 * nodeHashjoin.c
 *	  Routines to handle hash join nodes
 *
 * Portions Copyright (c) 2005-2008, Greenplum inc
 * Portions Copyright (c) 2012-Present Pivotal Software, Inc.
 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/executor/nodeHashjoin.c,v 1.93 2008/01/01 19:45:49 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
17

B
Bruce Momjian 已提交
18
#include "postgres.h"
19 20

#include "executor/executor.h"
21
#include "executor/hashjoin.h"
22
#include "executor/instrument.h"	/* Instrumentation */
23 24
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
25
#include "utils/faultinjector.h"
26 27
#include "utils/memutils.h"

28
#include "cdb/cdbvars.h"
29
#include "miscadmin.h"			/* work_mem */
30

31
static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
B
Bruce Momjian 已提交
32 33
						  HashJoinState *hjstate,
						  uint32 *hashvalue);
34 35
static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
						  HashJoinBatchSide *side,
36
						  uint32 *hashvalue,
B
Bruce Momjian 已提交
37
						  TupleTableSlot *tupleSlot);
38
static int	ExecHashJoinNewBatch(HashJoinState *hjstate);
39
static bool isNotDistinctJoin(List *qualList);
40

41 42
static void ReleaseHashTable(HashJoinState *node);
static bool isHashtableEmpty(HashJoinTable hashtable);
43

44
static void SpillCurrentBatch(HashJoinState *node);
45 46
static bool ExecHashJoinReloadHashTable(HashJoinState *hjstate);

47
/* ----------------------------------------------------------------
 *		ExecHashJoin
 *
 *		This function implements the Hybrid Hashjoin algorithm.
 *
 *		Note: the relation we build hash table on is the "inner"
 *			  the other one is "outer".
 *
 *		Returns one joined tuple per call, or NULL when the join is
 *		exhausted.  On the first call it runs the inner (Hash) subplan
 *		to completion to build the hash table; thereafter it probes the
 *		table with outer tuples, spilling/reloading batches as needed.
 * ----------------------------------------------------------------
 */
TupleTableSlot *				/* return: a tuple or NULL */
ExecHashJoin(HashJoinState *node)
{
	EState	   *estate;
	PlanState  *outerNode;
	HashState  *hashNode;
	List	   *joinqual;		/* join quals: determine MatchedOuter */
	List	   *otherqual;		/* other quals: must also pass to emit tuple */
	TupleTableSlot *inntuple;
	ExprContext *econtext;
	HashJoinTable hashtable;
	HashJoinTuple curtuple;
	TupleTableSlot *outerTupleSlot;
	uint32		hashvalue;
	int			batchno;

	/*
	 * get information from HashJoin node
	 */
	estate = node->js.ps.state;
	joinqual = node->js.joinqual;
	otherqual = node->js.ps.qual;
	hashNode = (HashState *) innerPlanState(node);
	outerNode = outerPlanState(node);

	/*
	 * get information from HashJoin state
	 */
	hashtable = node->hj_HashTable;
	econtext = node->js.ps.ps_ExprContext;

	/*
	 * If we're doing an IN join, we want to return at most one row per outer
	 * tuple; so we can stop scanning the inner scan if we matched on the
	 * previous try.
	 */
	if (node->js.jointype == JOIN_IN && node->hj_MatchedOuter)
		node->hj_NeedNewOuter = true;

	/*
	 * Reset per-tuple memory context to free any expression evaluation
	 * storage allocated in the previous tuple cycle.  Note this can't happen
	 * until we're done projecting out tuples from a join tuple.
	 */
	ResetExprContext(econtext);

	/*
	 * if this is the first call, build the hash table for inner relation
	 */
	if (hashtable == NULL)
	{
		/*
		 * MPP-4165: My fix for MPP-3300 was correct in that we avoided
		 * the *deadlock* but had very unexpected (and painful)
		 * performance characteristics: we basically de-pipeline and
		 * de-parallelize execution of any query which has motion below
		 * us.
		 *
		 * So now prefetch_inner is set (see createplan.c) if we have *any* motion
		 * below us. If we don't have any motion, it doesn't matter.
		 *
		 * See motion_sanity_walker() for details on how a deadlock may occur.
		 */
		if (!node->prefetch_inner)
		{
			/*
			 * If the outer relation is completely empty, we can quit without
			 * building the hash table.  However, for an inner join it is only a
			 * win to check this when the outer relation's startup cost is less
			 * than the projected cost of building the hash table.	Otherwise it's
			 * best to build the hash table first and see if the inner relation is
			 * empty.  (When it's an outer join, we should always make this check,
			 * since we aren't going to be able to skip the join on the strength
			 * of an empty inner relation anyway.)
			 *
			 * If we are rescanning the join, we make use of information gained on
			 * the previous scan: don't bother to try the prefetch if the previous
			 * scan found the outer relation nonempty.	This is not 100% reliable
			 * since with new parameters the outer relation might yield different
			 * results, but it's a good heuristic.
			 *
			 * The only way to make the check is to try to fetch a tuple from the
			 * outer plan node.  If we succeed, we have to stash it away for later
			 * consumption by ExecHashJoinOuterGetTuple.
			 */
			if ((node->js.jointype == JOIN_LEFT) ||
					(node->js.jointype == JOIN_LASJ) ||
					(node->js.jointype == JOIN_LASJ_NOTIN) ||
					(outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
						!node->hj_OuterNotEmpty))
			{
				TupleTableSlot *slot;

				slot = ExecProcNode(outerNode);

				node->hj_FirstOuterTupleSlot = slot;
				if (TupIsNull(node->hj_FirstOuterTupleSlot))
				{
					node->hj_OuterNotEmpty = false;

					/* CDB: Tell inner subtree that its data will not be needed. */
					ExecSquelchNode((PlanState *)hashNode);

					return NULL;
				}
				else
					node->hj_OuterNotEmpty = true;
			}
			else
				node->hj_FirstOuterTupleSlot = NULL;
		}
		else
		{
			/* see MPP-989 comment above, for now we assume that we have
			 * at least one row on the outer. */
			node->hj_FirstOuterTupleSlot = NULL;
		}

		/*
		 * create the hash table
		 */
		hashtable = ExecHashTableCreate(hashNode,
										node,
										node->hj_HashOperators,
										PlanStateOperatorMemKB((PlanState *) hashNode));
		node->hj_HashTable = hashtable;

		/*
		 * CDB: Offer extra info for EXPLAIN ANALYZE.
		 */
		if (estate->es_instrument)
			ExecHashTableExplainInit(hashNode, node, hashtable);


		/*
		 * execute the Hash node, to build the hash table
		 */
		hashNode->hashtable = hashtable;

		/*
		 * Only if doing a LASJ_NOTIN join, we want to quit as soon as we find
		 * a NULL key on the inner side
		 */
		hashNode->hs_quit_if_hashkeys_null = (node->js.jointype == JOIN_LASJ_NOTIN);

		/*
		 * Store pointer to the HashJoinState in the hashtable, as we will
		 * need the HashJoin plan when creating the spill file set
		 */
		hashtable->hjstate = node;

		/* Execute the Hash node and build the hashtable */
		(void) MultiExecProcNode((PlanState *) hashNode);

#ifdef HJDEBUG
		elog(gp_workfile_caching_loglevel, "HashJoin built table with %.1f tuples by executing subplan for batch 0", hashtable->totalTuples);
#endif

		/**
		 * If LASJ_NOTIN and a null was found on the inner side, then clean out.
		 * (NOT IN semantics: any inner NULL key means no outer row can qualify.)
		 */
		if (node->js.jointype == JOIN_LASJ_NOTIN && hashNode->hs_hashkeys_null)
		{
			/*
			 * CDB: We'll read no more from outer subtree. To keep sibling QEs
			 * from being starved, tell source QEs not to clog up the pipeline
			 * with our never-to-be-consumed data.
			 */
			ExecSquelchNode(outerNode);
			/* end of join */

			ExecEagerFreeHashJoin(node);

			return NULL;
		}

		/*
		 * We just scanned the entire inner side and built the hashtable
		 * (and its overflow batches). Check here and remember if the inner
		 * side is empty.
		 */
		node->hj_InnerEmpty = isHashtableEmpty(hashtable);

		/*
		 * If the inner relation is completely empty, and we're not doing an
		 * outer join, we can quit without scanning the outer relation.
		 */
		if (node->js.jointype != JOIN_LEFT
			&& node->js.jointype != JOIN_LASJ
			&& node->js.jointype != JOIN_LASJ_NOTIN
			&& node->hj_InnerEmpty)
		{
			/*
			 * CDB: We'll read no more from outer subtree. To keep sibling QEs
			 * from being starved, tell source QEs not to clog up the pipeline
			 * with our never-to-be-consumed data.
			 */
			ExecSquelchNode(outerNode);
			/* end of join */

			ExecEagerFreeHashJoin(node);

			return NULL;
		}

		/*
		 * Reset OuterNotEmpty for scan.  (It's OK if we fetched a tuple
		 * above, because ExecHashJoinOuterGetTuple will immediately set it
		 * again.)
		 */
		node->hj_OuterNotEmpty = false;
	}

	/* For a rescannable hash table we might need to reload batch 0 during rescan */
	if (hashtable->curbatch == -1 && !hashtable->first_pass)
	{
		hashtable->curbatch = 0;
		if (!ExecHashJoinReloadHashTable(node))
		{
			return NULL;
		}
	}

	/*
	 * run the hash join process
	 */
	for (;;)
	{
		/* We must never use an eagerly released hash table */
		Assert(!hashtable->eagerlyReleased);

		/*
		 * If we don't have an outer tuple, get the next one
		 */
		if (node->hj_NeedNewOuter)
		{
			outerTupleSlot = ExecHashJoinOuterGetTuple(outerNode,
													   node,
													   &hashvalue);
			if (TupIsNull(outerTupleSlot))
			{
				/* end of join */
				if (!node->reuse_hashtable)
					ExecEagerFreeHashJoin(node);

				return NULL;
			}

			Gpmon_Incr_Rows_In(GpmonPktFromHashJoinState(node));
			CheckSendPlanStateGpmonPkt(&node->js.ps);
			node->js.ps.ps_OuterTupleSlot = outerTupleSlot;
			econtext->ecxt_outertuple = outerTupleSlot;
			node->hj_NeedNewOuter = false;
			node->hj_MatchedOuter = false;

			/*
			 * now we have an outer tuple, find the corresponding bucket for
			 * this tuple from the hash table
			 */
			node->hj_CurHashValue = hashvalue;
			ExecHashGetBucketAndBatch(hashtable, hashvalue,
									  &node->hj_CurBucketNo, &batchno);
			node->hj_CurTuple = NULL;

			/*
			 * Now we've got an outer tuple and the corresponding hash bucket,
			 * but this tuple may not belong to the current batch.
			 */
			if (batchno != hashtable->curbatch)
			{
				/*
				 * Need to postpone this outer tuple to a later batch. Save it
				 * in the corresponding outer-batch file.
				 */
				Assert(batchno != 0);
				Assert(batchno > hashtable->curbatch);
				ExecHashJoinSaveTuple(&node->js.ps, ExecFetchSlotMemTuple(outerTupleSlot, false),
									  hashvalue,
									  hashtable,
									  &hashtable->batches[batchno]->outerside,
									  hashtable->bfCxt);
				node->hj_NeedNewOuter = true;
				continue;		/* loop around for a new outer tuple */
			}
		}

		/*
		 * OK, scan the selected hash bucket for matches
		 */
		for (;;)
		{
			/*
			 * OPT-3325: Handle NULLs in the outer side of LASJ_NOTIN
			 *  - if tuple is NULL and inner is not empty, drop outer tuple
			 *  - if tuple is NULL and inner is empty, keep going as we'll
			 *    find no match for this tuple in the inner side
			 */
			if (node->js.jointype == JOIN_LASJ_NOTIN &&
					!node->hj_InnerEmpty &&
					isJoinExprNull(node->hj_OuterHashKeys,econtext))
			{
				node->hj_MatchedOuter = true;
				break;		/* loop around for a new outer tuple */
			}

			curtuple = ExecScanHashBucket(hashNode, node, econtext);
			if (curtuple == NULL)
				break;			/* out of matches */

			/*
			 * we've got a match, but still need to test non-hashed quals
			 */
			inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(curtuple),
											 node->hj_HashTupleSlot,
											 false);	/* don't pfree */
			econtext->ecxt_innertuple = inntuple;

			/* reset temp memory each time to avoid leaks from qual expr */
			ResetExprContext(econtext);

			/*
			 * if we pass the qual, then save state for next call and have
			 * ExecProject form the projection, store it in the tuple table,
			 * and return the slot.
			 *
			 * Only the joinquals determine MatchedOuter status, but all quals
			 * must pass to actually return the tuple.
			 */
			if (joinqual == NIL || ExecQual(joinqual, econtext, false))
			{
				node->hj_MatchedOuter = true;

				/* In an antijoin, we never return a matched tuple */
				if (node->js.jointype == JOIN_LASJ || node->js.jointype == JOIN_LASJ_NOTIN)
				{
					node->hj_NeedNewOuter = true;
					break;		/* out of loop over hash bucket */
				}

				if (otherqual == NIL || ExecQual(otherqual, econtext, false))
				{
					Gpmon_Incr_Rows_Out(GpmonPktFromHashJoinState(node));
					CheckSendPlanStateGpmonPkt(&node->js.ps);
					return ExecProject(node->js.ps.ps_ProjInfo, NULL);
				}

				/*
				 * If we didn't return a tuple, may need to set NeedNewOuter
				 */
				if (node->js.jointype == JOIN_IN)
				{
					node->hj_NeedNewOuter = true;
					break;		/* out of loop over hash bucket */
				}
			}
		}

		/*
		 * Now the current outer tuple has run out of matches, so check
		 * whether to emit a dummy outer-join tuple. If not, loop around to
		 * get a new outer tuple.
		 */
		node->hj_NeedNewOuter = true;

		if (!node->hj_MatchedOuter &&
			(node->js.jointype == JOIN_LEFT ||
			 node->js.jointype == JOIN_LASJ ||
			 node->js.jointype == JOIN_LASJ_NOTIN))
		{
			/*
			 * We are doing an outer join and there were no join matches for
			 * this outer tuple.  Generate a fake join tuple with nulls for
			 * the inner tuple, and return it if it passes the non-join quals.
			 */
			econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;

			if (otherqual == NIL || ExecQual(otherqual, econtext, false))
			{
				Gpmon_Incr_Rows_Out(GpmonPktFromHashJoinState(node));
				CheckSendPlanStateGpmonPkt(&node->js.ps);
				return ExecProject(node->js.ps.ps_ProjInfo, NULL);
			}
		}
	}
}

/* ----------------------------------------------------------------
 *		ExecInitHashJoin
 *
 *		Init routine for HashJoin node.
 *
 *		Builds the HashJoinState: expression state trees, child plan
 *		states (inner Hash first — see ordering note below), tuple
 *		slots, and the deconstructed hash-clause key/operator lists.
 * ----------------------------------------------------------------
 */
HashJoinState *
ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
{
	HashJoinState *hjstate;
	Plan	   *outerNode;
	Hash	   *hashNode;
	List	   *lclauses;		/* outer-side args of hash clauses */
	List	   *rclauses;		/* inner-side args of hash clauses */
	List	   *hoperators;		/* hash operator OIDs */
	ListCell   *l;

	/* check for unsupported flags */
	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

	/*
	 * create state structure
	 */
	hjstate = makeNode(HashJoinState);
	hjstate->js.ps.plan = (Plan *) node;
	hjstate->js.ps.state = estate;
	/* keep the hash table across rescans when the caller may rewind us */
	hjstate->reuse_hashtable = (eflags & EXEC_FLAG_REWIND) != 0;

	/*
	 * Miscellaneous initialization
	 *
	 * create expression context for node
	 */
	ExecAssignExprContext(estate, &hjstate->js.ps);

	if (node->hashqualclauses != NIL)
	{
		/* CDB: This must be an IS NOT DISTINCT join!  */
		Insist(isNotDistinctJoin(node->hashqualclauses));
		hjstate->hj_nonequijoin = true;
	}
	else
		hjstate->hj_nonequijoin = false;

	/*
	 * initialize child expressions
	 */
	hjstate->js.ps.targetlist = (List *)
		ExecInitExpr((Expr *) node->join.plan.targetlist,
					 (PlanState *) hjstate);
	hjstate->js.ps.qual = (List *)
		ExecInitExpr((Expr *) node->join.plan.qual,
					 (PlanState *) hjstate);
	hjstate->js.jointype = node->join.jointype;
	hjstate->js.joinqual = (List *)
		ExecInitExpr((Expr *) node->join.joinqual,
					 (PlanState *) hjstate);
	hjstate->hashclauses = (List *)
		ExecInitExpr((Expr *) node->hashclauses,
					 (PlanState *) hjstate);

	if (node->hashqualclauses != NIL)
	{
		hjstate->hashqualclauses = (List *)
			ExecInitExpr((Expr *) node->hashqualclauses,
						 (PlanState *) hjstate);
	}
	else
	{
		hjstate->hashqualclauses = hjstate->hashclauses;
	}

	/*
	 * MPP-3300, we only pre-build hashtable if we need to (this is relaxing
	 * the fix to MPP-989)
	 */
	hjstate->prefetch_inner = node->join.prefetch_inner;

	/*
	 * initialize child nodes
	 *
	 * Note: we could suppress the REWIND flag for the inner input, which
	 * would amount to betting that the hash will be a single batch.  Not
	 * clear if this would be a win or not.
	 */
	hashNode = (Hash *) innerPlan(node);
	outerNode = outerPlan(node);

	/* 
	 * XXX The following order are significant.  We init Hash first, then the outerNode
	 * this is the same order as we execute (in the sense of the first exec called).
	 * Until we have a better way to uncouple, share input needs this to be true.  If the
	 * order is wrong, when both hash and outer node have share input and (both ?) have 
	 * a subquery node, share input will fail because the estate of the nodes can not be
	 * set up correctly.
	 */
	innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
	((HashState *) innerPlanState(hjstate))->hs_keepnull = hjstate->hj_nonequijoin;

	outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);

#define HASHJOIN_NSLOTS 3

	/*
	 * tuple table initialization
	 */
	ExecInitResultTupleSlot(estate, &hjstate->js.ps);
	hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate);

	/* outer joins additionally need an all-nulls inner slot for unmatched rows */
	switch (node->join.jointype)
	{
		case JOIN_INNER:
		case JOIN_IN:
			break;
		case JOIN_LEFT:
		case JOIN_LASJ:
		case JOIN_LASJ_NOTIN:
			hjstate->hj_NullInnerTupleSlot =
				ExecInitNullTupleSlot(estate,
								 ExecGetResultType(innerPlanState(hjstate)));
			break;
		default:
			elog(ERROR, "unrecognized join type: %d",
				 (int) node->join.jointype);
	}

	/*
	 * now for some voodoo.  our temporary tuple slot is actually the result
	 * tuple slot of the Hash node (which is our inner plan).  we do this
	 * because Hash nodes don't return tuples via ExecProcNode() -- instead
	 * the hash join node uses ExecScanHashBucket() to get at the contents of
	 * the hash table.	-cim 6/9/91
	 */
	{
		HashState  *hashstate = (HashState *) innerPlanState(hjstate);
		TupleTableSlot *slot = hashstate->ps.ps_ResultTupleSlot;

		hjstate->hj_HashTupleSlot = slot;
	}

	/*
	 * initialize tuple type and projection info
	 */
	ExecAssignResultTypeFromTL(&hjstate->js.ps);
	ExecAssignProjectionInfo(&hjstate->js.ps, NULL);

	ExecSetSlotDescriptor(hjstate->hj_OuterTupleSlot,
						  ExecGetResultType(outerPlanState(hjstate)));

	/*
	 * initialize hash-specific info
	 */
	hjstate->hj_HashTable = NULL;
	hjstate->hj_FirstOuterTupleSlot = NULL;

	hjstate->hj_CurHashValue = 0;
	hjstate->hj_CurBucketNo = 0;
	hjstate->hj_CurTuple = NULL;

	/*
	 * Deconstruct the hash clauses into outer and inner argument values, so
	 * that we can evaluate those subexpressions separately.  Also make a list
	 * of the hash operator OIDs, in preparation for looking up the hash
	 * functions to use.
	 */
	lclauses = NIL;
	rclauses = NIL;
	hoperators = NIL;
	foreach(l, hjstate->hashclauses)
	{
		FuncExprState *fstate = (FuncExprState *) lfirst(l);
		OpExpr	   *hclause;

		Assert(IsA(fstate, FuncExprState));
		hclause = (OpExpr *) fstate->xprstate.expr;
		Assert(IsA(hclause, OpExpr));
		lclauses = lappend(lclauses, linitial(fstate->args));
		rclauses = lappend(rclauses, lsecond(fstate->args));
		hoperators = lappend_oid(hoperators, hclause->opno);
	}
	hjstate->hj_OuterHashKeys = lclauses;
	hjstate->hj_InnerHashKeys = rclauses;
	hjstate->hj_HashOperators = hoperators;
	/* child Hash node needs to evaluate inner hash keys, too */
	((HashState *) innerPlanState(hjstate))->hashkeys = rclauses;

	hjstate->js.ps.ps_OuterTupleSlot = NULL;
	hjstate->hj_NeedNewOuter = true;
	hjstate->hj_MatchedOuter = false;
	hjstate->hj_OuterNotEmpty = false;

	initGpmonPktForHashJoin((Plan *) node, &hjstate->js.ps.gpmon_pkt, estate);

	return hjstate;
}

int
639
ExecCountSlotsHashJoin(HashJoin *node)
640
{
641
	return ExecCountSlotsNode(outerPlan(node)) +
642 643
		ExecCountSlotsNode(innerPlan(node)) +
		HASHJOIN_NSLOTS;
644 645 646
}

/* ----------------------------------------------------------------
 *		ExecEndHashJoin
 *
 *		clean up routine for HashJoin node
 *
 *		Destroys the hash table (unless it was already eagerly
 *		released), frees the expression context, clears our tuple
 *		slots, and recursively shuts down both child subtrees.
 * ----------------------------------------------------------------
 */
void
ExecEndHashJoin(HashJoinState *node)
{
	/*
	 * Free hash table
	 */
	if (node->hj_HashTable)
	{
		/* an eagerly-released table was already torn down; don't destroy twice */
		if (!node->hj_HashTable->eagerlyReleased)
		{
			HashState  *hashState = (HashState *) innerPlanState(node);

			ExecHashTableDestroy(hashState, node->hj_HashTable);
		}
		pfree(node->hj_HashTable);
		node->hj_HashTable = NULL;
	}

	/*
	 * Free the exprcontext
	 */
	ExecFreeExprContext(&node->js.ps);

	/*
	 * clean out the tuple table
	 */
	ExecClearTuple(node->js.ps.ps_ResultTupleSlot);
	ExecClearTuple(node->hj_OuterTupleSlot);
	ExecClearTuple(node->hj_HashTupleSlot);

	/*
	 * clean up subtrees
	 */
	ExecEndNode(outerPlanState(node));
	ExecEndNode(innerPlanState(node));

	EndPlanStateGpmonPkt(&node->js.ps);
}

691 692
/*
 * ExecHashJoinOuterGetTuple
 *
 *		get the next outer tuple for hashjoin: either by
 *		executing a plan node in the first pass, or from
 *		the temp files for the hashjoin batches.
 *
 * Returns a null slot if no more outer tuples.  On success, the tuple's
 * hash value is stored at *hashvalue --- this is either originally computed,
 * or re-read from the temp file.
 */
static TupleTableSlot *
ExecHashJoinOuterGetTuple(PlanState *outerNode,
						  HashJoinState *hjstate,
						  uint32 *hashvalue)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			curbatch = hashtable->curbatch;
	TupleTableSlot *slot;
	ExprContext *econtext;
	HashState  *hashState = (HashState *) innerPlanState(hjstate);

	/* Read tuples from outer relation only if it's the first batch */
	if (curbatch == 0)
	{
		for (;;)
		{
			/*
			 * Check to see if first outer tuple was already fetched by
			 * ExecHashJoin() and not used yet.
			 */
			slot = hjstate->hj_FirstOuterTupleSlot;
			if (!TupIsNull(slot))
				hjstate->hj_FirstOuterTupleSlot = NULL;
			else
			{
				slot = ExecProcNode(outerNode);
			}

			if (TupIsNull(slot))
				break;

			/*
			 * We have to compute the tuple's hash value.
			 */
			econtext = hjstate->js.ps.ps_ExprContext;
			econtext->ecxt_outertuple = slot;

			/*
			 * Outer/antijoin variants (and IS NOT DISTINCT joins) must keep
			 * tuples whose hash keys are NULL; for plain joins they can never
			 * match and are discarded below.
			 */
			bool hashkeys_null = false;
			bool keep_nulls = (hjstate->js.jointype == JOIN_LEFT) ||
					(hjstate->js.jointype == JOIN_LASJ) ||
					(hjstate->js.jointype == JOIN_LASJ_NOTIN) ||
					hjstate->hj_nonequijoin;
			if (ExecHashGetHashValue(hashState, hashtable, econtext,
									 hjstate->hj_OuterHashKeys,
									 true,		/* outer tuple */
									 keep_nulls,
									 hashvalue,
									 &hashkeys_null))
			{
				/* remember outer relation is not empty for possible rescan */
				hjstate->hj_OuterNotEmpty = true;

				return slot;
			}

			/*
			 * That tuple couldn't match because of a NULL, so discard it and
			 * continue with the next one.
			 */
		}

		/*
		 * We have just reached the end of the first pass. Try to switch to a
		 * saved batch.
		 */

		/* SFR: This can cause re-spill! */
		curbatch = ExecHashJoinNewBatch(hjstate);


#ifdef HJDEBUG
		elog(gp_workfile_caching_loglevel, "HashJoin built table with %.1f tuples for batch %d", hashtable->totalTuples, curbatch);
#endif

		Gpmon_Incr_Rows_Out(GpmonPktFromHashJoinState(hjstate));
		CheckSendPlanStateGpmonPkt(&hjstate->js.ps);
	} /* if (curbatch == 0) */

	/*
	 * Try to read from a temp file. Loop allows us to advance to new batches
	 * as needed.  NOTE: nbatch could increase inside ExecHashJoinNewBatch, so
	 * don't try to optimize this loop.
	 */
	while (curbatch < hashtable->nbatch)
	{
		/*
		 * For batches > 0, we can be reading many many outer tuples from disk
		 * and probing them against the hashtable. If we don't find any
		 * matches, we'll keep coming back here to read tuples from disk and
		 * returning them (MPP-23213). Break this long tight loop here.
		 */
		CHECK_FOR_INTERRUPTS();

		if (QueryFinishPending)
			return NULL;

		slot = ExecHashJoinGetSavedTuple(hjstate,
										 &hashtable->batches[curbatch]->outerside,
										 hashvalue,
										 hjstate->hj_OuterTupleSlot);
		if (!TupIsNull(slot))
			return slot;
		curbatch = ExecHashJoinNewBatch(hjstate);

#ifdef HJDEBUG
		elog(gp_workfile_caching_loglevel, "HashJoin built table with %.1f tuples for batch %d", hashtable->totalTuples, curbatch);
#endif

		CheckSendPlanStateGpmonPkt(&hjstate->js.ps);
	}

	/* Out of batches... */
	return NULL;
}

817 818
/*
 * ExecHashJoinNewBatch
 *		switch to a new hashjoin batch
 *
 * Returns the number of the new batch (1..nbatch-1), or nbatch if no more.
 * We will never return a batch number that has an empty outer batch file.
 *
 * Side effects: closes the previous batch's outer workfile, optionally
 * spills the in-memory hash table (for re-scan support), updates
 * hashtable->curbatch, and reloads the hash table with the new batch's
 * inner tuples via ExecHashJoinReloadHashTable.
 */
static int
ExecHashJoinNewBatch(HashJoinState *hjstate)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	HashJoinBatchData *batch;
	int			nbatch;
	int			curbatch;

	SIMPLE_FAULT_INJECTOR(FaultExecHashJoinNewBatch);

	HashState  *hashState = (HashState *) innerPlanState(hjstate);

start_over:
	/* Re-read these each iteration: reloading a batch can increase nbatch. */
	nbatch = hashtable->nbatch;
	curbatch = hashtable->curbatch;

	if (curbatch >= nbatch)
		return nbatch;

	/* Close out instrumentation for the batch we just finished, if any. */
	if (curbatch >= 0 && hashtable->stats)
		ExecHashTableExplainBatchEnd(hashState, hashtable);

	if (curbatch > 0)
	{
		/*
		 * We no longer need the previous outer batch file; close it right
		 * away to free disk space.
		 */
		batch = hashtable->batches[curbatch];
		if (batch->outerside.workfile != NULL)
		{
			workfile_mgr_close_file(hashtable->work_set, batch->outerside.workfile);
		}
		batch->outerside.workfile = NULL;
	}

	/*
	 * If we want to keep the hash table around, for re-scan, then write
	 * the current batch's state to disk before moving to the next one.
	 * It's possible that we increase the number of batches later, so that
	 * by the time we reload this file, some of the tuples we wrote here
	 * will logically belong to a later file. ExecHashJoinReloadHashTable
	 * will move such tuples when the file is reloaded.
	 *
	 * If we have already re-scanned, we might still have the old file
	 * around, in which case there's no need to write it again.
	 * XXX: Currently, we actually always re-create it, see comments in
	 * ExecHashJoinReloadHashTable.
	 */
	if (nbatch > 1 && hjstate->reuse_hashtable &&
		hashtable->batches[curbatch]->innerside.workfile == NULL)
	{
		SpillCurrentBatch(hjstate);
	}

	/*
	 * We can always skip over any batches that are completely empty on both
	 * sides.  We can sometimes skip over batches that are empty on only one
	 * side, but there are exceptions:
	 *
	 * 1. In a LEFT JOIN, we have to process outer batches even if the inner
	 * batch is empty.
	 *
	 * 2. If we have increased nbatch since the initial estimate, we have to
	 * scan inner batches since they might contain tuples that need to be
	 * reassigned to later inner batches.
	 *
	 * 3. Similarly, if we have increased nbatch since starting the outer
	 * scan, we have to rescan outer batches in case they contain tuples that
	 * need to be reassigned.
	 */
	curbatch++;
	while (curbatch < nbatch &&
		   (hashtable->batches[curbatch]->outerside.workfile == NULL ||
			hashtable->batches[curbatch]->innerside.workfile == NULL))

	{
		/*
		 * For rescannable we must complete respilling on first batch
		 *
		 * Consider case 2: the inner workfile is not null. We are on the first pass
		 * (before ReScan was called). I.e., we are processing a join for the base
		 * case of a recursive CTE. If the base case does not have tuples for batch
		 * k (i.e., the outer workfile for batch k is null), and we never increased
		 * the initial number of batches, then we will skip the inner batchfile (case 2).
		 *
		 * However, one iteration of recursive CTE is no guarantee that the future outer
		 * batch will also not match batch k on the inner. Therefore, we may have a
		 * non-null outer batch k on some future iteration.
		 *
		 * If during loading batch k inner workfile for future iteration triggers a re-spill
		 * we will be forced to increase number of batches. This will result in wrong result
		 * as we will not write any inner tuples (we consider inner workfiles read-only after
		 * a rescan call).
		 *
		 * So, to produce wrong result, without this guard, the following conditions have
		 * to be true:
		 *
		 * 1. Outer batchfile for batch k is null
		 * 2. Inner batchfile for batch k not null
		 * 3. No resizing of nbatch for batch (0...(k-1))
		 * 4. Inner batchfile for batch k is too big to fit in memory
		 */
		if (hjstate->reuse_hashtable)
			break;

		batch = hashtable->batches[curbatch];
		if (batch->outerside.workfile != NULL &&
			((hjstate->js.jointype == JOIN_LEFT) ||
			 (hjstate->js.jointype == JOIN_LASJ) ||
			 (hjstate->js.jointype == JOIN_LASJ_NOTIN)))
			break;				/* must process due to rule 1 */
		if (batch->innerside.workfile != NULL &&
			nbatch != hashtable->nbatch_original)
			break;				/* must process due to rule 2 */
		if (batch->outerside.workfile != NULL &&
			nbatch != hashtable->nbatch_outstart)
			break;				/* must process due to rule 3 */
		/* We can ignore this batch. */
		/* Release associated temp files right away. */
		if (batch->innerside.workfile != NULL && !hjstate->reuse_hashtable)
		{
			workfile_mgr_close_file(hashtable->work_set, batch->innerside.workfile);
			batch->innerside.workfile = NULL;
		}

		if (batch->outerside.workfile != NULL)
		{
			workfile_mgr_close_file(hashtable->work_set, batch->outerside.workfile);
		}
		batch->outerside.workfile = NULL;

		curbatch++;
	}

	hashtable->curbatch = curbatch;		/* CDB: upd before return, even if no
										 * more data, so stats logic can see
										 * whether join was run to completion */

	if (curbatch >= nbatch)
		return curbatch;		/* no more batches */

	batch = hashtable->batches[curbatch];

	if (!ExecHashJoinReloadHashTable(hjstate))
	{
		/* We no longer continue as we couldn't load the batch */
		return nbatch;
	}
	/*
	 * If there's no outer batch file, advance to next batch.
	 */
	if (batch->outerside.workfile == NULL)
		goto start_over;

	/*
	 * Rewind outer batch file, so that we can start reading it.
	 */
	bool		result = ExecWorkFile_Rewind(batch->outerside.workfile);

	if (!result)
	{
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not access temporary file")));
	}

	return curbatch;
}

994 995 996
/*
 * ExecHashJoinSaveTuple
 *		save a tuple to a batch file.
 *
 * The data recorded in the file for each tuple is its hash value,
 * then the tuple in MinimalTuple format.
 *
 * Note: it is important always to call this in the regular executor
 * context, not in a shorter-lived context; else the temp file buffers
 * will get messed up.
 *
 * 'ps' may be NULL (e.g. when called from SpillCurrentBatch); it is only
 * used to send a gpmon packet after the write.  'bfCxt' is the memory
 * context in which workfile-manager state is allocated, so that it
 * outlives per-tuple contexts.
 *
 * On the first spill ever for this join, this also lazily creates the
 * workfile set and its metadata ("state") file; on the first write to a
 * given batch side, it lazily creates that batch file.
 */
void
ExecHashJoinSaveTuple(PlanState *ps, MemTuple tuple, uint32 hashvalue,
					  HashJoinTable hashtable, struct HashJoinBatchSide *batchside,
					  MemoryContext bfCxt)
{
	if (hashtable->work_set == NULL)
	{
		/* Record (for EXPLAIN ANALYZE) that this join spilled to disk. */
		hashtable->hjstate->workfiles_created = true;
		if (hashtable->hjstate->js.ps.instrument)
		{
			hashtable->hjstate->js.ps.instrument->workfileCreated = true;
		}

		MemoryContext oldcxt;

		/* Allocate workfile-manager state in the long-lived context. */
		oldcxt = MemoryContextSwitchTo(bfCxt);

		hashtable->work_set = workfile_mgr_create_set(gp_workfile_type_hashjoin,
				true, /* can_be_reused */
				&hashtable->hjstate->js.ps);

		/*
		 * First time spilling. Before creating any spill files, create a
		 * metadata file
		 */
		hashtable->state_file = workfile_mgr_create_fileno(hashtable->work_set, WORKFILE_NUM_HASHJOIN_METADATA);
		elog(gp_workfile_caching_loglevel, "created state file %s", ExecWorkFile_GetFileName(hashtable->state_file));

		MemoryContextSwitchTo(oldcxt);
	}

	if (batchside->workfile == NULL)
	{
		MemoryContext oldcxt;

		oldcxt = MemoryContextSwitchTo(bfCxt);

		/* First write to this batch file, so create it */
		Assert(hashtable->work_set != NULL);
		batchside->workfile = workfile_mgr_create_file(hashtable->work_set);

		elog(gp_workfile_caching_loglevel, "create batch file %s with gp_workfile_compress_algorithm=%d",
			 ExecWorkFile_GetFileName(batchside->workfile),
			 hashtable->work_set->metadata.bfz_compress_type);

		MemoryContextSwitchTo(oldcxt);
	}

	/* Write the hash value first, then the tuple bytes (includes length). */
	if (!ExecWorkFile_Write(batchside->workfile, (void *) &hashvalue, sizeof(uint32)))
	{
		workfile_mgr_report_error();
	}

	if (!ExecWorkFile_Write(batchside->workfile, (void *) tuple, memtuple_get_size(tuple)))
	{
		workfile_mgr_report_error();
	}

	batchside->total_tuples++;

	if (ps)
	{
		CheckSendPlanStateGpmonPkt(ps);
	}
}
V
Vadim B. Mikheev 已提交
1070

1071 1072
/*
 * ExecHashJoinGetSavedTuple
B
Bruce Momjian 已提交
1073
 *		read the next tuple from a batch file.	Return NULL if no more.
1074 1075 1076 1077 1078
 *
 * On success, *hashvalue is set to the tuple's hash value, and the tuple
 * itself is stored in the given slot.
 */
static TupleTableSlot *
1079 1080
ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
						  HashJoinBatchSide *batchside,
1081 1082 1083
						  uint32 *hashvalue,
						  TupleTableSlot *tupleSlot)
{
1084
	uint32		header[2];
1085
	size_t		nread;
1086
	MemTuple	tuple;
1087

1088
	/*
B
Bruce Momjian 已提交
1089 1090 1091
	 * Since both the hash value and the MinimalTuple length word are uint32,
	 * we can read them both in one BufFileRead() call without any type
	 * cheating.
1092
	 */
1093 1094
	nread = ExecWorkFile_Read(batchside->workfile, (void *) header, sizeof(header));
	if (nread != sizeof(header))				/* end of file */
1095 1096 1097 1098
	{
		ExecClearTuple(tupleSlot);
		return NULL;
	}
1099

1100
	*hashvalue = header[0];
1101
	tuple = (MemTuple) palloc(memtuple_size_from_uint32(header[1]));
1102
	memtuple_set_mtlen(tuple, header[1]);
1103 1104 1105 1106 1107 1108

	nread = ExecWorkFile_Read(batchside->workfile,
							  (void *) ((char *) tuple + sizeof(uint32)),
							  memtuple_size_from_uint32(header[1]) - sizeof(uint32));
	
	if (nread != memtuple_size_from_uint32(header[1]) - sizeof(uint32))
1109 1110
		ereport(ERROR,
				(errcode_for_file_access(),
1111
				 errmsg("could not read from hash-join temporary file")));
1112
	return ExecStoreMinimalTuple(tuple, tupleSlot, true);
1113 1114
}

1115

V
Vadim B. Mikheev 已提交
1116
/*
 * ExecReScanHashJoin
 *		rescan the hash join node.
 *
 * If the hash table still exists, the inner subplan's parameters are
 * unchanged, and the table has not been eagerly released, we keep the
 * table and only re-scan the outer side.  Otherwise the table is
 * destroyed and will be rebuilt on the next ExecHashJoin call.
 */
void
ExecReScanHashJoin(HashJoinState *node, ExprContext *exprCtxt)
{
	/*
	 * In a multi-batch join, we currently have to do rescans the hard way,
	 * primarily because batch temp files may have already been released. But
	 * if it's a single-batch join, and there is no parameter change for the
	 * inner subnode, then we can just re-use the existing hash table without
	 * rebuilding it.
	 */
	if (node->hj_HashTable != NULL)
	{
		/* Any subsequent scan is no longer the first pass over the data. */
		node->hj_HashTable->first_pass = false;

		if (((PlanState *) node)->righttree->chgParam == NULL
			&& !node->hj_HashTable->eagerlyReleased)
		{
			/*
			 * okay to reuse the hash table; needn't rescan inner, either.
			 *
			 * What we do need to do is reset our state about the emptiness of
			 * the outer relation, so that the new scan of the outer will
			 * update it correctly if it turns out to be empty this time.
			 * (There's no harm in clearing it now because ExecHashJoin won't
			 * need the info.  In the other cases, where the hash table
			 * doesn't exist or we are destroying it, we leave this state
			 * alone because ExecHashJoin will need it the first time
			 * through.)
			 */
			node->hj_OuterNotEmpty = false;

			if (node->hj_HashTable->nbatch > 1)
			{
				/* Force reloading batch 0 upon next ExecHashJoin */
				node->hj_HashTable->curbatch = -1;
			}
			else
			{
				/* MPP-1600: reset the batch number */
				node->hj_HashTable->curbatch = 0;
			}
		}
		else
		{
			/* must destroy and rebuild hash table */
			if (!node->hj_HashTable->eagerlyReleased)
			{
				HashState  *hashState = (HashState *) innerPlanState(node);

				ExecHashTableDestroy(hashState, node->hj_HashTable);
			}
			pfree(node->hj_HashTable);
			node->hj_HashTable = NULL;

			/*
			 * if chgParam of subnode is not null then plan will be re-scanned
			 * by first ExecProcNode.
			 */
			if (((PlanState *) node)->righttree->chgParam == NULL)
				ExecReScan(((PlanState *) node)->righttree, exprCtxt);
		}
	}

	/* Always reset intra-tuple state */
	node->hj_CurHashValue = 0;
	node->hj_CurBucketNo = 0;
	node->hj_CurTuple = NULL;

	node->js.ps.ps_OuterTupleSlot = NULL;
	node->hj_NeedNewOuter = true;
	node->hj_MatchedOuter = false;
	node->hj_FirstOuterTupleSlot = NULL;

	/*
	 * if chgParam of subnode is not null then plan will be re-scanned by
	 * first ExecProcNode.
	 */
	if (((PlanState *) node)->lefttree->chgParam == NULL)
		ExecReScan(((PlanState *) node)->lefttree, exprCtxt);
}
1196 1197 1198 1199 1200

/**
 * This method releases the hash table's memory. It maintains some of the other
 * aspects of the hash table like memory usage statistics. These may be required
 * during an explain analyze. A hash table that has been released cannot perform
 * any useful function anymore.
 */
static void
ReleaseHashTable(HashJoinState *node)
{
	if (node->hj_HashTable)
	{
		HashState *hashState = (HashState *) innerPlanState(node);

		/* This hashtable should not have been released already! */
		Assert(!node->hj_HashTable->eagerlyReleased);
		if (node->hj_HashTable->stats)
		{
			/* Report on batch in progress. */
			ExecHashTableExplainBatchEnd(hashState, node->hj_HashTable);
		}
		/*
		 * NOTE(review): ExecHashTableDestroy is assumed to free the table's
		 * contents but not the HashJoinTable struct itself (ExecReScanHashJoin
		 * pfree's the struct separately), so the flag write below is safe —
		 * confirm against nodeHash.c.
		 */
		ExecHashTableDestroy(hashState, node->hj_HashTable);
		node->hj_HashTable->eagerlyReleased = true;
	}

	/* Always reset intra-tuple state */
	node->hj_CurHashValue = 0;
	node->hj_CurBucketNo = 0;
	node->hj_CurTuple = NULL;

	node->js.ps.ps_OuterTupleSlot = NULL;
	node->hj_NeedNewOuter = true;
	node->hj_MatchedOuter = false;
	node->hj_FirstOuterTupleSlot = NULL;

}

/* Is this an IS-NOT-DISTINCT-join qual list (as opposed the an equijoin)?
 *
 * XXX We perform an abbreviated test based on the assumptions that 
 *     these are the only possibilities and that all conjuncts are 
 *     alike in this regard.
 */
1239 1240
bool
isNotDistinctJoin(List *qualList)
1241
{
1242 1243 1244
	ListCell   *lc;

	foreach(lc, qualList)
1245
	{
1246
		BoolExpr   *bex = (BoolExpr *) lfirst(lc);
1247
		DistinctExpr *dex;
1248 1249

		if (IsA(bex, BoolExpr) &&bex->boolop == NOT_EXPR)
1250
		{
1251 1252 1253 1254
			dex = (DistinctExpr *) linitial(bex->args);

			if (IsA(dex, DistinctExpr))
				return true;	/* We assume the rest follow suit! */
1255 1256 1257 1258 1259 1260 1261 1262 1263
		}
	}
	return false;
}

/*
 * initGpmonPktForHashJoin
 *		initialize the gpmon packet for a HashJoin plan node.
 */
void
initGpmonPktForHashJoin(Plan *planNode, gpmon_packet_t *gpmon_pkt, EState *estate)
{
	/* Caller must hand us a real HashJoin node and an output packet. */
	Assert(planNode != NULL);
	Assert(gpmon_pkt != NULL);
	Assert(IsA(planNode, HashJoin));

	InitPlanNodeGpmonPkt(planNode, gpmon_pkt, estate);
}

/*
 * ExecEagerFreeHashJoin
 *		release the hash table early, if it exists and has not already
 *		been eagerly released.
 */
void
ExecEagerFreeHashJoin(HashJoinState *node)
{
	HashJoinTable hashtable = node->hj_HashTable;

	/* Nothing to do if there is no table, or it was released already. */
	if (hashtable == NULL || hashtable->eagerlyReleased)
		return;

	ReleaseHashTable(node);
}

/*
 * isHashtableEmpty
 *
 *	After populating the hashtable with all the tuples from the innerside,
 *	scan all the batches and return true if the hashtable is completely empty
 *
 */
static bool
isHashtableEmpty(HashJoinTable hashtable)
{
	int			batchno;

	/* Return false as soon as any batch is found to hold inner tuples. */
	for (batchno = 0; batchno < hashtable->nbatch; batchno++)
	{
		HashJoinBatchData *batch = hashtable->batches[batchno];

		/*
		 * For batch 0, the number of inner tuples is stored in
		 * batch->innertuples. For batches on disk (1 and above),
		 * batch->innertuples is 0, but batch->innerside.workfile is
		 * non-NULL if any tuples were written to disk. Check both here.
		 */
		if (batch->innertuples > 0 || batch->innerside.workfile != NULL)
			return false;
	}

	/* No batch had any inner tuples, in memory or on disk. */
	return true;
}

1311 1312
/*
 * In our hybrid hash join we either spill when we increase number of batches
 * or when we re-spill. As we go, we normally destroy the batch file of the
 * batch that we have already processed. But if we need to support re-scanning
 * of the outer tuples, without also re-scanning the inner side, we need to
 * save the current hash for the next re-scan, instead.
 *
 * Walks every bucket of the in-memory hash table and writes each tuple
 * (with its hash value) to the current batch's inner-side workfile, which
 * must not exist yet.  The in-memory table itself is left untouched.
 */
static void
SpillCurrentBatch(HashJoinState *node)
{
	HashJoinTable hashtable = node->hj_HashTable;
	HashJoinBatchData *fullbatch = hashtable->batches[hashtable->curbatch];
	HashJoinTuple tuple;
	int			i;

	/* The inner workfile is created lazily by ExecHashJoinSaveTuple below. */
	Assert(fullbatch->innerside.workfile == NULL);

	for (i = 0; i < hashtable->nbuckets; i++)
	{
		tuple = hashtable->buckets[i];

		while (tuple != NULL)
		{
			/* ps == NULL: no gpmon packet needed for this internal write */
			ExecHashJoinSaveTuple(NULL, HJTUPLE_MINTUPLE(tuple),
								  tuple->hashvalue,
								  hashtable,
								  &fullbatch->innerside,
								  hashtable->bfCxt);
			tuple = tuple->next;
		}
	}
}

/*
 * ExecHashJoinReloadHashTable
 *		reset the hash table and reload it from the current batch's
 *		inner-side workfile.
 *
 * Returns false only if the query is being cancelled (QueryFinishPending)
 * mid-load; returns true otherwise, including when the inner batch file is
 * absent (the table is then simply left empty).
 *
 * Note that ExecHashTableInsert may route some reloaded tuples to future
 * batches, and may even increase hashtable->nbatch while we loop.
 */
static bool
ExecHashJoinReloadHashTable(HashJoinState *hjstate)
{
	HashState  *hashState = (HashState *) innerPlanState(hjstate);
	HashJoinTable hashtable = hjstate->hj_HashTable;
	TupleTableSlot *slot;
	uint32		hashvalue;
	int curbatch = hashtable->curbatch;
	HashJoinBatchData *batch = hashtable->batches[curbatch];
	int			nmoved = 0;		/* tuples pushed to a later batch */
#if 0
	int			orignbatch = hashtable->nbatch;
#endif

	/*
	 * Reload the hash table with the new inner batch (which could be empty)
	 */
	ExecHashTableReset(hashState, hashtable);

	if (batch->innerside.workfile != NULL)
	{
		/* Rewind batch file */
		bool		result = ExecWorkFile_Rewind(batch->innerside.workfile);

		if (!result)
		{
			ereport(ERROR, (errcode_for_file_access(),
							errmsg("could not access temporary file")));
		}

		for (;;)
		{
			CHECK_FOR_INTERRUPTS();

			/* Abort the reload promptly if the query is finishing early. */
			if (QueryFinishPending)
				return false;

			slot = ExecHashJoinGetSavedTuple(hjstate,
											 &batch->innerside,
											 &hashvalue,
											 hjstate->hj_HashTupleSlot);
			if (!slot)
				break;			/* end of batch file */

			/*
			 * NOTE: some tuples may be sent to future batches.  Also, it is
			 * possible for hashtable->nbatch to be increased here!
			 */
			if (!ExecHashTableInsert(hashState, hashtable, slot, hashvalue))
				nmoved++;
		}

		/*
		 * after we build the hash table, the inner batch file is no longer
		 * needed
		 */
		if (hjstate->js.ps.instrument)
		{
			Assert(hashtable->stats);
			hashtable->stats->batchstats[curbatch].innerfilesize =
				ExecWorkFile_Tell64(hashtable->batches[curbatch]->innerside.workfile);
		}

		SIMPLE_FAULT_INJECTOR(WorkfileHashJoinFailure);

		/*
		 * If we want to re-use the hash table after a re-scan, don't
		 * delete it yet. But if we did not load the batch file into memory as is,
		 * because some tuples were sent to later batches, then delete it now, so
		 * that it will be recreated with just the remaining tuples, after processing
		 * this batch.
		 *
		 * XXX: Currently, we actually always close the file, and recreate it
		 * afterwards, even if there are no changes. That's because the workfile
		 * API doesn't support appending to a file that's already been read from.
		 */
#if 0
		if (!hjstate->reuse_hashtable || nmoved > 0 || hashtable->nbatch != orignbatch)
#endif
		{
			workfile_mgr_close_file(hashtable->work_set, batch->innerside.workfile);
			batch->innerside.workfile = NULL;
		}
	}

	return true;
}

1432
/* EOF */