nodeHashjoin.c 38.2 KB
Newer Older
1 2
/*-------------------------------------------------------------------------
 *
3
 * nodeHashjoin.c
4
 *	  Routines to handle hash join nodes
5
 *
6 7
 * Portions Copyright (c) 2005-2008, Greenplum inc
 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
B
Add:  
Bruce Momjian 已提交
8
 * Portions Copyright (c) 1994, Regents of the University of California
9 10 11
 *
 *
 * IDENTIFICATION
12
 *	  $PostgreSQL: pgsql/src/backend/executor/nodeHashjoin.c,v 1.93 2008/01/01 19:45:49 momjian Exp $
13 14 15
 *
 *-------------------------------------------------------------------------
 */
16

B
Bruce Momjian 已提交
17
#include "postgres.h"
18 19

#include "executor/executor.h"
20
#include "executor/hashjoin.h"
21
#include "executor/instrument.h"        /* Instrumentation */
22 23
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
24
#include "utils/faultinjector.h"
25 26
#include "utils/memutils.h"

27 28 29 30
#include "cdb/cdbvars.h"
#include "miscadmin.h" /* work_mem */

#define EMPTY_WORKFILE_NAME "empty_workfile"
31

32
static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
B
Bruce Momjian 已提交
33 34
						  HashJoinState *hjstate,
						  uint32 *hashvalue);
35
static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinBatchSide *side,
36
						  uint32 *hashvalue,
B
Bruce Momjian 已提交
37
						  TupleTableSlot *tupleSlot);
38
static int	ExecHashJoinNewBatch(HashJoinState *hjstate);
39
static bool isNotDistinctJoin(List *qualList);
40

41 42
static void ReleaseHashTable(HashJoinState *node);
static bool isHashtableEmpty(HashJoinTable hashtable);
43

44
/* ----------------------------------------------------------------
45
 *		ExecHashJoin
46
 *
47
 *		This function implements the Hybrid Hashjoin algorithm.
48 49 50
 *
 *		Note: the relation we build hash table on is the "inner"
 *			  the other one is "outer".
51 52
 * ----------------------------------------------------------------
 */
53
TupleTableSlot *				/* return: a tuple or NULL */
54
ExecHashJoin(HashJoinState *node)
55
{
56
	EState	   *estate;
57 58
	PlanState  *outerNode;
	HashState  *hashNode;
59 60
	List	   *joinqual;
	List	   *otherqual;
61
	TupleTableSlot *inntuple;
62 63
	ExprContext *econtext;
	HashJoinTable hashtable;
64
	HashJoinTuple curtuple;
65
	TupleTableSlot *outerTupleSlot;
66 67
	uint32		hashvalue;
	int			batchno;
68

69 70
	/*
	 * get information from HashJoin node
71
	 */
72 73 74 75 76
	estate = node->js.ps.state;
	joinqual = node->js.joinqual;
	otherqual = node->js.ps.qual;
	hashNode = (HashState *) innerPlanState(node);
	outerNode = outerPlanState(node);
77

78
	/*
79
	 * get information from HashJoin state
80
	 */
81 82
	hashtable = node->hj_HashTable;
	econtext = node->js.ps.ps_ExprContext;
83

84
	/*
B
Bruce Momjian 已提交
85 86 87
	 * If we're doing an IN join, we want to return at most one row per outer
	 * tuple; so we can stop scanning the inner scan if we matched on the
	 * previous try.
88
	 */
89
	if (node->js.jointype == JOIN_IN && node->hj_MatchedOuter)
90 91
		node->hj_NeedNewOuter = true;

92 93
	/*
	 * Reset per-tuple memory context to free any expression evaluation
B
Bruce Momjian 已提交
94 95
	 * storage allocated in the previous tuple cycle.  Note this can't happen
	 * until we're done projecting out tuples from a join tuple.
96 97 98
	 */
	ResetExprContext(econtext);

99 100 101
	/*
	 * if this is the first call, build the hash table for inner relation
	 */
102
	if (hashtable == NULL)
103 104
	{
		/*
105 106 107 108 109
		 * MPP-4165: My fix for MPP-3300 was correct in that we avoided
		 * the *deadlock* but had very unexpected (and painful)
		 * performance characteristics: we basically de-pipeline and
		 * de-parallelize execution of any query which has motion below
		 * us.
110
		 *
111 112
		 * So now prefetch_inner is set (see createplan.c) if we have *any* motion
		 * below us. If we don't have any motion, it doesn't matter.
113
		 */
114
		if (!node->prefetch_inner)
115
		{
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
			/*
			 * If the outer relation is completely empty, we can quit without
			 * building the hash table.  However, for an inner join it is only a
			 * win to check this when the outer relation's startup cost is less
			 * than the projected cost of building the hash table.	Otherwise it's
			 * best to build the hash table first and see if the inner relation is
			 * empty.  (When it's an outer join, we should always make this check,
			 * since we aren't going to be able to skip the join on the strength
			 * of an empty inner relation anyway.)
			 *
			 * If we are rescanning the join, we make use of information gained on
			 * the previous scan: don't bother to try the prefetch if the previous
			 * scan found the outer relation nonempty.	This is not 100% reliable
			 * since with new parameters the outer relation might yield different
			 * results, but it's a good heuristic.
			 *
			 * The only way to make the check is to try to fetch a tuple from the
			 * outer plan node.  If we succeed, we have to stash it away for later
			 * consumption by ExecHashJoinOuterGetTuple.
			 */
			if ((node->js.jointype == JOIN_LEFT) ||
					(node->js.jointype == JOIN_LASJ) ||
					(node->js.jointype == JOIN_LASJ_NOTIN) ||
					(outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
						!node->hj_OuterNotEmpty))
141
			{
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
				TupleTableSlot *slot;

                slot = ExecProcNode(outerNode);

				node->hj_FirstOuterTupleSlot = slot;
				if (TupIsNull(node->hj_FirstOuterTupleSlot))
				{
					node->hj_OuterNotEmpty = false;

					/* CDB: Tell inner subtree that its data will not be needed. */
					ExecSquelchNode((PlanState *)hashNode);

					return NULL;
				}
				else
					node->hj_OuterNotEmpty = true;
158 159
			}
			else
160
				node->hj_FirstOuterTupleSlot = NULL;
161
		}
162
		else
163 164 165
		{
			/* see MPP-989 comment above, for now we assume that we have
			 * at least one row on the outer. */
166
			node->hj_FirstOuterTupleSlot = NULL;
167 168
		}

169
		/*
170
		 * create the hash table
171
		 */
172 173 174
		hashtable = ExecHashTableCreate(hashNode,
										node,
										node->hj_HashOperators,
175
										PlanStateOperatorMemKB((PlanState *) hashNode));
176
		node->hj_HashTable = hashtable;
177

178 179 180 181 182 183 184
        /*
         * CDB: Offer extra info for EXPLAIN ANALYZE.
         */
        if (estate->es_instrument)
            ExecHashTableExplainInit(hashNode, node, hashtable);


185
		/*
186
		 * execute the Hash node, to build the hash table
187
		 */
188
		hashNode->hashtable = hashtable;
189

190
		/*
191 192 193 194 195 196 197 198 199
		 * Only if doing a LASJ_NOTIN join, we want to quit as soon as we find
		 * a NULL key on the inner side
		 */
		hashNode->hs_quit_if_hashkeys_null = (node->js.jointype == JOIN_LASJ_NOTIN);

		/* Store pointer to the HashJoinState in the hashtable, as we will need
		 * the HashJoin plan when creating the spill file set */
		hashtable->hjstate = node;

200 201
		/* Execute the Hash node and build the hashtable */
		(void) MultiExecProcNode((PlanState *) hashNode);
202 203 204 205 206 207 208

#ifdef HJDEBUG
		elog(gp_workfile_caching_loglevel, "HashJoin built table with %.1f tuples by executing subplan for batch 0", hashtable->totalTuples);
#endif

		/**
		 * If LASJ_NOTIN and a null was found on the inner side, then clean out.
209
		 */
210 211 212 213 214 215 216 217 218 219 220 221 222
		if (node->js.jointype == JOIN_LASJ_NOTIN && hashNode->hs_hashkeys_null)
		{
			/*
			 * CDB: We'll read no more from outer subtree. To keep sibling
			 * QEs from being starved, tell source QEs not to clog up the
			 * pipeline with our never-to-be-consumed data.
			 */
			ExecSquelchNode(outerNode);
			/* end of join */
			if (gp_eager_hashtable_release)
			{
				ExecEagerFreeHashJoin(node);
			}
223
			return NULL;
224
		}
225

226
		/*
227 228 229
		 * We just scanned the entire inner side and built the hashtable
		 * (and its overflow batches). Check here and remember if the inner
		 * side is empty.
230
		 */
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254
		node->hj_InnerEmpty = isHashtableEmpty(hashtable);

		/*
		 * If the inner relation is completely empty, and we're not doing an
		 * outer join, we can quit without scanning the outer relation.
		 */
		if (node->js.jointype != JOIN_LEFT
				&& node->js.jointype != JOIN_LASJ
				&& node->js.jointype != JOIN_LASJ_NOTIN
				&& node->hj_InnerEmpty)
        {
			/*
			 * CDB: We'll read no more from outer subtree. To keep sibling
			 * QEs from being starved, tell source QEs not to clog up the
			 * pipeline with our never-to-be-consumed data.
			 */
			ExecSquelchNode(outerNode);
			/* end of join */
			if (gp_eager_hashtable_release)
			{
				ExecEagerFreeHashJoin(node);
			}
			return NULL;
        }
255 256 257

		/*
		 * Reset OuterNotEmpty for scan.  (It's OK if we fetched a tuple
B
Bruce Momjian 已提交
258 259
		 * above, because ExecHashJoinOuterGetTuple will immediately set it
		 * again.)
260 261
		 */
		node->hj_OuterNotEmpty = false;
262
	}
263

264
	/*
265
	 * run the hash join process
266
	 */
267
	for (;;)
268
	{
269 270 271
		/* We must never use an eagerly released hash table */
		Assert(!hashtable->eagerlyReleased);
		
272
		/*
273
		 * If we don't have an outer tuple, get the next one
274
		 */
275
		if (node->hj_NeedNewOuter)
276
		{
277 278 279
			outerTupleSlot = ExecHashJoinOuterGetTuple(outerNode,
													   node,
													   &hashvalue);
280
			if (TupIsNull(outerTupleSlot))
281 282
			{
				/* end of join */
283 284 285 286
				if (gp_eager_hashtable_release)
				{
					ExecEagerFreeHashJoin(node);
				}
287 288 289
				return NULL;
			}

290 291
			Gpmon_M_Incr(GpmonPktFromHashJoinState(node), GPMON_QEXEC_M_ROWSIN); 
			CheckSendPlanStateGpmonPkt(&node->js.ps);
292 293 294 295 296 297
			node->js.ps.ps_OuterTupleSlot = outerTupleSlot;
			econtext->ecxt_outertuple = outerTupleSlot;
			node->hj_NeedNewOuter = false;
			node->hj_MatchedOuter = false;

			/*
B
Bruce Momjian 已提交
298 299
			 * now we have an outer tuple, find the corresponding bucket for
			 * this tuple from the hash table
300 301 302 303 304 305 306
			 */
			node->hj_CurHashValue = hashvalue;
			ExecHashGetBucketAndBatch(hashtable, hashvalue,
									  &node->hj_CurBucketNo, &batchno);
			node->hj_CurTuple = NULL;

			/*
B
Bruce Momjian 已提交
307 308
			 * Now we've got an outer tuple and the corresponding hash bucket,
			 * but this tuple may not belong to the current batch.
309
			 */
310
			if (batchno != hashtable->curbatch)
311 312
			{
				/*
B
Bruce Momjian 已提交
313 314
				 * Need to postpone this outer tuple to a later batch. Save it
				 * in the corresponding outer-batch file.
315
				 */
316
				Assert(batchno != 0);
317
				Assert(batchno > hashtable->curbatch);
318
				ExecHashJoinSaveTuple(&node->js.ps, ExecFetchSlotMemTuple(outerTupleSlot, false),
319
									  hashvalue,
320 321 322
									  hashtable,
                                      &hashtable->batches[batchno]->outerside,
									  hashtable->bfCxt);
323
				node->hj_NeedNewOuter = true;
B
Bruce Momjian 已提交
324
				continue;		/* loop around for a new outer tuple */
325
			}
326
		}
327

328 329
		/*
		 * OK, scan the selected hash bucket for matches
330
		 */
331
		for (;;)
332
		{
333 334 335 336 337 338 339 340 341 342 343 344 345 346 347
			/*
			 * OPT-3325: Handle NULLs in the outer side of LASJ_NOTIN
			 *  - if tuple is NULL and inner is not empty, drop outer tuple
			 *  - if tuple is NULL and inner is empty, keep going as we'll
			 *    find no match for this tuple in the inner side
			 */
			if (node->js.jointype == JOIN_LASJ_NOTIN &&
					!node->hj_InnerEmpty &&
					isJoinExprNull(node->hj_OuterHashKeys,econtext))
			{
				node->hj_MatchedOuter = true;
				break;		/* loop around for a new outer tuple */
			}

			curtuple = ExecScanHashBucket(hashNode, node, econtext);
348 349
			if (curtuple == NULL)
				break;			/* out of matches */
B
Bruce Momjian 已提交
350

351
			/*
352
			 * we've got a match, but still need to test non-hashed quals
353
			 */
354
			inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(curtuple),
355 356
										 node->hj_HashTupleSlot,
										 false);	/* don't pfree */
357
			econtext->ecxt_innertuple = inntuple;
358

359
			/* reset temp memory each time to avoid leaks from qual expr */
360 361
			ResetExprContext(econtext);

362 363
			/*
			 * if we pass the qual, then save state for next call and have
B
Bruce Momjian 已提交
364 365
			 * ExecProject form the projection, store it in the tuple table,
			 * and return the slot.
366
			 *
B
Bruce Momjian 已提交
367 368
			 * Only the joinquals determine MatchedOuter status, but all quals
			 * must pass to actually return the tuple.
369
			 */
370
			if (joinqual == NIL || ExecQual(joinqual, econtext, false /* resultForNull */))
371
			{
372
				node->hj_MatchedOuter = true;
373

374 375
				/* In an antijoin, we never return a matched tuple */
				if (node->js.jointype == JOIN_LASJ || node->js.jointype == JOIN_LASJ_NOTIN)
376
				{
377 378 379
					node->hj_NeedNewOuter = true;
					break;		/* out of loop over hash bucket */
				}
380

381 382 383 384 385
				if (otherqual == NIL || ExecQual(otherqual, econtext, false))
				{
					Gpmon_M_Incr_Rows_Out(GpmonPktFromHashJoinState(node));
					CheckSendPlanStateGpmonPkt(&node->js.ps);
					return ExecProject(node->js.ps.ps_ProjInfo, NULL);
386
				}
387

B
Bruce Momjian 已提交
388
				/*
B
Bruce Momjian 已提交
389
				 * If we didn't return a tuple, may need to set NeedNewOuter
B
Bruce Momjian 已提交
390
				 */
391 392 393 394 395
				if (node->js.jointype == JOIN_IN)
				{
					node->hj_NeedNewOuter = true;
					break;		/* out of loop over hash bucket */
				}
396 397 398
			}
		}

399 400
		/*
		 * Now the current outer tuple has run out of matches, so check
B
Bruce Momjian 已提交
401 402
		 * whether to emit a dummy outer-join tuple. If not, loop around to
		 * get a new outer tuple.
403
		 */
404
		node->hj_NeedNewOuter = true;
405

406
		if (!node->hj_MatchedOuter &&
407 408 409
			(node->js.jointype == JOIN_LEFT ||
			node->js.jointype == JOIN_LASJ ||
			node->js.jointype == JOIN_LASJ_NOTIN))
410 411
		{
			/*
B
Bruce Momjian 已提交
412 413 414
			 * We are doing an outer join and there were no join matches for
			 * this outer tuple.  Generate a fake join tuple with nulls for
			 * the inner tuple, and return it if it passes the non-join quals.
415
			 */
416
			econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
417

418
			if (otherqual == NIL || ExecQual(otherqual, econtext, false))
419
			{
420 421 422
				Gpmon_M_Incr_Rows_Out(GpmonPktFromHashJoinState(node));
				CheckSendPlanStateGpmonPkt(&node->js.ps);
				return ExecProject(node->js.ps.ps_ProjInfo, NULL);
423 424
			}
		}
425
	}
426 427 428
}

/* ----------------------------------------------------------------
429
 *		ExecInitHashJoin
430
 *
431
 *		Init routine for HashJoin node.
432 433
 * ----------------------------------------------------------------
 */
434
HashJoinState *
435
ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
436
{
437 438 439
	HashJoinState *hjstate;
	Plan	   *outerNode;
	Hash	   *hashNode;
440 441
	List	   *lclauses;
	List	   *rclauses;
442
	List	   *hoperators;
443
	ListCell   *l;
444

445 446 447
	/* check for unsupported flags */
	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

448
	/*
449 450 451
	 * create state structure
	 */
	hjstate = makeNode(HashJoinState);
452 453
	hjstate->js.ps.plan = (Plan *) node;
	hjstate->js.ps.state = estate;
454

455 456
	/*
	 * Miscellaneous initialization
457
	 *
458
	 * create expression context for node
459
	 */
460 461
	ExecAssignExprContext(estate, &hjstate->js.ps);

462 463 464 465 466 467 468 469 470
	if ( node->hashqualclauses != NIL )
	{
		/* CDB: This must be an IS NOT DISTINCT join!  */
		Insist( isNotDistinctJoin(node->hashqualclauses) );
		hjstate->hj_nonequijoin = true;
	}
	else
		hjstate->hj_nonequijoin = false;

471 472 473 474
	/*
	 * initialize child expressions
	 */
	hjstate->js.ps.targetlist = (List *)
475
		ExecInitExpr((Expr *) node->join.plan.targetlist,
476 477
					 (PlanState *) hjstate);
	hjstate->js.ps.qual = (List *)
478
		ExecInitExpr((Expr *) node->join.plan.qual,
479 480 481
					 (PlanState *) hjstate);
	hjstate->js.jointype = node->join.jointype;
	hjstate->js.joinqual = (List *)
482
		ExecInitExpr((Expr *) node->join.joinqual,
483 484
					 (PlanState *) hjstate);
	hjstate->hashclauses = (List *)
485
		ExecInitExpr((Expr *) node->hashclauses,
486
					 (PlanState *) hjstate);
487 488 489 490 491 492 493 494 495 496 497 498 499 500 501
	
	if ( node->hashqualclauses != NIL )
	{
		hjstate->hashqualclauses = (List *)
			ExecInitExpr((Expr *) node->hashqualclauses,
						 (PlanState *) hjstate);
	}
	else
	{
		hjstate->hashqualclauses = hjstate->hashclauses;
	}	

	/* MPP-3300, we only pre-build hashtable if we need to (this is
	 * relaxing the fix to MPP-989) */
	hjstate->prefetch_inner = node->join.prefetch_inner;
502

503
	/*
504
	 * initialize child nodes
505 506 507 508
	 *
	 * Note: we could suppress the REWIND flag for the inner input, which
	 * would amount to betting that the hash will be a single batch.  Not
	 * clear if this would be a win or not.
509
	 */
510
	hashNode = (Hash *) innerPlan(node);
511
	outerNode = outerPlan(node);
512

513 514 515 516 517 518 519 520
	/* 
	 * XXX The following order are significant.  We init Hash first, then the outerNode
	 * this is the same order as we execute (in the sense of the first exec called).
	 * Until we have a better way to uncouple, share input needs this to be true.  If the
	 * order is wrong, when both hash and outer node have share input and (both ?) have 
	 * a subquery node, share input will fail because the estate of the nodes can not be
	 * set up correctly.
	 */
521
	innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
522 523 524
	((HashState *) innerPlanState(hjstate))->hs_keepnull = hjstate->hj_nonequijoin;

	outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);
525

526
#define HASHJOIN_NSLOTS 3
527 528 529

	/*
	 * tuple table initialization
530
	 */
531
	ExecInitResultTupleSlot(estate, &hjstate->js.ps);
532 533 534 535 536
	hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate);

	switch (node->join.jointype)
	{
		case JOIN_INNER:
537
		case JOIN_IN:
538 539
			break;
		case JOIN_LEFT:
540 541
		case JOIN_LASJ:
		case JOIN_LASJ_NOTIN:
542 543
			hjstate->hj_NullInnerTupleSlot =
				ExecInitNullTupleSlot(estate,
B
Bruce Momjian 已提交
544
								 ExecGetResultType(innerPlanState(hjstate)));
545 546
			break;
		default:
547
			elog(ERROR, "unrecognized join type: %d",
548 549 550
				 (int) node->join.jointype);
	}

551
	/*
B
Bruce Momjian 已提交
552 553 554 555 556
	 * now for some voodoo.  our temporary tuple slot is actually the result
	 * tuple slot of the Hash node (which is our inner plan).  we do this
	 * because Hash nodes don't return tuples via ExecProcNode() -- instead
	 * the hash join node uses ExecScanHashBucket() to get at the contents of
	 * the hash table.	-cim 6/9/91
557 558
	 */
	{
559 560
		HashState  *hashstate = (HashState *) innerPlanState(hjstate);
		TupleTableSlot *slot = hashstate->ps.ps_ResultTupleSlot;
561 562 563 564

		hjstate->hj_HashTupleSlot = slot;
	}

565 566
	/*
	 * initialize tuple type and projection info
567
	 */
568
	ExecAssignResultTypeFromTL(&hjstate->js.ps);
569
	ExecAssignProjectionInfo(&hjstate->js.ps, NULL);
570

571
	ExecSetSlotDescriptor(hjstate->hj_OuterTupleSlot,
572
						  ExecGetResultType(outerPlanState(hjstate)));
573

574 575
	/*
	 * initialize hash-specific info
576
	 */
577
	hjstate->hj_HashTable = NULL;
578
	hjstate->hj_FirstOuterTupleSlot = NULL;
579

580
	hjstate->hj_CurHashValue = 0;
581
	hjstate->hj_CurBucketNo = 0;
582
	hjstate->hj_CurTuple = NULL;
583 584

	/*
B
Bruce Momjian 已提交
585 586 587 588
	 * Deconstruct the hash clauses into outer and inner argument values, so
	 * that we can evaluate those subexpressions separately.  Also make a list
	 * of the hash operator OIDs, in preparation for looking up the hash
	 * functions to use.
589
	 */
590 591
	lclauses = NIL;
	rclauses = NIL;
592
	hoperators = NIL;
593
	foreach(l, hjstate->hashclauses)
594
	{
595
		FuncExprState *fstate = (FuncExprState *) lfirst(l);
596
		OpExpr	   *hclause;
597

598 599
		Assert(IsA(fstate, FuncExprState));
		hclause = (OpExpr *) fstate->xprstate.expr;
600
		Assert(IsA(hclause, OpExpr));
601
		lclauses = lappend(lclauses, linitial(fstate->args));
602
		rclauses = lappend(rclauses, lsecond(fstate->args));
603
		hoperators = lappend_oid(hoperators, hclause->opno);
604
	}
605 606
	hjstate->hj_OuterHashKeys = lclauses;
	hjstate->hj_InnerHashKeys = rclauses;
607
	hjstate->hj_HashOperators = hoperators;
608 609
	/* child Hash node needs to evaluate inner hash keys, too */
	((HashState *) innerPlanState(hjstate))->hashkeys = rclauses;
610

611
	hjstate->js.ps.ps_OuterTupleSlot = NULL;
612 613
	hjstate->hj_NeedNewOuter = true;
	hjstate->hj_MatchedOuter = false;
614
	hjstate->hj_OuterNotEmpty = false;
615

616 617
	initGpmonPktForHashJoin((Plan *)node, &hjstate->js.ps.gpmon_pkt, estate);
	
618
	return hjstate;
619 620 621
}

int
622
ExecCountSlotsHashJoin(HashJoin *node)
623
{
624
	return ExecCountSlotsNode(outerPlan(node)) +
625 626
		ExecCountSlotsNode(innerPlan(node)) +
		HASHJOIN_NSLOTS;
627 628 629
}

/* ----------------------------------------------------------------
630
 *		ExecEndHashJoin
631
 *
632
 *		clean up routine for HashJoin node
633 634 635
 * ----------------------------------------------------------------
 */
void
636
ExecEndHashJoin(HashJoinState *node)
637
{
638
	/*
639
	 * Free hash table
640
	 */
641
	if (node->hj_HashTable)
642
	{
643 644 645 646 647 648
		if (!node->hj_HashTable->eagerlyReleased)
		{
			HashState *hashState = (HashState *) innerPlanState(node);
			ExecHashTableDestroy(hashState, node->hj_HashTable);
		}
		pfree(node->hj_HashTable);
649
		node->hj_HashTable = NULL;
650 651
	}

652
	/*
653
	 * Free the exprcontext
654
	 */
655
	ExecFreeExprContext(&node->js.ps);
656

657 658
	/*
	 * clean out the tuple table
659
	 */
660 661 662
	ExecClearTuple(node->js.ps.ps_ResultTupleSlot);
	ExecClearTuple(node->hj_OuterTupleSlot);
	ExecClearTuple(node->hj_HashTupleSlot);
663

664 665 666 667 668
	/*
	 * clean up subtrees
	 */
	ExecEndNode(outerPlanState(node));
	ExecEndNode(innerPlanState(node));
669 670

	EndPlanStateGpmonPkt(&node->js.ps);
671 672
}

673 674
/*
 * ExecHashJoinOuterGetTuple
675
 *
676
 *		get the next outer tuple for hashjoin: either by
677 678 679 680 681 682
 *		executing a plan node in the first pass, or from
 *		the temp files for the hashjoin batches.
 *
 * Returns a null slot if no more outer tuples.  On success, the tuple's
 * hash value is stored at *hashvalue --- this is either originally computed,
 * or re-read from the temp file.
683 684
 */
static TupleTableSlot *
685
ExecHashJoinOuterGetTuple(PlanState *outerNode,
686 687
						  HashJoinState *hjstate,
						  uint32 *hashvalue)
688
{
B
Bruce Momjian 已提交
689 690
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			curbatch = hashtable->curbatch;
691
	TupleTableSlot *slot;
692 693
	ExprContext    *econtext;
	HashState *hashState = (HashState *) innerPlanState(hjstate);
B
Bruce Momjian 已提交
694

695 696
	/* Read tuples from outer relation only if it's the first batch */
	if (curbatch == 0)
697 698
	{
		for (;;)
699 700
		{
			/*
701 702
			 * Check to see if first outer tuple was already fetched by
			 * ExecHashJoin() and not used yet.
703
			 */
704 705 706 707 708 709 710 711 712 713
			slot = hjstate->hj_FirstOuterTupleSlot;
			if (!TupIsNull(slot))
				hjstate->hj_FirstOuterTupleSlot = NULL;
			else
			{
                slot = ExecProcNode(outerNode);
			}

			if (TupIsNull(slot))
				break;
714

715 716 717 718
			/*
			 * We have to compute the tuple's hash value.
			 */
			econtext = hjstate->js.ps.ps_ExprContext;
719 720
			econtext->ecxt_outertuple = slot;

721 722 723 724 725 726
			bool hashkeys_null = false;
			bool keep_nulls = (hjstate->js.jointype == JOIN_LEFT) ||
					(hjstate->js.jointype == JOIN_LASJ) ||
					(hjstate->js.jointype == JOIN_LASJ_NOTIN) ||
					hjstate->hj_nonequijoin;
			if (ExecHashGetHashValue(hashState, hashtable, econtext,
727
									 hjstate->hj_OuterHashKeys,
B
Bruce Momjian 已提交
728
									 true,		/* outer tuple */
729 730 731
									 keep_nulls,
									 hashvalue,
									 &hashkeys_null))
732 733 734
			{
				/* remember outer relation is not empty for possible rescan */
				hjstate->hj_OuterNotEmpty = true;
735

736 737 738
				return slot;
			}
			/*
B
Bruce Momjian 已提交
739 740
			 * That tuple couldn't match because of a NULL, so discard it and
			 * continue with the next one.
741
			 */
742
		}
743

744
		/*
B
Bruce Momjian 已提交
745 746
		 * We have just reached the end of the first pass. Try to switch to a
		 * saved batch.
747
		 */
748 749

		/* SFR: This can cause re-spill! */
750
		curbatch = ExecHashJoinNewBatch(hjstate);
751 752 753 754 755 756 757 758 759


#ifdef HJDEBUG
		elog(gp_workfile_caching_loglevel, "HashJoin built table with %.1f tuples for batch %d", hashtable->totalTuples, curbatch);
#endif

		Gpmon_M_Incr_Rows_Out(GpmonPktFromHashJoinState(hjstate)); 
		CheckSendPlanStateGpmonPkt(&hjstate->js.ps);
	} /* if (curbatch == 0) */
760 761

	/*
B
Bruce Momjian 已提交
762 763 764
	 * Try to read from a temp file. Loop allows us to advance to new batches
	 * as needed.  NOTE: nbatch could increase inside ExecHashJoinNewBatch, so
	 * don't try to optimize this loop.
765
	 */
766
	while (curbatch < hashtable->nbatch)
767
	{
768 769 770 771 772 773 774 775 776 777 778 779
		/* 
		 * For batches > 0, we can be reading many many outer tuples from disk
		 * and probing them against the hashtable. If we don't find any matches, 
		 * we'll keep coming back here to read tuples from disk and 
		 * returning them (MPP-23213). Break this long tight loop here. 
		 */
		CHECK_FOR_INTERRUPTS();

		if (QueryFinishPending)
			return NULL;

		slot = ExecHashJoinGetSavedTuple(&hashtable->batches[curbatch]->outerside,
780
										 hashvalue,
781
										 hjstate->hj_OuterTupleSlot);
B
Bruce Momjian 已提交
782
		if (!TupIsNull(slot))
783 784
			return slot;
		curbatch = ExecHashJoinNewBatch(hjstate);
785 786 787 788 789 790 791 792 793

#ifdef HJDEBUG
		elog(gp_workfile_caching_loglevel, "HashJoin built table with %.1f tuples for batch %d", hashtable->totalTuples, curbatch);
#endif

		Gpmon_M_Incr(GpmonPktFromHashJoinState(hjstate), GPMON_HASHJOIN_SPILLBATCH);
		CheckSendPlanStateGpmonPkt(&hjstate->js.ps);
	}

794 795
	/* Out of batches... */
	return NULL;
796 797
}

798 799
/*
 * ExecHashJoinNewBatch
800
 *		switch to a new hashjoin batch
801 802 803
 *
 * Returns the number of the new batch (1..nbatch-1), or nbatch if no more.
 * We will never return a batch number that has an empty outer batch file.
804
 */
805
static int
806
ExecHashJoinNewBatch(HashJoinState *hjstate)
807
{
808
	HashJoinTable hashtable = hjstate->hj_HashTable;
809
    HashJoinBatchData  *batch;
810 811
	int			nbatch;
	int			curbatch;
812
	TupleTableSlot *slot;
813
	uint32		hashvalue;
814

815 816 817 818 819 820 821 822 823 824 825
#ifdef FAULT_INJECTOR
	FaultInjector_InjectFaultIfSet(
			FaultExecHashJoinNewBatch,
			DDLNotSpecified,
			"",  // databaseName
			""); // tableName
#endif


	HashState *hashState = (HashState *) innerPlanState(hjstate);

826 827 828 829
start_over:
	nbatch = hashtable->nbatch;
	curbatch = hashtable->curbatch;

830 831 832 833 834 835
    if (curbatch >= nbatch)
        return nbatch;

    if (curbatch >= 0 && hashtable->stats)
        ExecHashTableExplainBatchEnd(hashState, hashtable);

836
	if (curbatch > 0)
837 838
	{
		/*
B
Bruce Momjian 已提交
839 840
		 * We no longer need the previous outer batch file; close it right
		 * away to free disk space.
841
		 */
842 843
		batch = hashtable->batches[curbatch];
		if (batch->outerside.workfile != NULL)
844
		{
845
			workfile_mgr_close_file(hashtable->work_set, batch->outerside.workfile);
846
		}
847
		batch->outerside.workfile = NULL;
848 849
	}

850
	/*
851 852 853 854
	 * We can always skip over any batches that are completely empty on both
	 * sides.  We can sometimes skip over batches that are empty on only one
	 * side, but there are exceptions:
	 *
B
Bruce Momjian 已提交
855 856
	 * 1. In a LEFT JOIN, we have to process outer batches even if the inner
	 * batch is empty.
857
	 *
858 859
	 * 2. If we have increased nbatch since the initial estimate, we have to
	 * scan inner batches since they might contain tuples that need to be
B
Bruce Momjian 已提交
860
	 * reassigned to later inner batches.
861
	 *
862 863 864
	 * 3. Similarly, if we have increased nbatch since starting the outer
	 * scan, we have to rescan outer batches in case they contain tuples that
	 * need to be reassigned.
865
	 */
866 867
	curbatch++;
	while (curbatch < nbatch &&
868 869 870
		   (hashtable->batches[curbatch]->outerside.workfile == NULL ||
			hashtable->batches[curbatch]->innerside.workfile == NULL))

871
	{
872 873 874 875 876
        batch = hashtable->batches[curbatch];
		if (batch->outerside.workfile != NULL &&
			((hjstate->js.jointype == JOIN_LEFT) || 
			 (hjstate->js.jointype == JOIN_LASJ) ||
			 (hjstate->js.jointype == JOIN_LASJ_NOTIN)))
877
			break;				/* must process due to rule 1 */
878
		if (batch->innerside.workfile != NULL &&
879 880
			nbatch != hashtable->nbatch_original)
			break;				/* must process due to rule 2 */
881
		if (batch->outerside.workfile != NULL &&
882 883 884 885
			nbatch != hashtable->nbatch_outstart)
			break;				/* must process due to rule 3 */
		/* We can ignore this batch. */
		/* Release associated temp files right away. */
886 887 888 889 890 891 892 893 894 895 896 897
		if (batch->innerside.workfile != NULL)
		{
			workfile_mgr_close_file(hashtable->work_set, batch->innerside.workfile);
		}
		batch->innerside.workfile = NULL;
		
		if (batch->outerside.workfile != NULL)
		{
			workfile_mgr_close_file(hashtable->work_set, batch->outerside.workfile);
		}
		batch->outerside.workfile = NULL;

898
		curbatch++;
899
	}
900

901 902 903 904
    hashtable->curbatch = curbatch;     /* CDB: upd before return, even if no
                                         * more data, so stats logic can see
                                         * whether join was run to completion
                                         */
905

906 907
    if (curbatch >= nbatch)
	    return curbatch;		/* no more batches */
908

909
    batch = hashtable->batches[curbatch];
910

911 912 913 914
    /*
     * Reload the hash table with the new inner batch (which could be empty)
     */
    ExecHashTableReset(hashState, hashtable);
915

916
	if (batch->innerside.workfile != NULL)
917
	{
918 919 920
		/* Rewind batch file */
		bool result = ExecWorkFile_Rewind(batch->innerside.workfile);
		if (!result)
921
		{
922 923
			ereport(ERROR, (errcode_for_file_access(),
					errmsg("could not access temporary file")));
924 925
		}

926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951
	    for (;;)
	    {
		    CHECK_FOR_INTERRUPTS();

			if (QueryFinishPending)
				return nbatch;

		    slot = ExecHashJoinGetSavedTuple(&batch->innerside,
				    &hashvalue,
				    hjstate->hj_HashTupleSlot);
		    if (!slot)
			    break;

		    /*
		     * NOTE: some tuples may be sent to future batches.  Also, it is
		     * possible for hashtable->nbatch to be increased here!
		     */
		    ExecHashTableInsert(hashState, hashtable, slot, hashvalue);
		    hashtable->totalTuples += 1;
	    }

	    /*
	     * after we build the hash table, the inner batch file is no longer
	     * needed.
		 *
	     */
952 953 954 955 956
	    if (hjstate->js.ps.instrument)
	    {
	    	Assert(hashtable->stats);
	    	hashtable->stats->batchstats[curbatch].innerfilesize =
	    			ExecWorkFile_Tell64(hashtable->batches[curbatch]->innerside.workfile);
957
		}
958 959 960
	    workfile_mgr_close_file(hashtable->work_set, batch->innerside.workfile);
	    batch->innerside.workfile = NULL;
	}
961 962 963 964 965 966 967 968 969 970

    /*
     * If there's no outer batch file, advance to next batch.
     */
	if (batch->outerside.workfile == NULL)
	    goto start_over;

    /*
     * Rewind outer batch file, so that we can start reading it.
     */
971 972
	bool result = ExecWorkFile_Rewind(batch->outerside.workfile);
	if (!result)
973
	{
974 975
		ereport(ERROR, (errcode_for_file_access(),
				errmsg("could not access temporary file")));
976
	}
977 978
	
    return curbatch;
979 980
}

981 982 983
/*
 * ExecHashJoinSaveTuple
 *		save a tuple to a batch file.
984
 *
985
 * The data recorded in the file for each tuple is its hash value,
986
 * then the tuple in MinimalTuple format.
987
 *
988 989 990
 * Note: it is important always to call this in the regular executor
 * context, not in a shorter-lived context; else the temp file buffers
 * will get messed up.
991
 */
992
void
993 994 995
ExecHashJoinSaveTuple(PlanState *ps, MemTuple tuple, uint32 hashvalue,
					  HashJoinTable hashtable, struct HashJoinBatchSide *batchside,
					  MemoryContext bfCxt)
996
{
997 998 999 1000 1001 1002 1003 1004 1005 1006
	if (hashtable->work_set == NULL)
	{
		hashtable->hjstate->workfiles_created = true;
		if (hashtable->hjstate->js.ps.instrument)
		{
			hashtable->hjstate->js.ps.instrument->workfileCreated = true;
		}

		MemoryContext   oldcxt;
		oldcxt = MemoryContextSwitchTo(bfCxt);
1007

1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020
		hashtable->work_set = workfile_mgr_create_set(gp_workfile_type_hashjoin,
				true, /* can_be_reused */
				&hashtable->hjstate->js.ps,
				NULL_SNAPSHOT);

		/* First time spilling. Before creating any spill files, create a metadata file */
		hashtable->state_file = workfile_mgr_create_fileno(hashtable->work_set, WORKFILE_NUM_HASHJOIN_METADATA);
		elog(gp_workfile_caching_loglevel, "created state file %s", ExecWorkFile_GetFileName(hashtable->state_file));

		MemoryContextSwitchTo(oldcxt);
	}

	if (batchside->workfile == NULL)
1021
	{
1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033
		MemoryContext   oldcxt;
		oldcxt = MemoryContextSwitchTo(bfCxt);

		/* First write to this batch file, so create it */
		Assert(hashtable->work_set != NULL);
		batchside->workfile = workfile_mgr_create_file(hashtable->work_set);
		
		elog(gp_workfile_caching_loglevel, "create batch file %s with gp_workfile_compress_algorithm=%d",
			 ExecWorkFile_GetFileName(batchside->workfile),
			 hashtable->work_set->metadata.bfz_compress_type);

		MemoryContextSwitchTo(oldcxt);
1034 1035
	}

1036 1037 1038 1039
	if (!ExecWorkFile_Write(batchside->workfile, (void *)&hashvalue, sizeof(uint32)))
	{
		workfile_mgr_report_error();
	}
1040

1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053
	if (!ExecWorkFile_Write(batchside->workfile, (void *) tuple, memtuple_get_size(tuple, NULL)))
	{
		workfile_mgr_report_error();
	}

	batchside->total_tuples++;

	if(ps)
	{
		Gpmon_M_Incr(&ps->gpmon_pkt, GPMON_HASHJOIN_SPILLTUPLE);
		Gpmon_M_Add(&ps->gpmon_pkt, GPMON_HASHJOIN_SPILLBYTE, memtuple_get_size(tuple, NULL));
		CheckSendPlanStateGpmonPkt(ps);
	}
1054
}
V
Vadim B. Mikheev 已提交
1055

1056 1057
/*
 * ExecHashJoinGetSavedTuple
B
Bruce Momjian 已提交
1058
 *		read the next tuple from a batch file.	Return NULL if no more.
1059 1060 1061 1062 1063
 *
 * On success, *hashvalue is set to the tuple's hash value, and the tuple
 * itself is stored in the given slot.
 */
static TupleTableSlot *
1064
ExecHashJoinGetSavedTuple(HashJoinBatchSide *batchside,
1065 1066 1067
						  uint32 *hashvalue,
						  TupleTableSlot *tupleSlot)
{
1068
	uint32		header[2];
1069
	size_t		nread;
1070
	MemTuple tuple;
1071

1072
	/*
B
Bruce Momjian 已提交
1073 1074 1075
	 * Since both the hash value and the MinimalTuple length word are uint32,
	 * we can read them both in one BufFileRead() call without any type
	 * cheating.
1076
	 */
1077 1078
	nread = ExecWorkFile_Read(batchside->workfile, (void *) header, sizeof(header));
	if (nread != sizeof(header))				/* end of file */
1079 1080 1081 1082
	{
		ExecClearTuple(tupleSlot);
		return NULL;
	}
1083

1084
	*hashvalue = header[0];
1085 1086 1087 1088 1089 1090 1091 1092
	tuple = (MemTuple) palloc(memtuple_size_from_uint32(header[1]));
	memtuple_set_mtlen(tuple, NULL, header[1]);

	nread = ExecWorkFile_Read(batchside->workfile,
							  (void *) ((char *) tuple + sizeof(uint32)),
							  memtuple_size_from_uint32(header[1]) - sizeof(uint32));
	
	if (nread != memtuple_size_from_uint32(header[1]) - sizeof(uint32))
1093 1094
		ereport(ERROR,
				(errcode_for_file_access(),
1095
				 errmsg("could not read from temporary file")));
1096
	return ExecStoreMinimalTuple(tuple, tupleSlot, true);
1097 1098
}

1099

V
Vadim B. Mikheev 已提交
1100
void
1101
ExecReScanHashJoin(HashJoinState *node, ExprContext *exprCtxt)
1102
{
1103
	/*
B
Bruce Momjian 已提交
1104 1105 1106 1107 1108
	 * In a multi-batch join, we currently have to do rescans the hard way,
	 * primarily because batch temp files may have already been released. But
	 * if it's a single-batch join, and there is no parameter change for the
	 * inner subnode, then we can just re-use the existing hash table without
	 * rebuilding it.
V
Vadim B. Mikheev 已提交
1109
	 */
1110
	if (node->hj_HashTable != NULL)
V
Vadim B. Mikheev 已提交
1111
	{
1112
		if (node->hj_HashTable->nbatch == 1 &&
1113 1114
			((PlanState *) node)->righttree->chgParam == NULL
			&& !node->hj_HashTable->eagerlyReleased)
1115
		{
1116 1117 1118
			/*
			 * okay to reuse the hash table; needn't rescan inner, either.
			 *
B
Bruce Momjian 已提交
1119 1120
			 * What we do need to do is reset our state about the emptiness of
			 * the outer relation, so that the new scan of the outer will
1121 1122 1123 1124 1125 1126 1127 1128
			 * update it correctly if it turns out to be empty this time.
			 * (There's no harm in clearing it now because ExecHashJoin won't
			 * need the info.  In the other cases, where the hash table
			 * doesn't exist or we are destroying it, we leave this state
			 * alone because ExecHashJoin will need it the first time
			 * through.)
			 */
			node->hj_OuterNotEmpty = false;
1129 1130 1131

			/* MPP-1600: reset the batch number */
			node->hj_HashTable->curbatch = 0;
1132 1133 1134 1135
		}
		else
		{
			/* must destroy and rebuild hash table */
1136 1137 1138 1139 1140 1141
			if (!node->hj_HashTable->eagerlyReleased)
			{
				HashState *hashState = (HashState *) innerPlanState(node);
				ExecHashTableDestroy(hashState, node->hj_HashTable);
			}
			pfree(node->hj_HashTable);
1142
			node->hj_HashTable = NULL;
B
Bruce Momjian 已提交
1143

1144 1145 1146 1147 1148 1149 1150
			/*
			 * if chgParam of subnode is not null then plan will be re-scanned
			 * by first ExecProcNode.
			 */
			if (((PlanState *) node)->righttree->chgParam == NULL)
				ExecReScan(((PlanState *) node)->righttree, exprCtxt);
		}
V
Vadim B. Mikheev 已提交
1151
	}
1152

1153
	/* Always reset intra-tuple state */
1154
	node->hj_CurHashValue = 0;
1155
	node->hj_CurBucketNo = 0;
1156
	node->hj_CurTuple = NULL;
V
Vadim B. Mikheev 已提交
1157

1158
	node->js.ps.ps_OuterTupleSlot = NULL;
1159 1160
	node->hj_NeedNewOuter = true;
	node->hj_MatchedOuter = false;
1161
	node->hj_FirstOuterTupleSlot = NULL;
1162 1163

	/*
B
Bruce Momjian 已提交
1164 1165
	 * if chgParam of subnode is not null then plan will be re-scanned by
	 * first ExecProcNode.
V
Vadim B. Mikheev 已提交
1166
	 */
1167 1168
	if (((PlanState *) node)->lefttree->chgParam == NULL)
		ExecReScan(((PlanState *) node)->lefttree, exprCtxt);
V
Vadim B. Mikheev 已提交
1169
}
1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370

/**
 * This method releases the hash table's memory. It maintains some of the other
 * aspects of the hash table like memory usage statistics. These may be required
 * during an explain analyze. A hash table that has been released cannot perform
 * any useful function anymore. 
 */
static void ReleaseHashTable(HashJoinState *node)
{
	Assert(gp_eager_hashtable_release);
	
	if (node->hj_HashTable)
	{
		HashState *hashState = (HashState *) innerPlanState(node);

		/* This hashtable should not have been released already! */
		Assert(!node->hj_HashTable->eagerlyReleased);
	    if (node->hj_HashTable->stats)
	    {
	    	/* Report on batch in progress. */
	    	ExecHashTableExplainBatchEnd(hashState, node->hj_HashTable);
	    }
		ExecHashTableDestroy(hashState, node->hj_HashTable);
		node->hj_HashTable->eagerlyReleased = true;
	}
	
	/* Always reset intra-tuple state */
	node->hj_CurHashValue = 0;
	node->hj_CurBucketNo = 0;
	node->hj_CurTuple = NULL;

	node->js.ps.ps_OuterTupleSlot = NULL;
	node->hj_NeedNewOuter = true;
	node->hj_MatchedOuter = false;
	node->hj_FirstOuterTupleSlot = NULL;	

}

/* Is this an IS-NOT-DISTINCT-join qual list (as opposed the an equijoin)?
 *
 * XXX We perform an abbreviated test based on the assumptions that 
 *     these are the only possibilities and that all conjuncts are 
 *     alike in this regard.
 */
bool isNotDistinctJoin(List *qualList)
{
	ListCell *lc;
	
	foreach (lc, qualList)
	{
		BoolExpr *bex = (BoolExpr*)lfirst(lc);
		DistinctExpr *dex;
		
		if ( IsA(bex, BoolExpr) && bex->boolop == NOT_EXPR )
		{
			dex = (DistinctExpr*)linitial(bex->args);
			
			if ( IsA(dex, DistinctExpr) )
				return true; /* We assume the rest follow suit! */
		}
	}
	return false;
}

void
initGpmonPktForHashJoin(Plan *planNode, gpmon_packet_t *gpmon_pkt, EState *estate)
{
	Assert(planNode != NULL && gpmon_pkt != NULL && IsA(planNode, HashJoin));
	
	{
		PerfmonNodeType type = PMNT_Invalid;

		switch(((HashJoin *)planNode)->join.jointype)
		{
			case JOIN_INNER:
				type = PMNT_HashJoin;
				break;
			case JOIN_LEFT:
				type = PMNT_HashLeftJoin;
				break;
			case JOIN_LASJ:
			case JOIN_LASJ_NOTIN:
				type = PMNT_HashLeftAntiSemiJoin;
				break;
			case JOIN_FULL:
				type = PMNT_HashFullJoin;
				break;
			case JOIN_RIGHT:
				type = PMNT_HashRightJoin;
				break;
			case JOIN_IN:
				type = PMNT_HashExistsJoin;
				break;
			case JOIN_REVERSE_IN:
				type = PMNT_HashReverseInJoin;
				break;
			case JOIN_UNIQUE_OUTER:
				type = PMNT_HashUniqueOuterJoin;
				break;
			case JOIN_UNIQUE_INNER:
				type = PMNT_HashUniqueInnerJoin;
				break;
		}

		Assert(type != PMNT_Invalid);
		Assert(GPMON_HASHJOIN_TOTAL <= (int)GPMON_QEXEC_M_COUNT);
		InitPlanNodeGpmonPkt(planNode, gpmon_pkt, estate, type, 
							 (int64)planNode->plan_rows,
							 NULL);
	}
}

void
ExecEagerFreeHashJoin(HashJoinState *node)
{
	if (node->hj_HashTable != NULL && !node->hj_HashTable->eagerlyReleased)
	{
		ReleaseHashTable(node);
	}
}

void
ExecHashJoinSaveFirstInnerBatch(HashJoinTable hashtable)
{

	Assert(hashtable != NULL);

	if (hashtable->nbatch == 1)
	{
		/* Nothing to do, we're not spilling */
		return;
	}

	HashJoinBatchSide *batchside = &hashtable->batches[0]->innerside;

	int i;
	for (i = 0; i < hashtable->nbuckets; i++)
	{
		HashJoinTuple tuple;
		tuple = hashtable->buckets[i];

		while (tuple != NULL)
		{

#ifdef USE_ASSERT_CHECKING
			int			bucketno;
			int			batchno;

			ExecHashGetBucketAndBatch(hashtable, tuple->hashvalue,
					&bucketno, &batchno);
			Assert(bucketno == i);
			Assert(batchno == 0);
#endif

			ExecHashJoinSaveTuple(&hashtable->hjstate->js.ps, HJTUPLE_MINTUPLE(tuple),
								  tuple->hashvalue,
								  hashtable,
                                  batchside,
								  hashtable->bfCxt);


			tuple = tuple->next;
		}
	}
}

/*
 * isHashtableEmpty
 *
 *  After populating the hashtable with all the tuples from the innerside,
 *  scan all the batches and return true if the hashtable is completely empty
 *
 */
static bool
isHashtableEmpty(HashJoinTable hashtable)
{
    int         i;
    bool isEmpty = true;

    /* Is there a nonempty batch? */
    for (i = 0; i < hashtable->nbatch; i++)
    {
        /*
         * For batch 0, the number of inner tuples is stored in batches[i].innertuples.
         * For batches on disk (1 and above), the batches[i].innertuples is 0,
         * but batches[i].innerside.workfile is non-NULL if any tuples were written to disk.
         * Check both here.
         */
        if ((hashtable->batches[i]->innertuples > 0) ||
                (NULL != hashtable->batches[i]->innerside.workfile))
        {
            /* Found a non-empty batch, stop the search */
        	isEmpty = false;
            break;
        }
    }

    return isEmpty;
}

/* EOF */