execMain.c 44.9 KB
Newer Older
1 2 3
/*-------------------------------------------------------------------------
 *
 * execMain.c--
4
 *	  top level executor interface routines
5 6
 *
 * INTERFACE ROUTINES
7 8 9
 *	ExecutorStart()
 *	ExecutorRun()
 *	ExecutorEnd()
10
 *
11 12 13 14 15 16 17 18 19 20 21 22 23
 *	The old ExecutorMain() has been replaced by ExecutorStart(),
 *	ExecutorRun() and ExecutorEnd()
 *
 *	These three procedures are the external interfaces to the executor.
 *	In each case, the query descriptor and the execution state is required
 *	 as arguments
 *
 *	ExecutorStart() must be called at the beginning of any execution of any
 *	query plan and ExecutorEnd() should always be called at the end of
 *	execution of a plan.
 *
 *	ExecutorRun accepts 'feature' and 'count' arguments that specify whether
 *	the plan is to be executed forwards, backwards, and for how many tuples.
24 25 26 27 28
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
29
 *	  $Header: /cvsroot/pgsql/src/backend/executor/execMain.c,v 1.69 1999/01/29 13:24:36 vadim Exp $
30 31 32
 *
 *-------------------------------------------------------------------------
 */
33
#include <string.h>
34
#include "postgres.h"
35
#include "miscadmin.h"
36

37
#include "executor/executor.h"
38 39
#include "executor/execdefs.h"
#include "executor/execdebug.h"
40
#include "executor/nodeIndexscan.h"
41 42 43
#include "utils/builtins.h"
#include "utils/palloc.h"
#include "utils/acl.h"
44
#include "utils/syscache.h"
45
#include "utils/tqual.h"
46
#include "parser/parsetree.h"	/* rt_fetch() */
47
#include "storage/bufmgr.h"
M
Marc G. Fournier 已提交
48
#include "storage/lmgr.h"
49
#include "storage/smgr.h"
50 51 52
#include "commands/async.h"
/* #include "access/localam.h" */
#include "optimizer/var.h"
53
#include "access/heapam.h"
V
Vadim B. Mikheev 已提交
54
#include "access/xact.h"
55
#include "catalog/heap.h"
56
#include "commands/trigger.h"
57

58
void ExecCheckPerms(CmdType operation, int resultRelation, List *rangeTable,
59
			   Query *parseTree);
60 61 62


/* decls for local routines only used within this module */
63
static TupleDesc InitPlan(CmdType operation, Query *parseTree,
64 65
		 Plan *plan, EState *estate);
static void EndPlan(Plan *plan, EState *estate);
66
static TupleTableSlot *ExecutePlan(EState *estate, Plan *plan,
67
			CmdType operation, int numberTuples, ScanDirection direction,
V
Vadim B. Mikheev 已提交
68
			DestReceiver *destfunc);
69
static void ExecRetrieve(TupleTableSlot *slot,
70 71
						 DestReceiver *destfunc,
						 EState *estate);
72
static void ExecAppend(TupleTableSlot *slot, ItemPointer tupleid,
73
		   EState *estate);
74
static void ExecDelete(TupleTableSlot *slot, ItemPointer tupleid,
75
		   EState *estate);
76
static void ExecReplace(TupleTableSlot *slot, ItemPointer tupleid,
77 78 79 80 81
			EState *estate);

TupleTableSlot *EvalPlanQual(EState *estate, Index rti, ItemPointer tid);
static TupleTableSlot *EvalPlanQualNext(EState *estate);

82 83 84

/* end of local decls */

M
Marc G. Fournier 已提交
85
#ifdef QUERY_LIMIT
86
static int	queryLimit = ALL_TUPLES;
87

M
Marc G. Fournier 已提交
88 89 90 91 92 93
#undef ALL_TUPLES
#define ALL_TUPLES queryLimit

int
ExecutorLimit(int limit)
{
94
	return queryLimit = limit;
M
Marc G. Fournier 已提交
95
}
96

B
Bruce Momjian 已提交
97 98 99 100 101 102
int
ExecutorGetLimit()
{
	return queryLimit;
}

103
#endif
M
Marc G. Fournier 已提交
104

105
/* ----------------------------------------------------------------
106 107 108 109 110 111 112
 *		ExecutorStart
 *
 *		This routine must be called at the beginning of any execution of any
 *		query plan
 *
 *		returns (AttrInfo*) which describes the attributes of the tuples to
 *		be returned by the query.
113 114 115 116
 *
 * ----------------------------------------------------------------
 */
TupleDesc
117
ExecutorStart(QueryDesc *queryDesc, EState *estate)
118
{
119
	TupleDesc	result;
120 121 122

	/* sanity checks */
	Assert(queryDesc != NULL);
123

V
Vadim B. Mikheev 已提交
124 125
	if (queryDesc->plantree->nParamExec > 0)
	{
126 127 128
		estate->es_param_exec_vals = (ParamExecData *)
			palloc(queryDesc->plantree->nParamExec * sizeof(ParamExecData));
		memset(estate->es_param_exec_vals, 0, queryDesc->plantree->nParamExec * sizeof(ParamExecData));
V
Vadim B. Mikheev 已提交
129
	}
130

V
Vadim B. Mikheev 已提交
131
	estate->es_snapshot = QuerySnapshot;
132

133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
	result = InitPlan(queryDesc->operation,
					  queryDesc->parsetree,
					  queryDesc->plantree,
					  estate);

	/*
	 * reset buffer refcount.  the current refcounts are saved and will be
	 * restored when ExecutorEnd is called
	 *
	 * this makes sure that when ExecutorRun's are called recursively as for
	 * postquel functions, the buffers pinned by one ExecutorRun will not
	 * be unpinned by another ExecutorRun.
	 */
	BufferRefCountReset(estate->es_refcount);

	return result;
149 150 151
}

/* ----------------------------------------------------------------
152 153 154 155 156 157 158
 *		ExecutorRun
 *
 *		This is the main routine of the executor module. It accepts
 *		the query descriptor from the traffic cop and executes the
 *		query plan.
 *
 *		ExecutorStart must have been called already.
159
 *
160 161 162 163 164 165
 *		the different features supported are:
 *			 EXEC_RUN:	retrieve all tuples in the forward direction
 *			 EXEC_FOR:	retrieve 'count' number of tuples in the forward dir
 *			 EXEC_BACK: retrieve 'count' number of tuples in the backward dir
 *			 EXEC_RETONE: return one tuple but don't 'retrieve' it
 *						   used in postquel function processing
166 167 168 169
 *
 *
 * ----------------------------------------------------------------
 */
170
TupleTableSlot *
171
ExecutorRun(QueryDesc *queryDesc, EState *estate, int feature, int count)
172
{
173 174
	CmdType			operation;
	Plan		   *plan;
175
	TupleTableSlot *result;
176 177
	CommandDest		dest;
	DestReceiver   *destfunc;
178

179
	/******************
180
	 *	sanity checks
181
	 ******************
182
	 */
183 184
	Assert(queryDesc != NULL);

185
	/******************
186 187
	 *	extract information from the query descriptor
	 *	and the query feature.
188
	 ******************
189
	 */
190 191 192
	operation = queryDesc->operation;
	plan = queryDesc->plantree;
	dest = queryDesc->dest;
193
	destfunc = DestToFunction(dest);
194 195 196
	estate->es_processed = 0;
	estate->es_lastoid = InvalidOid;

197 198 199 200 201 202 203 204 205
	/******************
	 *	FIXME: the dest setup function ought to be handed the tuple desc
	 *  for the tuples to be output, but I'm not quite sure how to get that
	 *  info at this point.  For now, passing NULL is OK because no existing
	 *  dest setup function actually uses the pointer.
	 ******************
	 */
	(*destfunc->setup) (destfunc, (TupleDesc) NULL);

206 207 208
	switch (feature)
	{

209 210 211 212 213 214
		case EXEC_RUN:
			result = ExecutePlan(estate,
								 plan,
								 operation,
								 ALL_TUPLES,
								 ForwardScanDirection,
215
								 destfunc);
216 217 218 219 220 221 222
			break;
		case EXEC_FOR:
			result = ExecutePlan(estate,
								 plan,
								 operation,
								 count,
								 ForwardScanDirection,
223
								 destfunc);
224
			break;
225

226
			/******************
227
			 *		retrieve next n "backward" tuples
228
			 ******************
229 230 231 232 233 234 235
			 */
		case EXEC_BACK:
			result = ExecutePlan(estate,
								 plan,
								 operation,
								 count,
								 BackwardScanDirection,
236
								 destfunc);
237
			break;
238

239
			/******************
240 241
			 *		return one tuple but don't "retrieve" it.
			 *		(this is used by the rule manager..) -cim 9/14/89
242
			 ******************
243 244 245 246 247 248 249
			 */
		case EXEC_RETONE:
			result = ExecutePlan(estate,
								 plan,
								 operation,
								 ONE_TUPLE,
								 ForwardScanDirection,
250
								 destfunc);
251 252 253 254 255
			break;
		default:
			result = NULL;
			elog(DEBUG, "ExecutorRun: Unknown feature %d", feature);
			break;
256 257
	}

258 259
	(*destfunc->cleanup) (destfunc);

260
	return result;
261 262 263
}

/* ----------------------------------------------------------------
264 265 266 267 268 269 270
 *		ExecutorEnd
 *
 *		This routine must be called at the end of any execution of any
 *		query plan
 *
 *		returns (AttrInfo*) which describes the attributes of the tuples to
 *		be returned by the query.
271 272 273 274
 *
 * ----------------------------------------------------------------
 */
void
275
ExecutorEnd(QueryDesc *queryDesc, EState *estate)
276
{
277 278
	/* sanity checks */
	Assert(queryDesc != NULL);
279

280
	EndPlan(queryDesc->plantree, estate);
281

282 283
	/* restore saved refcounts. */
	BufferRefCountRestore(estate->es_refcount);
284 285
}

286
void
287
ExecCheckPerms(CmdType operation,
288
			   int resultRelation,
289 290
			   List *rangeTable,
			   Query *parseTree)
291
{
292 293
	int			i = 1;
	Oid			relid;
294
	HeapTuple	htup;
295 296 297 298 299 300 301 302
	List	   *lp;
	List	   *qvars,
			   *tvars;
	int32		ok = 1,
				aclcheck_result = -1;
	char	   *opstr;
	NameData	rname;
	char	   *userName;
303 304 305 306 307 308 309

#define CHECK(MODE)		pg_aclcheck(rname.data, userName, MODE)

	userName = GetPgUserName();

	foreach(lp, rangeTable)
	{
310
		RangeTblEntry *rte = lfirst(lp);
311

M
Marc G. Fournier 已提交
312 313
		if (rte->skipAcl)
		{
314

M
Marc G. Fournier 已提交
315
			/*
316 317 318 319
			 * This happens if the access to this table is due to a view
			 * query rewriting - the rewrite handler checked the
			 * permissions against the view owner, so we just skip this
			 * entry.
M
Marc G. Fournier 已提交
320 321 322 323
			 */
			continue;
		}

324
		relid = rte->relid;
325
		htup = SearchSysCacheTuple(RELOID,
326 327
								   ObjectIdGetDatum(relid),
								   0, 0, 0);
328
		if (!HeapTupleIsValid(htup))
329
			elog(ERROR, "ExecCheckPerms: bogus RT relid: %d",
330
				 relid);
331
		StrNCpy(rname.data,
332
				((Form_pg_class) GETSTRUCT(htup))->relname.data,
333
				NAMEDATALEN);
334 335 336 337 338 339 340 341 342 343 344 345 346 347 348
		if (i == resultRelation)
		{						/* this is the result relation */
			qvars = pull_varnos(parseTree->qual);
			tvars = pull_varnos((Node *) parseTree->targetList);
			if (intMember(resultRelation, qvars) ||
				intMember(resultRelation, tvars))
			{
				/* result relation is scanned */
				ok = ((aclcheck_result = CHECK(ACL_RD)) == ACLCHECK_OK);
				opstr = "read";
				if (!ok)
					break;
			}
			switch (operation)
			{
349 350 351 352 353 354 355 356 357 358 359
				case CMD_INSERT:
					ok = ((aclcheck_result = CHECK(ACL_AP)) == ACLCHECK_OK) ||
						((aclcheck_result = CHECK(ACL_WR)) == ACLCHECK_OK);
					opstr = "append";
					break;
				case CMD_DELETE:
				case CMD_UPDATE:
					ok = ((aclcheck_result = CHECK(ACL_WR)) == ACLCHECK_OK);
					opstr = "write";
					break;
				default:
360
					elog(ERROR, "ExecCheckPerms: bogus operation %d",
361
						 operation);
362 363 364 365 366 367 368
			}
		}
		else
		{
			ok = ((aclcheck_result = CHECK(ACL_RD)) == ACLCHECK_OK);
			opstr = "read";
		}
369
		if (!ok)
370 371
			break;
		++i;
372 373
	}
	if (!ok)
374
		elog(ERROR, "%s: %s", rname.data, aclcheck_error_strings[aclcheck_result]);
375

T
Tom Lane 已提交
376
	if (parseTree != NULL && parseTree->rowMark != NULL)
377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400
	{
		foreach(lp, parseTree->rowMark)
		{
			RowMark	   *rm = lfirst(lp);

			if (!(rm->info & ROW_ACL_FOR_UPDATE))
				continue;

			relid = ((RangeTblEntry *)nth(rm->rti - 1, rangeTable))->relid;
			htup = SearchSysCacheTuple(RELOID,
								   ObjectIdGetDatum(relid),
								   0, 0, 0);
			if (!HeapTupleIsValid(htup))
				elog(ERROR, "ExecCheckPerms: bogus RT relid: %d",
					 relid);
			StrNCpy(rname.data,
					((Form_pg_class) GETSTRUCT(htup))->relname.data,
					NAMEDATALEN);
			ok = ((aclcheck_result = CHECK(ACL_WR)) == ACLCHECK_OK);
			opstr = "write";
			if (!ok)
				elog(ERROR, "%s: %s", rname.data, aclcheck_error_strings[aclcheck_result]);
		}
	}
401 402
}

403 404 405 406 407 408 409
/* ===============================================================
 * ===============================================================
						 static routines follow
 * ===============================================================
 * ===============================================================
 */

410 411 412
typedef struct execRowMark
{
	Relation	relation;
413
	Index		rti;
414 415
	char		resname[32];
} execRowMark;
416

417 418 419 420 421 422 423 424
typedef struct evalPlanQual
{
	Plan				   *plan;
	Index					rti;
	EState					estate;
	struct evalPlanQual	   *free;
} evalPlanQual;

425
/* ----------------------------------------------------------------
426 427 428 429
 *		InitPlan
 *
 *		Initializes the query plan: open files, allocate storage
 *		and start up the rule manager
430 431
 * ----------------------------------------------------------------
 */
432
static TupleDesc
433
InitPlan(CmdType operation, Query *parseTree, Plan *plan, EState *estate)
434
{
435 436 437 438 439 440
	List		   *rangeTable;
	int				resultRelation;
	Relation		intoRelationDesc;
	TupleDesc		tupType;
	List		   *targetList;
	int				len;
441

442
	/******************
443
	 *	get information from query descriptor
444
	 ******************
445
	 */
446 447
	rangeTable = parseTree->rtable;
	resultRelation = parseTree->resultRelation;
448

449 450 451 452
#ifndef NO_SECURITY
	ExecCheckPerms(operation, resultRelation, rangeTable, parseTree);
#endif

453
	/******************
454
	 *	initialize the node's execution state
455
	 ******************
456
	 */
457 458
	estate->es_range_table = rangeTable;

459
	/******************
460 461 462 463
	 *	initialize the BaseId counter so node base_id's
	 *	are assigned correctly.  Someday baseid's will have to
	 *	be stored someplace other than estate because they
	 *	should be unique per query planned.
464
	 ******************
465
	 */
466
	estate->es_BaseId = 1;
467

468
	/******************
469
	 *	initialize result relation stuff
470
	 ******************
471
	 */
472

473 474
	if (resultRelation != 0 && operation != CMD_SELECT)
	{
475
		/******************
476 477
		 *	  if we have a result relation, open it and
		 *	  initialize the result relation info stuff.
478
		 ******************
479
		 */
480 481 482 483 484
		RelationInfo *resultRelationInfo;
		Index		resultRelationIndex;
		RangeTblEntry *rtentry;
		Oid			resultRelationOid;
		Relation	resultRelationDesc;
485 486 487 488 489 490 491

		resultRelationIndex = resultRelation;
		rtentry = rt_fetch(resultRelationIndex, rangeTable);
		resultRelationOid = rtentry->relid;
		resultRelationDesc = heap_open(resultRelationOid);

		if (resultRelationDesc->rd_rel->relkind == RELKIND_SEQUENCE)
492
			elog(ERROR, "You can't change sequence relation %s",
493 494
				 resultRelationDesc->rd_rel->relname.data);

V
Vadim B. Mikheev 已提交
495
		LockRelation(resultRelationDesc, RowExclusiveLock);
496 497 498 499 500 501 502

		resultRelationInfo = makeNode(RelationInfo);
		resultRelationInfo->ri_RangeTableIndex = resultRelationIndex;
		resultRelationInfo->ri_RelationDesc = resultRelationDesc;
		resultRelationInfo->ri_NumIndices = 0;
		resultRelationInfo->ri_IndexRelationDescs = NULL;
		resultRelationInfo->ri_IndexRelationInfo = NULL;
503

504
		/******************
505 506
		 *	open indices on result relation and save descriptors
		 *	in the result relation information..
507
		 ******************
508
		 */
V
Vadim B. Mikheev 已提交
509 510
		if (operation != CMD_DELETE)
			ExecOpenIndices(resultRelationOid, resultRelationInfo);
511 512

		estate->es_result_relation_info = resultRelationInfo;
513
	}
514 515
	else
	{
516
		/******************
517
		 *		if no result relation, then set state appropriately
518
		 ******************
519 520 521 522
		 */
		estate->es_result_relation_info = NULL;
	}

523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544
	/*
	 * Have to lock relations selected for update
	 */
	estate->es_rowMark = NULL;
	if (parseTree->rowMark != NULL)
	{
		Relation		relation;
		Oid				relid;
		RowMark		   *rm;
		List		   *l;
		execRowMark	   *erm;

		foreach(l, parseTree->rowMark)
		{
			rm = lfirst(l);
			relid = ((RangeTblEntry *)nth(rm->rti - 1, rangeTable))->relid;
			relation = heap_open(relid);
			LockRelation(relation, RowShareLock);
			if (!(rm->info & ROW_MARK_FOR_UPDATE))
				continue;
			erm = (execRowMark*) palloc(sizeof(execRowMark));
			erm->relation = relation;
545
			erm->rti = rm->rti;
546 547 548 549
			sprintf(erm->resname, "ctid%u", rm->rti);
			estate->es_rowMark = lappend(estate->es_rowMark, erm);
		}
	}
550

551
	/******************
552
	 *	  initialize the executor "tuple" table.
553
	 ******************
554 555
	 */
	{
556 557
		int			nSlots = ExecCountSlotsNode(plan);
		TupleTable	tupleTable = ExecCreateTupleTable(nSlots + 10);		/* why add ten? - jolly */
558

559 560
		estate->es_tupleTable = tupleTable;
	}
561

562
	/******************
563 564 565 566
	 *	   initialize the private state information for
	 *	   all the nodes in the query tree.  This opens
	 *	   files, allocates storage and leaves us ready
	 *	   to start processing tuples..
567
	 ******************
568 569 570
	 */
	ExecInitNode(plan, estate, NULL);

571
	/******************
572 573 574
	 *	   get the tuple descriptor describing the type
	 *	   of tuples to return.. (this is especially important
	 *	   if we are creating a relation with "retrieve into")
575
	 ******************
576 577 578 579 580
	 */
	tupType = ExecGetTupType(plan);		/* tuple descriptor */
	targetList = plan->targetlist;
	len = ExecTargetListLength(targetList);		/* number of attributes */

581
	/******************
582 583 584 585 586 587
	 *	  now that we have the target list, initialize the junk filter
	 *	  if this is a REPLACE or a DELETE query.
	 *	  We also init the junk filter if this is an append query
	 *	  (there might be some rule lock info there...)
	 *	  NOTE: in the future we might want to initialize the junk
	 *	  filter for all queries.
588
	 ******************
589 590
	 *		  SELECT added by daveh@insightdist.com  5/20/98 to allow
	 *		  ORDER/GROUP BY have an identifier missing from the target.
591 592
	 */
	{
593 594 595
		bool		junk_filter_needed = false;
		List	   *tlist;

596
		if (operation == CMD_SELECT)
597 598 599
		{
			foreach(tlist, targetList)
			{
600 601
				TargetEntry *tle = lfirst(tlist);

602 603 604 605 606 607 608 609 610 611 612 613 614
				if (tle->resdom->resjunk)
				{
					junk_filter_needed = true;
					break;
				}
			}
		}

		if (operation == CMD_UPDATE || operation == CMD_DELETE ||
			operation == CMD_INSERT ||
			(operation == CMD_SELECT && junk_filter_needed))
		{
			JunkFilter *j = (JunkFilter *) ExecInitJunkFilter(targetList);
615

616
			estate->es_junkFilter = j;
617

618 619 620 621 622 623
			if (operation == CMD_SELECT)
				tupType = j->jf_cleanTupType;
		}
		else
			estate->es_junkFilter = NULL;
	}
624

625
	/******************
626
	 *	initialize the "into" relation
627
	 ******************
628 629 630 631 632
	 */
	intoRelationDesc = (Relation) NULL;

	if (operation == CMD_SELECT)
	{
633 634 635
		char	   *intoName;
		Oid			intoRelationId;
		TupleDesc	tupdesc;
636 637 638 639 640 641 642 643 644

		if (!parseTree->isPortal)
		{

			/*
			 * a select into table
			 */
			if (parseTree->into != NULL)
			{
645
				/******************
646
				 *	create the "into" relation
647
				 ******************
648 649 650 651 652 653 654 655
				 */
				intoName = parseTree->into;

				/*
				 * have to copy tupType to get rid of constraints
				 */
				tupdesc = CreateTupleDescCopy(tupType);

656
				intoRelationId = heap_create_with_catalog(intoName,
657
											  tupdesc, RELKIND_RELATION);
658

659 660
				FreeTupleDesc(tupdesc);

661
				/******************
662 663 664
				 *	XXX rather than having to call setheapoverride(true)
				 *		and then back to false, we should change the
				 *		arguments to heap_open() instead..
665
				 ******************
666 667 668 669 670 671 672 673 674 675 676 677
				 */
				setheapoverride(true);

				intoRelationDesc = heap_open(intoRelationId);

				setheapoverride(false);
			}
		}
	}

	estate->es_into_relation_descriptor = intoRelationDesc;

678 679 680 681 682
	estate->es_origPlan = plan;
	estate->es_evalPlanQual = NULL;
	estate->es_evTuple = NULL;
	estate->es_useEvalPlan = false;

683
	return tupType;
684 685 686
}

/* ----------------------------------------------------------------
687 688 689
 *		EndPlan
 *
 *		Cleans up the query plan -- closes files and free up storages
690 691 692
 * ----------------------------------------------------------------
 */
static void
693
EndPlan(Plan *plan, EState *estate)
694
{
695 696
	RelationInfo *resultRelationInfo;
	Relation	intoRelationDesc;
697

698
	/******************
699
	 *	get information from state
700
	 ******************
701
	 */
702 703 704
	resultRelationInfo = estate->es_result_relation_info;
	intoRelationDesc = estate->es_into_relation_descriptor;

705
	/******************
706
	 *	 shut down the query
707
	 ******************
708 709 710
	 */
	ExecEndNode(plan, plan);

711
	/******************
712
	 *	  destroy the executor "tuple" table.
713
	 ******************
714 715
	 */
	{
716
		TupleTable	tupleTable = (TupleTable) estate->es_tupleTable;
717 718 719 720 721

		ExecDestroyTupleTable(tupleTable, true);		/* was missing last arg */
		estate->es_tupleTable = NULL;
	}

722
	/******************
723
	 *	 close the result relations if necessary
724
	 ******************
725 726 727
	 */
	if (resultRelationInfo != NULL)
	{
728
		Relation	resultRelationDesc;
729 730 731 732

		resultRelationDesc = resultRelationInfo->ri_RelationDesc;
		heap_close(resultRelationDesc);

733
		/******************
734
		 *	close indices on the result relation
735
		 ******************
736 737 738 739
		 */
		ExecCloseIndices(resultRelationInfo);
	}

740
	/******************
741
	 *	 close the "into" relation if necessary
742
	 ******************
743 744 745
	 */
	if (intoRelationDesc != NULL)
		heap_close(intoRelationDesc);
746 747 748
}

/* ----------------------------------------------------------------
749 750 751 752 753 754 755 756
 *		ExecutePlan
 *
 *		processes the query plan to retrieve 'tupleCount' tuples in the
 *		direction specified.
 *		Retrieves all tuples if tupleCount is 0
 *
 *		result is either a slot containing a tuple in the case
 *		of a RETRIEVE or NULL otherwise.
757 758 759 760 761 762 763 764
 *
 * ----------------------------------------------------------------
 */

/* the ctid attribute is a 'junk' attribute that is removed before the
   user can see it*/

static TupleTableSlot *
765 766
ExecutePlan(EState *estate,
			Plan *plan,
767 768 769
			CmdType operation,
			int numberTuples,
			ScanDirection direction,
770
			DestReceiver* destfunc)
771
{
772
	JunkFilter *junkfilter;
773 774

	TupleTableSlot *slot;
775
	ItemPointer tupleid = NULL;
776
	ItemPointerData tuple_ctid;
777
	int			current_tuple_count;
778 779
	TupleTableSlot *result;

780
	/******************
781
	 *	initialize local variables
782
	 ******************
783
	 */
784 785 786 787
	slot = NULL;
	current_tuple_count = 0;
	result = NULL;

788
	/******************
789
	 *	Set the direction.
790
	 ******************
791
	 */
792 793
	estate->es_direction = direction;

794
	/******************
795 796
	 *	Loop until we've processed the proper number
	 *	of tuples from the plan..
797
	 ******************
798 799 800 801
	 */

	for (;;)
	{
802 803 804 805 806
		/******************
		 *	Execute the plan and obtain a tuple
		 ******************
		 */
		/* at the top level, the parent of a plan (2nd arg) is itself */
807 808 809 810 811 812 813 814 815
lnext:;
		if (estate->es_useEvalPlan)
		{
			slot = EvalPlanQualNext(estate);
			if (TupIsNull(slot))
				slot = ExecProcNode(plan, plan);
		}
		else
			slot = ExecProcNode(plan, plan);
816

817 818 819 820 821 822 823 824 825 826
		/******************
		 *	if the tuple is null, then we assume
		 *	there is nothing more to process so
		 *	we just return null...
		 ******************
		 */
		if (TupIsNull(slot))
		{
			result = NULL;
			break;
827 828
		}

829
		/******************
830 831 832 833 834 835
		 *		if we have a junk filter, then project a new
		 *		tuple with the junk removed.
		 *
		 *		Store this new "clean" tuple in the place of the
		 *		original tuple.
		 *
836 837
		 *		Also, extract all the junk information we need.
		 ******************
838 839 840
		 */
		if ((junkfilter = estate->es_junkFilter) != (JunkFilter *) NULL)
		{
841 842 843
			Datum		datum;
			HeapTuple	newTuple;
			bool		isNull;
844

845
			/******************
846
			 * extract the 'ctid' junk attribute.
847
			 ******************
848 849 850 851 852 853 854 855
			 */
			if (operation == CMD_UPDATE || operation == CMD_DELETE)
			{
				if (!ExecGetJunkAttribute(junkfilter,
										  slot,
										  "ctid",
										  &datum,
										  &isNull))
856
					elog(ERROR, "ExecutePlan: NO (junk) `ctid' was found!");
857 858

				if (isNull)
859
					elog(ERROR, "ExecutePlan: (junk) `ctid' is NULL!");
860 861 862 863 864 865

				tupleid = (ItemPointer) DatumGetPointer(datum);
				tuple_ctid = *tupleid;	/* make sure we don't free the
										 * ctid!! */
				tupleid = &tuple_ctid;
			}
866 867 868 869 870 871
			else if (estate->es_rowMark != NULL)
			{
				List		   *l;
				execRowMark	   *erm;
				Buffer			buffer;
				HeapTupleData	tuple;
872
				TupleTableSlot *newSlot;
873 874
				int				test;

875
lmark:;
876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899
				foreach (l, estate->es_rowMark)
				{
					erm = lfirst(l);
					if (!ExecGetJunkAttribute(junkfilter,
											  slot,
											  erm->resname,
											  &datum,
											  &isNull))
						elog(ERROR, "ExecutePlan: NO (junk) `%s' was found!", erm->resname);

					if (isNull)
						elog(ERROR, "ExecutePlan: (junk) `%s' is NULL!", erm->resname);

					tuple.t_self = *((ItemPointer) DatumGetPointer(datum));
					test = heap_mark4update(erm->relation, &tuple, &buffer);
					ReleaseBuffer(buffer);
					switch (test)
					{
						case HeapTupleSelfUpdated:
						case HeapTupleMayBeUpdated:
							break;

						case HeapTupleUpdated:
							if (XactIsoLevel == XACT_SERIALIZABLE)
900
							{
901
								elog(ERROR, "Can't serialize access due to concurrent update");
902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920
								return(NULL);
							}
							else if (!(ItemPointerEquals(&(tuple.t_self), 
										(ItemPointer)DatumGetPointer(datum))))
							{
 								newSlot = EvalPlanQual(estate, erm->rti, &(tuple.t_self));
								if (!(TupIsNull(newSlot)))
								{
									slot = newSlot;
									estate->es_useEvalPlan = true;
									goto lmark;
								}
							}
							/* 
							 * if tuple was deleted or PlanQual failed
							 * for updated tuple - we have not return
							 * this tuple!
							 */
							goto lnext;
921 922 923 924 925 926 927

						default:
							elog(ERROR, "Unknown status %u from heap_mark4update", test);
							return(NULL);
					}
				}
			}
928

929
			/******************
930 931
			 * Finally create a new "clean" tuple with all junk attributes
			 * removed
932
			 ******************
933 934 935 936 937 938 939 940 941 942
			 */
			newTuple = ExecRemoveJunk(junkfilter, slot);

			slot = ExecStoreTuple(newTuple,		/* tuple to store */
								  slot, /* destination slot */
								  InvalidBuffer,		/* this tuple has no
														 * buffer */
								  true);		/* tuple should be pfreed */
		}						/* if (junkfilter... */

943
		/******************
944 945 946 947
		 *		now that we have a tuple, do the appropriate thing
		 *		with it.. either return it to the user, add
		 *		it to a relation someplace, delete it from a
		 *		relation, or modify some of it's attributes.
948
		 ******************
949 950 951 952
		 */

		switch (operation)
		{
953 954
			case CMD_SELECT:
				ExecRetrieve(slot,		/* slot containing tuple */
955
							 destfunc,	/* destination's tuple-receiver obj */
956 957 958
							 estate);	/* */
				result = slot;
				break;
959

960 961 962 963
			case CMD_INSERT:
				ExecAppend(slot, tupleid, estate);
				result = NULL;
				break;
964

965 966 967 968
			case CMD_DELETE:
				ExecDelete(slot, tupleid, estate);
				result = NULL;
				break;
969

970
			case CMD_UPDATE:
971
				ExecReplace(slot, tupleid, estate);
972 973
				result = NULL;
				break;
974

975 976
			default:
				elog(DEBUG, "ExecutePlan: unknown operation in queryDesc");
977
				result = NULL;
978
				break;
979
		}
980
		/******************
981 982 983
		 *		check our tuple count.. if we've returned the
		 *		proper number then return, else loop again and
		 *		process more tuples..
984
		 ******************
985 986 987 988
		 */
		current_tuple_count += 1;
		if (numberTuples == current_tuple_count)
			break;
989
	}
990

991
	/******************
992 993
	 *	here, result is either a slot containing a tuple in the case
	 *	of a RETRIEVE or NULL otherwise.
994
	 ******************
995
	 */
996
	return result;
997 998 999
}

/* ----------------------------------------------------------------
1000
 *		ExecRetrieve
1001
 *
1002 1003 1004 1005 1006
 *		RETRIEVEs are easy.. we just pass the tuple to the appropriate
 *		print function.  The only complexity is when we do a
 *		"retrieve into", in which case we insert the tuple into
 *		the appropriate relation (note: this is a newly created relation
 *		so we don't need to worry about indices or locks.)
1007 1008 1009
 * ----------------------------------------------------------------
 */
static void
1010
ExecRetrieve(TupleTableSlot *slot,
V
Vadim B. Mikheev 已提交
1011
			 DestReceiver *destfunc,
1012
			 EState *estate)
1013
{
1014 1015
	HeapTuple	tuple;
	TupleDesc	attrtype;
1016

1017
	/******************
1018
	 *	get the heap tuple out of the tuple table slot
1019
	 ******************
1020 1021 1022 1023
	 */
	tuple = slot->val;
	attrtype = slot->ttc_tupleDescriptor;

1024
	/******************
1025
	 *	insert the tuple into the "into relation"
1026
	 ******************
1027 1028 1029 1030 1031 1032 1033
	 */
	if (estate->es_into_relation_descriptor != NULL)
	{
		heap_insert(estate->es_into_relation_descriptor, tuple);
		IncrAppended();
	}

1034
	/******************
1035
	 *	send the tuple to the front end (or the screen)
1036
	 ******************
1037
	 */
1038
	(*destfunc->receiveTuple) (tuple, attrtype, destfunc);
1039 1040
	IncrRetrieved();
	(estate->es_processed)++;
1041 1042 1043
}

/* ----------------------------------------------------------------
1044
 *		ExecAppend
1045
 *
1046 1047 1048
 *		APPENDs are trickier.. we have to insert the tuple into
 *		the base relation and insert appropriate tuples into the
 *		index relations.
1049 1050 1051 1052
 * ----------------------------------------------------------------
 */

static void
1053
ExecAppend(TupleTableSlot *slot,
1054
		   ItemPointer tupleid,
1055
		   EState *estate)
1056
{
1057 1058 1059 1060 1061
	HeapTuple	tuple;
	RelationInfo *resultRelationInfo;
	Relation	resultRelationDesc;
	int			numIndices;
	Oid			newId;
1062

1063
	/******************
1064
	 *	get the heap tuple out of the tuple table slot
1065
	 ******************
1066 1067 1068
	 */
	tuple = slot->val;

1069
	/******************
1070
	 *	get information on the result relation
1071
	 ******************
1072 1073 1074 1075
	 */
	resultRelationInfo = estate->es_result_relation_info;
	resultRelationDesc = resultRelationInfo->ri_RelationDesc;

1076
	/******************
1077 1078
	 *	have to add code to preform unique checking here.
	 *	cim -12/1/89
1079
	 ******************
1080 1081 1082 1083 1084 1085
	 */

	/* BEFORE ROW INSERT Triggers */
	if (resultRelationDesc->trigdesc &&
	resultRelationDesc->trigdesc->n_before_row[TRIGGER_EVENT_INSERT] > 0)
	{
1086
		HeapTuple	newtuple;
1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100

		newtuple = ExecBRInsertTriggers(resultRelationDesc, tuple);

		if (newtuple == NULL)	/* "do nothing" */
			return;

		if (newtuple != tuple)	/* modified by Trigger(s) */
		{
			Assert(slot->ttc_shouldFree);
			pfree(tuple);
			slot->val = tuple = newtuple;
		}
	}

1101
	/******************
1102
	 * Check the constraints of a tuple
1103
	 ******************
1104 1105 1106 1107
	 */

	if (resultRelationDesc->rd_att->constr)
	{
1108
		ExecConstraints("ExecAppend", resultRelationDesc, tuple);
1109 1110
	}

1111
	/******************
1112
	 *	insert the tuple
1113
	 ******************
1114 1115 1116 1117 1118
	 */
	newId = heap_insert(resultRelationDesc,		/* relation desc */
						tuple); /* heap tuple */
	IncrAppended();

1119
	/******************
1120 1121 1122 1123 1124
	 *	process indices
	 *
	 *	Note: heap_insert adds a new tuple to a relation.  As a side
	 *	effect, the tupleid of the new tuple is placed in the new
	 *	tuple's t_ctid field.
1125
	 ******************
1126 1127 1128
	 */
	numIndices = resultRelationInfo->ri_NumIndices;
	if (numIndices > 0)
1129
		ExecInsertIndexTuples(slot, &(tuple->t_self), estate, false);
1130 1131 1132 1133 1134 1135 1136
	(estate->es_processed)++;
	estate->es_lastoid = newId;

	/* AFTER ROW INSERT Triggers */
	if (resultRelationDesc->trigdesc &&
	 resultRelationDesc->trigdesc->n_after_row[TRIGGER_EVENT_INSERT] > 0)
		ExecARInsertTriggers(resultRelationDesc, tuple);
1137 1138 1139
}

/* ----------------------------------------------------------------
1140
 *		ExecDelete
1141
 *
1142 1143
 *		DELETE is like append, we delete the tuple and its
 *		index tuples.
1144 1145 1146
 * ----------------------------------------------------------------
 */
static void
1147
ExecDelete(TupleTableSlot *slot,
1148
		   ItemPointer tupleid,
1149
		   EState *estate)
1150
{
V
Vadim B. Mikheev 已提交
1151 1152
	RelationInfo	   *resultRelationInfo;
	Relation			resultRelationDesc;
V
Vadim B. Mikheev 已提交
1153
	ItemPointerData		ctid;
V
Vadim B. Mikheev 已提交
1154
	int					result;
1155

1156
	/******************
1157
	 *	get the result relation information
1158
	 ******************
1159 1160 1161 1162 1163 1164 1165 1166
	 */
	resultRelationInfo = estate->es_result_relation_info;
	resultRelationDesc = resultRelationInfo->ri_RelationDesc;

	/* BEFORE ROW DELETE Triggers */
	if (resultRelationDesc->trigdesc &&
	resultRelationDesc->trigdesc->n_before_row[TRIGGER_EVENT_DELETE] > 0)
	{
1167
		bool		dodelete;
1168

V
Vadim B. Mikheev 已提交
1169
		dodelete = ExecBRDeleteTriggers(estate, tupleid);
1170 1171 1172 1173 1174

		if (!dodelete)			/* "do nothing" */
			return;
	}

V
Vadim B. Mikheev 已提交
1175
	/*
1176 1177
	 *	delete the tuple
	 */
1178
ldelete:;
V
Vadim B. Mikheev 已提交
1179 1180 1181 1182 1183 1184 1185 1186 1187 1188
	result = heap_delete(resultRelationDesc, tupleid, &ctid);
	switch (result)
	{
		case HeapTupleSelfUpdated:
			return;

		case HeapTupleMayBeUpdated:
			break;

		case HeapTupleUpdated:
1189 1190
			if (XactIsoLevel == XACT_SERIALIZABLE)
				elog(ERROR, "Can't serialize access due to concurrent update");
1191 1192
			else if (!(ItemPointerEquals(tupleid, &ctid)))
			{
V
Vadim B. Mikheev 已提交
1193
				TupleTableSlot *epqslot = EvalPlanQual(estate, 
1194 1195
						resultRelationInfo->ri_RangeTableIndex, &ctid);

V
Vadim B. Mikheev 已提交
1196
				if (!TupIsNull(epqslot))
1197 1198 1199 1200 1201
				{
					*tupleid = ctid;
					goto ldelete;
				}
			}
V
Vadim B. Mikheev 已提交
1202 1203 1204 1205 1206 1207
			return;

		default:
			elog(ERROR, "Unknown status %u from heap_delete", result);
			return;
	}
1208 1209 1210 1211

	IncrDeleted();
	(estate->es_processed)++;

1212
	/******************
1213 1214 1215 1216 1217 1218 1219 1220
	 *	Note: Normally one would think that we have to
	 *		  delete index tuples associated with the
	 *		  heap tuple now..
	 *
	 *		  ... but in POSTGRES, we have no need to do this
	 *		  because the vacuum daemon automatically
	 *		  opens an index scan and deletes index tuples
	 *		  when it finds deleted heap tuples. -cim 9/27/89
1221
	 ******************
1222 1223 1224 1225 1226
	 */

	/* AFTER ROW DELETE Triggers */
	if (resultRelationDesc->trigdesc &&
	 resultRelationDesc->trigdesc->n_after_row[TRIGGER_EVENT_DELETE] > 0)
V
Vadim B. Mikheev 已提交
1227
		ExecARDeleteTriggers(estate, tupleid);
1228 1229 1230 1231

}

/* ----------------------------------------------------------------
1232
 *		ExecReplace
1233
 *
1234 1235 1236 1237 1238 1239
 *		note: we can't run replace queries with transactions
 *		off because replaces are actually appends and our
 *		scan will mistakenly loop forever, replacing the tuple
 *		it just appended..	This should be fixed but until it
 *		is, we don't want to get stuck in an infinite loop
 *		which corrupts your database..
1240 1241 1242
 * ----------------------------------------------------------------
 */
static void
1243
ExecReplace(TupleTableSlot *slot,
1244
			ItemPointer tupleid,
1245
			EState *estate)
1246
{
V
Vadim B. Mikheev 已提交
1247 1248 1249
	HeapTuple			tuple;
	RelationInfo	   *resultRelationInfo;
	Relation			resultRelationDesc;
V
Vadim B. Mikheev 已提交
1250
	ItemPointerData		ctid;
V
Vadim B. Mikheev 已提交
1251 1252
	int					result;
	int					numIndices;
1253

1254
	/******************
1255
	 *	abort the operation if not running transactions
1256
	 ******************
1257 1258 1259 1260 1261 1262 1263
	 */
	if (IsBootstrapProcessingMode())
	{
		elog(DEBUG, "ExecReplace: replace can't run without transactions");
		return;
	}

1264
	/******************
1265
	 *	get the heap tuple out of the tuple table slot
1266
	 ******************
1267 1268 1269
	 */
	tuple = slot->val;

1270
	/******************
1271
	 *	get the result relation information
1272
	 ******************
1273 1274 1275 1276
	 */
	resultRelationInfo = estate->es_result_relation_info;
	resultRelationDesc = resultRelationInfo->ri_RelationDesc;

1277
	/******************
1278 1279 1280 1281
	 *	have to add code to preform unique checking here.
	 *	in the event of unique tuples, this becomes a deletion
	 *	of the original tuple affected by the replace.
	 *	cim -12/1/89
1282
	 ******************
1283 1284 1285 1286 1287 1288
	 */

	/* BEFORE ROW UPDATE Triggers */
	if (resultRelationDesc->trigdesc &&
	resultRelationDesc->trigdesc->n_before_row[TRIGGER_EVENT_UPDATE] > 0)
	{
1289
		HeapTuple	newtuple;
1290

V
Vadim B. Mikheev 已提交
1291
		newtuple = ExecBRUpdateTriggers(estate, tupleid, tuple);
1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303

		if (newtuple == NULL)	/* "do nothing" */
			return;

		if (newtuple != tuple)	/* modified by Trigger(s) */
		{
			Assert(slot->ttc_shouldFree);
			pfree(tuple);
			slot->val = tuple = newtuple;
		}
	}

1304
	/******************
1305
	 * Check the constraints of a tuple
1306
	 ******************
1307 1308 1309 1310
	 */

	if (resultRelationDesc->rd_att->constr)
	{
1311
		ExecConstraints("ExecReplace", resultRelationDesc, tuple);
1312 1313
	}

V
Vadim B. Mikheev 已提交
1314
	/*
1315 1316
	 *	replace the heap tuple
	 */
1317
lreplace:;
V
Vadim B. Mikheev 已提交
1318 1319 1320 1321 1322 1323 1324 1325 1326 1327
	result = heap_replace(resultRelationDesc, tupleid, tuple, &ctid);
	switch (result)
	{
		case HeapTupleSelfUpdated:
			return;

		case HeapTupleMayBeUpdated:
			break;

		case HeapTupleUpdated:
1328 1329
			if (XactIsoLevel == XACT_SERIALIZABLE)
				elog(ERROR, "Can't serialize access due to concurrent update");
1330 1331
			else if (!(ItemPointerEquals(tupleid, &ctid)))
			{
V
Vadim B. Mikheev 已提交
1332
				TupleTableSlot *epqslot = EvalPlanQual(estate, 
1333 1334
						resultRelationInfo->ri_RangeTableIndex, &ctid);

V
Vadim B. Mikheev 已提交
1335
				if (!TupIsNull(epqslot))
1336 1337
				{
					*tupleid = ctid;
V
Vadim B. Mikheev 已提交
1338 1339
					tuple = ExecRemoveJunk(estate->es_junkFilter, epqslot);
					slot = ExecStoreTuple(tuple, slot, InvalidBuffer, true);
1340 1341 1342
					goto lreplace;
				}
			}
V
Vadim B. Mikheev 已提交
1343 1344 1345 1346 1347
			return;

		default:
			elog(ERROR, "Unknown status %u from heap_replace", result);
			return;
1348 1349 1350 1351 1352
	}

	IncrReplaced();
	(estate->es_processed)++;

1353
	/******************
1354 1355 1356 1357 1358 1359 1360
	 *	Note: instead of having to update the old index tuples
	 *		  associated with the heap tuple, all we do is form
	 *		  and insert new index tuples..  This is because
	 *		  replaces are actually deletes and inserts and
	 *		  index tuple deletion is done automagically by
	 *		  the vaccuum deamon.. All we do is insert new
	 *		  index tuples.  -cim 9/27/89
1361
	 ******************
1362 1363
	 */

1364
	/******************
1365 1366 1367 1368 1369 1370 1371
	 *	process indices
	 *
	 *	heap_replace updates a tuple in the base relation by invalidating
	 *	it and then appending a new tuple to the relation.	As a side
	 *	effect, the tupleid of the new tuple is placed in the new
	 *	tuple's t_ctid field.  So we now insert index tuples using
	 *	the new tupleid stored there.
1372
	 ******************
1373 1374 1375 1376
	 */

	numIndices = resultRelationInfo->ri_NumIndices;
	if (numIndices > 0)
1377
		ExecInsertIndexTuples(slot, &(tuple->t_self), estate, true);
1378 1379 1380 1381

	/* AFTER ROW UPDATE Triggers */
	if (resultRelationDesc->trigdesc &&
	 resultRelationDesc->trigdesc->n_after_row[TRIGGER_EVENT_UPDATE] > 0)
V
Vadim B. Mikheev 已提交
1382
		ExecARUpdateTriggers(estate, tupleid, tuple);
1383
}
V
Vadim B. Mikheev 已提交
1384

1385
#if 0
1386
static HeapTuple
1387
ExecAttrDefault(Relation rel, HeapTuple tuple)
V
Vadim B. Mikheev 已提交
1388
{
1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400
	int			ndef = rel->rd_att->constr->num_defval;
	AttrDefault *attrdef = rel->rd_att->constr->defval;
	ExprContext *econtext = makeNode(ExprContext);
	HeapTuple	newtuple;
	Node	   *expr;
	bool		isnull;
	bool		isdone;
	Datum		val;
	Datum	   *replValue = NULL;
	char	   *replNull = NULL;
	char	   *repl = NULL;
	int			i;
1401 1402 1403 1404 1405 1406

	econtext->ecxt_scantuple = NULL;	/* scan tuple slot */
	econtext->ecxt_innertuple = NULL;	/* inner tuple slot */
	econtext->ecxt_outertuple = NULL;	/* outer tuple slot */
	econtext->ecxt_relation = NULL;		/* relation */
	econtext->ecxt_relid = 0;	/* relid */
1407 1408
	econtext->ecxt_param_list_info = NULL;		/* param list info */
	econtext->ecxt_param_exec_vals = NULL;		/* exec param values */
1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427
	econtext->ecxt_range_table = NULL;	/* range table */
	for (i = 0; i < ndef; i++)
	{
		if (!heap_attisnull(tuple, attrdef[i].adnum))
			continue;
		expr = (Node *) stringToNode(attrdef[i].adbin);

		val = ExecEvalExpr(expr, econtext, &isnull, &isdone);

		pfree(expr);

		if (isnull)
			continue;

		if (repl == NULL)
		{
			repl = (char *) palloc(rel->rd_att->natts * sizeof(char));
			replNull = (char *) palloc(rel->rd_att->natts * sizeof(char));
			replValue = (Datum *) palloc(rel->rd_att->natts * sizeof(Datum));
B
Bruce Momjian 已提交
1428
			MemSet(repl, ' ', rel->rd_att->natts * sizeof(char));
1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439
		}

		repl[attrdef[i].adnum - 1] = 'r';
		replNull[attrdef[i].adnum - 1] = ' ';
		replValue[attrdef[i].adnum - 1] = val;

	}

	pfree(econtext);

	if (repl == NULL)
1440
		return tuple;
1441

1442
	newtuple = heap_modifytuple(tuple, rel, replValue, replNull, repl);
1443 1444

	pfree(repl);
1445
	pfree(tuple);
1446 1447 1448
	pfree(replNull);
	pfree(replValue);

1449
	return newtuple;
1450

V
Vadim B. Mikheev 已提交
1451
}
1452

1453
#endif
V
Vadim B. Mikheev 已提交
1454

1455
static char *
1456
ExecRelCheck(Relation rel, HeapTuple tuple)
V
Vadim B. Mikheev 已提交
1457
{
1458 1459 1460
	int			ncheck = rel->rd_att->constr->num_check;
	ConstrCheck *check = rel->rd_att->constr->check;
	ExprContext *econtext = makeNode(ExprContext);
1461
	TupleTableSlot *slot = makeNode(TupleTableSlot);
1462 1463 1464 1465 1466
	RangeTblEntry *rte = makeNode(RangeTblEntry);
	List	   *rtlist;
	List	   *qual;
	bool		res;
	int			i;
1467 1468 1469 1470 1471 1472 1473 1474 1475

	slot->val = tuple;
	slot->ttc_shouldFree = false;
	slot->ttc_descIsNew = true;
	slot->ttc_tupleDescriptor = rel->rd_att;
	slot->ttc_buffer = InvalidBuffer;
	slot->ttc_whichplan = -1;
	rte->relname = nameout(&(rel->rd_rel->relname));
	rte->refname = rte->relname;
1476
	rte->relid = RelationGetRelid(rel);
1477 1478 1479 1480 1481 1482 1483 1484 1485
	rte->inh = false;
	rte->inFromCl = true;
	rtlist = lcons(rte, NIL);
	econtext->ecxt_scantuple = slot;	/* scan tuple slot */
	econtext->ecxt_innertuple = NULL;	/* inner tuple slot */
	econtext->ecxt_outertuple = NULL;	/* outer tuple slot */
	econtext->ecxt_relation = rel;		/* relation */
	econtext->ecxt_relid = 0;	/* relid */
	econtext->ecxt_param_list_info = NULL;		/* param list info */
V
Vadim B. Mikheev 已提交
1486
	econtext->ecxt_param_exec_vals = NULL;		/* exec param values */
1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497
	econtext->ecxt_range_table = rtlist;		/* range table */

	for (i = 0; i < ncheck; i++)
	{
		qual = (List *) stringToNode(check[i].ccbin);

		res = ExecQual(qual, econtext);

		pfree(qual);

		if (!res)
1498
			return check[i].ccname;
1499 1500 1501 1502 1503 1504 1505 1506
	}

	pfree(slot);
	pfree(rte->relname);
	pfree(rte);
	pfree(rtlist);
	pfree(econtext);

1507
	return (char *) NULL;
1508

V
Vadim B. Mikheev 已提交
1509 1510
}

1511
void
1512
ExecConstraints(char *caller, Relation rel, HeapTuple tuple)
V
Vadim B. Mikheev 已提交
1513
{
1514 1515 1516 1517

	Assert(rel->rd_att->constr);

	if (rel->rd_att->constr->has_not_null)
V
Vadim B. Mikheev 已提交
1518
	{
1519
		int			attrChk;
1520 1521 1522 1523

		for (attrChk = 1; attrChk <= rel->rd_att->natts; attrChk++)
		{
			if (rel->rd_att->attrs[attrChk - 1]->attnotnull && heap_attisnull(tuple, attrChk))
1524
				elog(ERROR, "%s: Fail to add null value in not null attribute %s",
1525 1526 1527 1528 1529 1530
				  caller, rel->rd_att->attrs[attrChk - 1]->attname.data);
		}
	}

	if (rel->rd_att->constr->num_check > 0)
	{
1531
		char	   *failed;
1532 1533

		if ((failed = ExecRelCheck(rel, tuple)) != NULL)
1534
			elog(ERROR, "%s: rejected due to CHECK constraint %s", caller, failed);
1535 1536
	}

1537
	return;
V
Vadim B. Mikheev 已提交
1538
}
1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595

TupleTableSlot*
EvalPlanQual(EState *estate, Index rti, ItemPointer tid)
{
	evalPlanQual	   *epq = (evalPlanQual*) estate->es_evalPlanQual;
	evalPlanQual	   *oldepq;
	EState			   *epqstate = NULL;
	Relation			relation;
	Buffer				buffer;
	HeapTupleData		tuple;
	bool				endNode = true;

	Assert(rti != 0);

	if (epq != NULL && epq->rti == 0)
	{
		Assert(!(estate->es_useEvalPlan) && 
				epq->estate.es_evalPlanQual == NULL);
		epq->rti = rti;
		endNode = false;
	}

	/*
	 * If this is request for another RTE - Ra, - then we have to check
	 * wasn't PlanQual requested for Ra already and if so then Ra' row 
	 * was updated again and we have to re-start old execution for Ra 
	 * and forget all what we done after Ra was suspended. Cool? -:))
	 */
	if (epq != NULL && epq->rti != rti && 
		epq->estate.es_evTuple[rti - 1] != NULL)
	{
		do
		{
			/* pop previous PlanQual from the stack */
			epqstate = &(epq->estate);
			oldepq = (evalPlanQual*) epqstate->es_evalPlanQual;
			Assert(oldepq->rti != 0);
			/* stop execution */
			ExecEndNode(epq->plan, epq->plan);
			pfree(epqstate->es_evTuple[epq->rti - 1]);
			epqstate->es_evTuple[epq->rti - 1] = NULL;
			/* push current PQ to freePQ stack */
			oldepq->free = epq;
			epq = oldepq;
		} while (epq->rti != rti);
		estate->es_evalPlanQual = (Pointer) epq;
	}

	/* 
	 * If we are requested for another RTE then we have to suspend
	 * execution of current PlanQual and start execution for new one.
	 */
	if (epq == NULL || epq->rti != rti)
	{
		/* try to reuse plan used previously */
		evalPlanQual   *newepq = (epq != NULL) ? epq->free : NULL;

1596
		if (newepq == NULL)		/* first call or freePQ stack is empty */
1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616
		{
			newepq = (evalPlanQual*) palloc(sizeof(evalPlanQual));
			/* Init EState */
			epqstate = &(newepq->estate);
			memset(epqstate, 0, sizeof(EState));
			epqstate->type = T_EState; 
			epqstate->es_direction = ForwardScanDirection;
			epqstate->es_snapshot = estate->es_snapshot;
			epqstate->es_range_table = estate->es_range_table;
			epqstate->es_param_list_info = estate->es_param_list_info;
			if (estate->es_origPlan->nParamExec > 0)
				epqstate->es_param_exec_vals = (ParamExecData *)
							palloc(estate->es_origPlan->nParamExec * 
									sizeof(ParamExecData));
			epqstate->es_tupleTable = 
				ExecCreateTupleTable(estate->es_tupleTable->size);
			epqstate->es_refcount = estate->es_refcount;
			/* ... rest */
			newepq->plan = copyObject(estate->es_origPlan);
			newepq->free = NULL;
1617 1618 1619
			epqstate->es_evTupleNull = (bool*) 
				palloc(length(estate->es_range_table) * sizeof(bool));
			if (epq == NULL)	/* first call */
1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696
			{
				epqstate->es_evTuple = (HeapTuple*) 
					palloc(length(estate->es_range_table) * sizeof(HeapTuple));
				memset(epqstate->es_evTuple, 0, 
					length(estate->es_range_table) * sizeof(HeapTuple));
			}
			else
			{
				epqstate->es_evTuple = epq->estate.es_evTuple;
			}
		}
		else
		{
			epqstate = &(newepq->estate);
		}
		/* push current PQ to the stack */
		epqstate->es_evalPlanQual = (Pointer) epq;
		estate->es_evalPlanQual = (Pointer) epq = newepq;
		epq->rti = rti;
		endNode = false;
	}

	epqstate = &(epq->estate);

	/*
	 * Ok - we're requested for the same RTE (-:)).
	 * I'm not sure about ability to use ExecReScan instead of
	 * ExecInitNode, so...
	 */
	if (endNode)
		ExecEndNode(epq->plan, epq->plan);

	/* free old RTE' tuple */
	if (epqstate->es_evTuple[epq->rti - 1] != NULL)
	{
		pfree(epqstate->es_evTuple[epq->rti - 1]);
		epqstate->es_evTuple[epq->rti - 1] = NULL;
	}

	/* ** fetch tid tuple ** */
	if (estate->es_result_relation_info != NULL && 
		estate->es_result_relation_info->ri_RangeTableIndex == rti)
		relation = estate->es_result_relation_info->ri_RelationDesc;
	else
	{
		List   *l;

		foreach (l, estate->es_rowMark)
		{
			if (((execRowMark*) lfirst(l))->rti == rti)
				break;
		}
		relation = ((execRowMark*) lfirst(l))->relation;
	}
	tuple.t_self = *tid;
	for ( ; ; )
	{
		heap_fetch(relation, SnapshotDirty, &tuple, &buffer);
		if (tuple.t_data != NULL)
		{
			TransactionId xwait = SnapshotDirty->xmax;

			if (TransactionIdIsValid(SnapshotDirty->xmin))
				elog(ERROR, "EvalPlanQual: t_xmin is uncommitted ?!");
			/*
			 * If tuple is being updated by other transaction then 
			 * we have to wait for its commit/abort.
			 */
			if (TransactionIdIsValid(xwait))
			{
				ReleaseBuffer(buffer);
				XactLockTableWait(xwait);
				continue;
			}
			/*
			 * Nice! We got tuple - now copy it.
			 */
1697 1698
			if (epqstate->es_evTuple[epq->rti - 1] != NULL)
				pfree(epqstate->es_evTuple[epq->rti - 1]);
1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738
			epqstate->es_evTuple[epq->rti - 1] = heap_copytuple(&tuple);
			ReleaseBuffer(buffer);
			break;
		}
		/*
		 * Ops! Invalid tuple. Have to check is it updated or deleted.
		 * Note that it's possible to get invalid SnapshotDirty->tid
		 * if tuple updated by this transaction. Have we to check this ?
		 */
		if (ItemPointerIsValid(&(SnapshotDirty->tid)) && 
			!(ItemPointerEquals(&(tuple.t_self), &(SnapshotDirty->tid))))
		{
			tuple.t_self = SnapshotDirty->tid;	/* updated ... */
			continue;
		}
		/*
		 * Deleted or updated by this transaction. Do not
		 * (re-)start execution of this PQ. Continue previous PQ.
		 */
		oldepq = (evalPlanQual*) epqstate->es_evalPlanQual;
		if (oldepq != NULL)
		{
			Assert(oldepq->rti != 0);
			/* push current PQ to freePQ stack */
			oldepq->free = epq;
			epq = oldepq;
			epqstate = &(epq->estate);
			estate->es_evalPlanQual = (Pointer) epq;
		}
		else
		{									/* this is the first (oldest) PQ
			epq->rti = 0;					 * - mark as free and 
			estate->es_useEvalPlan = false;	 * continue Query execution
			return (NULL);					 */
		}
	}

	if (estate->es_origPlan->nParamExec > 0)
		memset(epqstate->es_param_exec_vals, 0, 
				estate->es_origPlan->nParamExec * sizeof(ParamExecData));
1739 1740
	memset(epqstate->es_evTupleNull, false, 
			length(estate->es_range_table) * sizeof(bool));
1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791
	ExecInitNode(epq->plan, epqstate, NULL);

	/*
	 * For UPDATE/DELETE we have to return tid of actual row
	 * we're executing PQ for.
	 */
	*tid = tuple.t_self;

	return (EvalPlanQualNext(estate));
}

static TupleTableSlot* 
EvalPlanQualNext(EState *estate)
{
	evalPlanQual	   *epq = (evalPlanQual*) estate->es_evalPlanQual;
	EState			   *epqstate = &(epq->estate);
	evalPlanQual	   *oldepq;
	TupleTableSlot	   *slot;

	Assert(epq->rti != 0);

lpqnext:;
	slot = ExecProcNode(epq->plan, epq->plan);

	/*
	 * No more tuples for this PQ. Continue previous one.
	 */
	if (TupIsNull(slot))
	{
		ExecEndNode(epq->plan, epq->plan);
		pfree(epqstate->es_evTuple[epq->rti - 1]);
		epqstate->es_evTuple[epq->rti - 1] = NULL;
		/* pop old PQ from the stack */
		oldepq = (evalPlanQual*) epqstate->es_evalPlanQual;
		if (oldepq == (evalPlanQual*) NULL)
		{									/* this is the first (oldest) */
			epq->rti = 0;					/* PQ - mark as free and      */
			estate->es_useEvalPlan = false;	/* continue Query execution   */
			return (NULL);
		}
		Assert(oldepq->rti != 0);
		/* push current PQ to freePQ stack */
		oldepq->free = epq;
		epq = oldepq;
		epqstate = &(epq->estate);
		estate->es_evalPlanQual = (Pointer) epq;
		goto lpqnext;
	}

	return (slot);
}