execMain.c 33.5 KB
Newer Older
1 2 3
/*-------------------------------------------------------------------------
 *
 * execMain.c--
4
 *	  top level executor interface routines
5 6
 *
 * INTERFACE ROUTINES
7 8 9
 *	ExecutorStart()
 *	ExecutorRun()
 *	ExecutorEnd()
10
 *
11 12 13 14 15 16 17 18 19 20 21 22 23
 *	The old ExecutorMain() has been replaced by ExecutorStart(),
 *	ExecutorRun() and ExecutorEnd()
 *
 *	These three procedures are the external interfaces to the executor.
 *	In each case, the query descriptor and the execution state is required
 *	 as arguments
 *
 *	ExecutorStart() must be called at the beginning of any execution of any
 *	query plan and ExecutorEnd() should always be called at the end of
 *	execution of a plan.
 *
 *	ExecutorRun accepts 'feature' and 'count' arguments that specify whether
 *	the plan is to be executed forwards, backwards, and for how many tuples.
24 25 26 27 28
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
29
 *	  $Header: /cvsroot/pgsql/src/backend/executor/execMain.c,v 1.59 1998/11/27 19:51:59 vadim Exp $
30 31 32
 *
 *-------------------------------------------------------------------------
 */
33
#include <string.h>
34
#include "postgres.h"
35
#include "miscadmin.h"
36

37
#include "executor/executor.h"
38 39
#include "executor/execdefs.h"
#include "executor/execdebug.h"
40
#include "executor/nodeIndexscan.h"
41 42 43
#include "utils/builtins.h"
#include "utils/palloc.h"
#include "utils/acl.h"
44
#include "utils/syscache.h"
45
#include "utils/tqual.h"
46
#include "parser/parsetree.h"	/* rt_fetch() */
47
#include "storage/bufmgr.h"
M
Marc G. Fournier 已提交
48
#include "storage/lmgr.h"
49
#include "storage/smgr.h"
50 51 52
#include "commands/async.h"
/* #include "access/localam.h" */
#include "optimizer/var.h"
53 54
#include "access/heapam.h"
#include "catalog/heap.h"
55
#include "commands/trigger.h"
56

57
void ExecCheckPerms(CmdType operation, int resultRelation, List *rangeTable,
58
			   Query *parseTree);
59 60 61


/* decls for local routines only used within this module */
62
static TupleDesc InitPlan(CmdType operation, Query *parseTree,
63 64
		 Plan *plan, EState *estate);
static void EndPlan(Plan *plan, EState *estate);
65
static TupleTableSlot *ExecutePlan(EState *estate, Plan *plan,
66
			Query *parseTree, CmdType operation,
67 68
			int numberTuples, ScanDirection direction,
			void (*printfunc) ());
69 70
static void ExecRetrieve(TupleTableSlot *slot, void (*printfunc) (),
									 EState *estate);
71
static void ExecAppend(TupleTableSlot *slot, ItemPointer tupleid,
72
		   EState *estate);
73
static void ExecDelete(TupleTableSlot *slot, ItemPointer tupleid,
74
		   EState *estate);
75
static void ExecReplace(TupleTableSlot *slot, ItemPointer tupleid,
76
			EState *estate, Query *parseTree);
77 78 79

/* end of local decls */

M
Marc G. Fournier 已提交
80
#ifdef QUERY_LIMIT
81
static int	queryLimit = ALL_TUPLES;
82

M
Marc G. Fournier 已提交
83 84 85 86 87 88
#undef ALL_TUPLES
#define ALL_TUPLES queryLimit

int
ExecutorLimit(int limit)
{
89
	return queryLimit = limit;
M
Marc G. Fournier 已提交
90
}
91

B
Bruce Momjian 已提交
92 93 94 95 96 97
int
ExecutorGetLimit()
{
	return queryLimit;
}

98
#endif
M
Marc G. Fournier 已提交
99

100
/* ----------------------------------------------------------------
101 102 103 104 105 106 107
 *		ExecutorStart
 *
 *		This routine must be called at the beginning of any execution of any
 *		query plan
 *
 *		returns (AttrInfo*) which describes the attributes of the tuples to
 *		be returned by the query.
108 109 110 111
 *
 * ----------------------------------------------------------------
 */
TupleDesc
112
ExecutorStart(QueryDesc *queryDesc, EState *estate)
113
{
114
	TupleDesc	result;
115 116 117

	/* sanity checks */
	Assert(queryDesc != NULL);
118

V
Vadim B. Mikheev 已提交
119 120
	if (queryDesc->plantree->nParamExec > 0)
	{
121 122 123
		estate->es_param_exec_vals = (ParamExecData *)
			palloc(queryDesc->plantree->nParamExec * sizeof(ParamExecData));
		memset(estate->es_param_exec_vals, 0, queryDesc->plantree->nParamExec * sizeof(ParamExecData));
V
Vadim B. Mikheev 已提交
124
	}
125

126
	estate->es_snapshot = SnapshotNow;
127

128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
	result = InitPlan(queryDesc->operation,
					  queryDesc->parsetree,
					  queryDesc->plantree,
					  estate);

	/*
	 * reset buffer refcount.  the current refcounts are saved and will be
	 * restored when ExecutorEnd is called
	 *
	 * this makes sure that when ExecutorRun's are called recursively as for
	 * postquel functions, the buffers pinned by one ExecutorRun will not
	 * be unpinned by another ExecutorRun.
	 */
	BufferRefCountReset(estate->es_refcount);

	return result;
144 145 146
}

/* ----------------------------------------------------------------
147 148 149 150 151 152 153
 *		ExecutorRun
 *
 *		This is the main routine of the executor module. It accepts
 *		the query descriptor from the traffic cop and executes the
 *		query plan.
 *
 *		ExecutorStart must have been called already.
154
 *
155 156 157 158 159 160
 *		the different features supported are:
 *			 EXEC_RUN:	retrieve all tuples in the forward direction
 *			 EXEC_FOR:	retrieve 'count' number of tuples in the forward dir
 *			 EXEC_BACK: retrieve 'count' number of tuples in the backward dir
 *			 EXEC_RETONE: return one tuple but don't 'retrieve' it
 *						   used in postquel function processing
161 162 163 164
 *
 *
 * ----------------------------------------------------------------
 */
165
TupleTableSlot *
166
ExecutorRun(QueryDesc *queryDesc, EState *estate, int feature, int count)
167
{
168 169 170
	CmdType		operation;
	Query	   *parseTree;
	Plan	   *plan;
171
	TupleTableSlot *result;
172 173
	CommandDest dest;
	void		(*destination) ();
174

175
	/******************
176
	 *	sanity checks
177
	 ******************
178
	 */
179 180
	Assert(queryDesc != NULL);

181
	/******************
182 183
	 *	extract information from the query descriptor
	 *	and the query feature.
184
	 ******************
185
	 */
186 187 188 189 190 191 192 193 194 195 196
	operation = queryDesc->operation;
	parseTree = queryDesc->parsetree;
	plan = queryDesc->plantree;
	dest = queryDesc->dest;
	destination = (void (*) ()) DestToFunction(dest);
	estate->es_processed = 0;
	estate->es_lastoid = InvalidOid;

	switch (feature)
	{

197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
		case EXEC_RUN:
			result = ExecutePlan(estate,
								 plan,
								 parseTree,
								 operation,
								 ALL_TUPLES,
								 ForwardScanDirection,
								 destination);
			break;
		case EXEC_FOR:
			result = ExecutePlan(estate,
								 plan,
								 parseTree,
								 operation,
								 count,
								 ForwardScanDirection,
								 destination);
			break;
215

216
			/******************
217
			 *		retrieve next n "backward" tuples
218
			 ******************
219 220 221 222 223 224 225 226 227 228
			 */
		case EXEC_BACK:
			result = ExecutePlan(estate,
								 plan,
								 parseTree,
								 operation,
								 count,
								 BackwardScanDirection,
								 destination);
			break;
229

230
			/******************
231 232
			 *		return one tuple but don't "retrieve" it.
			 *		(this is used by the rule manager..) -cim 9/14/89
233
			 ******************
234 235 236 237 238 239 240 241 242 243 244 245 246 247
			 */
		case EXEC_RETONE:
			result = ExecutePlan(estate,
								 plan,
								 parseTree,
								 operation,
								 ONE_TUPLE,
								 ForwardScanDirection,
								 destination);
			break;
		default:
			result = NULL;
			elog(DEBUG, "ExecutorRun: Unknown feature %d", feature);
			break;
248 249 250
	}

	return result;
251 252 253
}

/* ----------------------------------------------------------------
254 255 256 257 258 259 260
 *		ExecutorEnd
 *
 *		This routine must be called at the end of any execution of any
 *		query plan
 *
 *		returns (AttrInfo*) which describes the attributes of the tuples to
 *		be returned by the query.
261 262 263 264
 *
 * ----------------------------------------------------------------
 */
void
265
ExecutorEnd(QueryDesc *queryDesc, EState *estate)
266
{
267 268
	/* sanity checks */
	Assert(queryDesc != NULL);
269

270
	EndPlan(queryDesc->plantree, estate);
271

272 273
	/* restore saved refcounts. */
	BufferRefCountRestore(estate->es_refcount);
274 275
}

276
void
277
ExecCheckPerms(CmdType operation,
278
			   int resultRelation,
279 280
			   List *rangeTable,
			   Query *parseTree)
281
{
282 283
	int			i = 1;
	Oid			relid;
284
	HeapTuple	htup;
285 286 287 288 289 290 291 292
	List	   *lp;
	List	   *qvars,
			   *tvars;
	int32		ok = 1,
				aclcheck_result = -1;
	char	   *opstr;
	NameData	rname;
	char	   *userName;
293 294 295 296 297 298 299

#define CHECK(MODE)		pg_aclcheck(rname.data, userName, MODE)

	userName = GetPgUserName();

	foreach(lp, rangeTable)
	{
300
		RangeTblEntry *rte = lfirst(lp);
301

M
Marc G. Fournier 已提交
302 303
		if (rte->skipAcl)
		{
304

M
Marc G. Fournier 已提交
305
			/*
306 307 308 309
			 * This happens if the access to this table is due to a view
			 * query rewriting - the rewrite handler checked the
			 * permissions against the view owner, so we just skip this
			 * entry.
M
Marc G. Fournier 已提交
310 311 312 313
			 */
			continue;
		}

314
		relid = rte->relid;
315
		htup = SearchSysCacheTuple(RELOID,
316 317
								   ObjectIdGetDatum(relid),
								   0, 0, 0);
318
		if (!HeapTupleIsValid(htup))
319
			elog(ERROR, "ExecCheckPerms: bogus RT relid: %d",
320
				 relid);
321
		StrNCpy(rname.data,
322
				((Form_pg_class) GETSTRUCT(htup))->relname.data,
323
				NAMEDATALEN);
324 325 326 327 328 329 330 331 332 333 334 335 336 337 338
		if (i == resultRelation)
		{						/* this is the result relation */
			qvars = pull_varnos(parseTree->qual);
			tvars = pull_varnos((Node *) parseTree->targetList);
			if (intMember(resultRelation, qvars) ||
				intMember(resultRelation, tvars))
			{
				/* result relation is scanned */
				ok = ((aclcheck_result = CHECK(ACL_RD)) == ACLCHECK_OK);
				opstr = "read";
				if (!ok)
					break;
			}
			switch (operation)
			{
339 340 341 342 343 344 345 346 347 348 349
				case CMD_INSERT:
					ok = ((aclcheck_result = CHECK(ACL_AP)) == ACLCHECK_OK) ||
						((aclcheck_result = CHECK(ACL_WR)) == ACLCHECK_OK);
					opstr = "append";
					break;
				case CMD_DELETE:
				case CMD_UPDATE:
					ok = ((aclcheck_result = CHECK(ACL_WR)) == ACLCHECK_OK);
					opstr = "write";
					break;
				default:
350
					elog(ERROR, "ExecCheckPerms: bogus operation %d",
351
						 operation);
352 353 354 355 356 357 358
			}
		}
		else
		{
			ok = ((aclcheck_result = CHECK(ACL_RD)) == ACLCHECK_OK);
			opstr = "read";
		}
359
		if (!ok)
360 361
			break;
		++i;
362 363
	}
	if (!ok)
364
		elog(ERROR, "%s: %s", rname.data, aclcheck_error_strings[aclcheck_result]);
365 366
}

367 368 369 370 371 372 373
/* ===============================================================
 * ===============================================================
						 static routines follow
 * ===============================================================
 * ===============================================================
 */

374 375

/* ----------------------------------------------------------------
376 377 378 379
 *		InitPlan
 *
 *		Initializes the query plan: open files, allocate storage
 *		and start up the rule manager
380 381
 * ----------------------------------------------------------------
 */
382
static TupleDesc
383
InitPlan(CmdType operation, Query *parseTree, Plan *plan, EState *estate)
384
{
385 386 387
	List	   *rangeTable;
	int			resultRelation;
	Relation	intoRelationDesc;
388

389 390 391
	TupleDesc	tupType;
	List	   *targetList;
	int			len;
392

393
	/******************
394
	 *	get information from query descriptor
395
	 ******************
396
	 */
397 398
	rangeTable = parseTree->rtable;
	resultRelation = parseTree->resultRelation;
399

400
	/******************
401
	 *	initialize the node's execution state
402
	 ******************
403
	 */
404 405
	estate->es_range_table = rangeTable;

406
	/******************
407 408 409 410
	 *	initialize the BaseId counter so node base_id's
	 *	are assigned correctly.  Someday baseid's will have to
	 *	be stored someplace other than estate because they
	 *	should be unique per query planned.
411
	 ******************
412
	 */
413
	estate->es_BaseId = 1;
414

415
	/******************
416
	 *	initialize result relation stuff
417
	 ******************
418
	 */
419

420 421
	if (resultRelation != 0 && operation != CMD_SELECT)
	{
422
		/******************
423 424 425
		 *	  if we have a result relation, open it and

		 *	  initialize the result relation info stuff.
426
		 ******************
427
		 */
428 429 430 431 432
		RelationInfo *resultRelationInfo;
		Index		resultRelationIndex;
		RangeTblEntry *rtentry;
		Oid			resultRelationOid;
		Relation	resultRelationDesc;
433 434 435 436 437 438 439

		resultRelationIndex = resultRelation;
		rtentry = rt_fetch(resultRelationIndex, rangeTable);
		resultRelationOid = rtentry->relid;
		resultRelationDesc = heap_open(resultRelationOid);

		if (resultRelationDesc->rd_rel->relkind == RELKIND_SEQUENCE)
440
			elog(ERROR, "You can't change sequence relation %s",
441 442
				 resultRelationDesc->rd_rel->relname.data);

V
Vadim B. Mikheev 已提交
443
		/*
444 445 446 447 448
		 * Write-lock the result relation right away: if the relation is
		 * used in a subsequent scan, we won't have to elevate the
		 * read-lock set by heap_beginscan to a write-lock (needed by
		 * heap_insert, heap_delete and heap_replace). This will hopefully
		 * prevent some deadlocks.	- 01/24/94
V
Vadim B. Mikheev 已提交
449
		 */
450 451 452 453 454 455 456 457
		RelationSetLockForWrite(resultRelationDesc);

		resultRelationInfo = makeNode(RelationInfo);
		resultRelationInfo->ri_RangeTableIndex = resultRelationIndex;
		resultRelationInfo->ri_RelationDesc = resultRelationDesc;
		resultRelationInfo->ri_NumIndices = 0;
		resultRelationInfo->ri_IndexRelationDescs = NULL;
		resultRelationInfo->ri_IndexRelationInfo = NULL;
458

459
		/******************
460 461
		 *	open indices on result relation and save descriptors
		 *	in the result relation information..
462
		 ******************
463
		 */
464 465 466
		ExecOpenIndices(resultRelationOid, resultRelationInfo);

		estate->es_result_relation_info = resultRelationInfo;
467
	}
468 469
	else
	{
470
		/******************
471
		 *		if no result relation, then set state appropriately
472
		 ******************
473 474 475 476 477 478 479 480
		 */
		estate->es_result_relation_info = NULL;
	}

#ifndef NO_SECURITY
	ExecCheckPerms(operation, resultRelation, rangeTable, parseTree);
#endif

481
	/******************
482
	 *	  initialize the executor "tuple" table.
483
	 ******************
484 485
	 */
	{
486 487
		int			nSlots = ExecCountSlotsNode(plan);
		TupleTable	tupleTable = ExecCreateTupleTable(nSlots + 10);		/* why add ten? - jolly */
488

489 490
		estate->es_tupleTable = tupleTable;
	}
491

492
	/******************
493 494 495 496
	 *	   initialize the private state information for
	 *	   all the nodes in the query tree.  This opens
	 *	   files, allocates storage and leaves us ready
	 *	   to start processing tuples..
497
	 ******************
498 499 500
	 */
	ExecInitNode(plan, estate, NULL);

501
	/******************
502 503 504
	 *	   get the tuple descriptor describing the type
	 *	   of tuples to return.. (this is especially important
	 *	   if we are creating a relation with "retrieve into")
505
	 ******************
506 507 508 509 510
	 */
	tupType = ExecGetTupType(plan);		/* tuple descriptor */
	targetList = plan->targetlist;
	len = ExecTargetListLength(targetList);		/* number of attributes */

511
	/******************
512 513 514 515 516 517
	 *	  now that we have the target list, initialize the junk filter
	 *	  if this is a REPLACE or a DELETE query.
	 *	  We also init the junk filter if this is an append query
	 *	  (there might be some rule lock info there...)
	 *	  NOTE: in the future we might want to initialize the junk
	 *	  filter for all queries.
518
	 ******************
519 520
	 *		  SELECT added by daveh@insightdist.com  5/20/98 to allow
	 *		  ORDER/GROUP BY have an identifier missing from the target.
521 522
	 */
	{
523 524 525
		bool		junk_filter_needed = false;
		List	   *tlist;

526
		if (operation == CMD_SELECT)
527 528 529
		{
			foreach(tlist, targetList)
			{
530 531
				TargetEntry *tle = lfirst(tlist);

532 533 534 535 536 537 538 539 540 541 542 543 544
				if (tle->resdom->resjunk)
				{
					junk_filter_needed = true;
					break;
				}
			}
		}

		if (operation == CMD_UPDATE || operation == CMD_DELETE ||
			operation == CMD_INSERT ||
			(operation == CMD_SELECT && junk_filter_needed))
		{
			JunkFilter *j = (JunkFilter *) ExecInitJunkFilter(targetList);
545

546
			estate->es_junkFilter = j;
547

548 549 550 551 552 553
			if (operation == CMD_SELECT)
				tupType = j->jf_cleanTupType;
		}
		else
			estate->es_junkFilter = NULL;
	}
554

555
	/******************
556
	 *	initialize the "into" relation
557
	 ******************
558 559 560 561 562
	 */
	intoRelationDesc = (Relation) NULL;

	if (operation == CMD_SELECT)
	{
563 564 565
		char	   *intoName;
		Oid			intoRelationId;
		TupleDesc	tupdesc;
566 567 568 569 570 571 572 573 574

		if (!parseTree->isPortal)
		{

			/*
			 * a select into table
			 */
			if (parseTree->into != NULL)
			{
575
				/******************
576
				 *	create the "into" relation
577
				 ******************
578 579 580 581 582 583 584 585
				 */
				intoName = parseTree->into;

				/*
				 * have to copy tupType to get rid of constraints
				 */
				tupdesc = CreateTupleDescCopy(tupType);

586
				intoRelationId = heap_create_with_catalog(intoName,
587
											  tupdesc, RELKIND_RELATION);
588

589 590
				FreeTupleDesc(tupdesc);

591
				/******************
592 593 594
				 *	XXX rather than having to call setheapoverride(true)
				 *		and then back to false, we should change the
				 *		arguments to heap_open() instead..
595
				 ******************
596 597 598 599 600 601 602 603 604 605 606 607 608
				 */
				setheapoverride(true);

				intoRelationDesc = heap_open(intoRelationId);

				setheapoverride(false);
			}
		}
	}

	estate->es_into_relation_descriptor = intoRelationDesc;

	return tupType;
609 610 611
}

/* ----------------------------------------------------------------
612 613 614
 *		EndPlan
 *
 *		Cleans up the query plan -- closes files and free up storages
615 616 617
 * ----------------------------------------------------------------
 */
static void
618
EndPlan(Plan *plan, EState *estate)
619
{
620 621
	RelationInfo *resultRelationInfo;
	Relation	intoRelationDesc;
622

623
	/******************
624
	 *	get information from state
625
	 ******************
626
	 */
627 628 629
	resultRelationInfo = estate->es_result_relation_info;
	intoRelationDesc = estate->es_into_relation_descriptor;

630
	/******************
631
	 *	 shut down the query
632
	 ******************
633 634 635
	 */
	ExecEndNode(plan, plan);

636
	/******************
637
	 *	  destroy the executor "tuple" table.
638
	 ******************
639 640
	 */
	{
641
		TupleTable	tupleTable = (TupleTable) estate->es_tupleTable;
642 643 644 645 646

		ExecDestroyTupleTable(tupleTable, true);		/* was missing last arg */
		estate->es_tupleTable = NULL;
	}

647
	/******************
648
	 *	 close the result relations if necessary
649
	 ******************
650 651 652
	 */
	if (resultRelationInfo != NULL)
	{
653
		Relation	resultRelationDesc;
654 655 656 657

		resultRelationDesc = resultRelationInfo->ri_RelationDesc;
		heap_close(resultRelationDesc);

658
		/******************
659
		 *	close indices on the result relation
660
		 ******************
661 662 663 664
		 */
		ExecCloseIndices(resultRelationInfo);
	}

665
	/******************
666
	 *	 close the "into" relation if necessary
667
	 ******************
668 669 670
	 */
	if (intoRelationDesc != NULL)
		heap_close(intoRelationDesc);
671 672 673
}

/* ----------------------------------------------------------------
674 675 676 677 678 679 680 681
 *		ExecutePlan
 *
 *		processes the query plan to retrieve 'tupleCount' tuples in the
 *		direction specified.
 *		Retrieves all tuples if tupleCount is 0
 *
 *		result is either a slot containing a tuple in the case
 *		of a RETRIEVE or NULL otherwise.
682 683 684 685 686 687 688 689
 *
 * ----------------------------------------------------------------
 */

/* the ctid attribute is a 'junk' attribute that is removed before the
   user can see it*/

static TupleTableSlot *
690 691 692
ExecutePlan(EState *estate,
			Plan *plan,
			Query *parseTree,
693 694 695 696
			CmdType operation,
			int numberTuples,
			ScanDirection direction,
			void (*printfunc) ())
697
{
698
	JunkFilter *junkfilter;
699 700

	TupleTableSlot *slot;
701
	ItemPointer tupleid = NULL;
702
	ItemPointerData tuple_ctid;
703
	int			current_tuple_count;
704 705
	TupleTableSlot *result;

706
	/******************
707
	 *	initialize local variables
708
	 ******************
709
	 */
710 711 712 713
	slot = NULL;
	current_tuple_count = 0;
	result = NULL;

714
	/******************
715
	 *	Set the direction.
716
	 ******************
717
	 */
718 719
	estate->es_direction = direction;

720
	/******************
721 722
	 *	Loop until we've processed the proper number
	 *	of tuples from the plan..
723
	 ******************
724 725 726 727
	 */

	for (;;)
	{
728 729 730 731 732 733
		/******************
		 *	Execute the plan and obtain a tuple
		 ******************
		 */
		/* at the top level, the parent of a plan (2nd arg) is itself */
		slot = ExecProcNode(plan, plan);
734

735 736 737 738 739 740 741 742 743 744
		/******************
		 *	if the tuple is null, then we assume
		 *	there is nothing more to process so
		 *	we just return null...
		 ******************
		 */
		if (TupIsNull(slot))
		{
			result = NULL;
			break;
745 746
		}

747
		/******************
748 749 750 751 752 753
		 *		if we have a junk filter, then project a new
		 *		tuple with the junk removed.
		 *
		 *		Store this new "clean" tuple in the place of the
		 *		original tuple.
		 *
754 755
		 *		Also, extract all the junk information we need.
		 ******************
756 757 758
		 */
		if ((junkfilter = estate->es_junkFilter) != (JunkFilter *) NULL)
		{
759
			Datum		datum;
760 761

/*			NameData	attrName; */
762 763
			HeapTuple	newTuple;
			bool		isNull;
764

765
			/******************
766
			 * extract the 'ctid' junk attribute.
767
			 ******************
768 769 770 771 772 773 774 775
			 */
			if (operation == CMD_UPDATE || operation == CMD_DELETE)
			{
				if (!ExecGetJunkAttribute(junkfilter,
										  slot,
										  "ctid",
										  &datum,
										  &isNull))
776
					elog(ERROR, "ExecutePlan: NO (junk) `ctid' was found!");
777 778

				if (isNull)
779
					elog(ERROR, "ExecutePlan: (junk) `ctid' is NULL!");
780 781 782 783 784 785 786

				tupleid = (ItemPointer) DatumGetPointer(datum);
				tuple_ctid = *tupleid;	/* make sure we don't free the
										 * ctid!! */
				tupleid = &tuple_ctid;
			}

787
			/******************
788 789
			 * Finally create a new "clean" tuple with all junk attributes
			 * removed
790
			 ******************
791 792 793 794 795 796 797 798 799 800
			 */
			newTuple = ExecRemoveJunk(junkfilter, slot);

			slot = ExecStoreTuple(newTuple,		/* tuple to store */
								  slot, /* destination slot */
								  InvalidBuffer,		/* this tuple has no
														 * buffer */
								  true);		/* tuple should be pfreed */
		}						/* if (junkfilter... */

801
		/******************
802 803 804 805
		 *		now that we have a tuple, do the appropriate thing
		 *		with it.. either return it to the user, add
		 *		it to a relation someplace, delete it from a
		 *		relation, or modify some of it's attributes.
806
		 ******************
807 808 809 810
		 */

		switch (operation)
		{
811 812 813 814 815 816
			case CMD_SELECT:
				ExecRetrieve(slot,		/* slot containing tuple */
							 printfunc, /* print function */
							 estate);	/* */
				result = slot;
				break;
817

818 819 820 821
			case CMD_INSERT:
				ExecAppend(slot, tupleid, estate);
				result = NULL;
				break;
822

823 824 825 826
			case CMD_DELETE:
				ExecDelete(slot, tupleid, estate);
				result = NULL;
				break;
827

828 829 830 831
			case CMD_UPDATE:
				ExecReplace(slot, tupleid, estate, parseTree);
				result = NULL;
				break;
832

833 834
			default:
				elog(DEBUG, "ExecutePlan: unknown operation in queryDesc");
835
				result = NULL;
836
				break;
837
		}
838
		/******************
839 840 841
		 *		check our tuple count.. if we've returned the
		 *		proper number then return, else loop again and
		 *		process more tuples..
842
		 ******************
843 844 845 846
		 */
		current_tuple_count += 1;
		if (numberTuples == current_tuple_count)
			break;
847
	}
848

849
	/******************
850 851
	 *	here, result is either a slot containing a tuple in the case
	 *	of a RETRIEVE or NULL otherwise.
852
	 ******************
853
	 */
854
	return result;
855 856 857
}

/* ----------------------------------------------------------------
858
 *		ExecRetrieve
859
 *
860 861 862 863 864
 *		RETRIEVEs are easy.. we just pass the tuple to the appropriate
 *		print function.  The only complexity is when we do a
 *		"retrieve into", in which case we insert the tuple into
 *		the appropriate relation (note: this is a newly created relation
 *		so we don't need to worry about indices or locks.)
865 866 867
 * ----------------------------------------------------------------
 */
static void
868
ExecRetrieve(TupleTableSlot *slot,
869
			 void (*printfunc) (),
870
			 EState *estate)
871
{
872 873
	HeapTuple	tuple;
	TupleDesc	attrtype;
874

875
	/******************
876
	 *	get the heap tuple out of the tuple table slot
877
	 ******************
878 879 880 881
	 */
	tuple = slot->val;
	attrtype = slot->ttc_tupleDescriptor;

882
	/******************
883
	 *	insert the tuple into the "into relation"
884
	 ******************
885 886 887 888 889 890 891
	 */
	if (estate->es_into_relation_descriptor != NULL)
	{
		heap_insert(estate->es_into_relation_descriptor, tuple);
		IncrAppended();
	}

892
	/******************
893
	 *	send the tuple to the front end (or the screen)
894
	 ******************
895 896 897 898
	 */
	(*printfunc) (tuple, attrtype);
	IncrRetrieved();
	(estate->es_processed)++;
899 900 901
}

/* ----------------------------------------------------------------
902
 *		ExecAppend
903
 *
904 905 906
 *		APPENDs are trickier.. we have to insert the tuple into
 *		the base relation and insert appropriate tuples into the
 *		index relations.
907 908 909 910
 * ----------------------------------------------------------------
 */

static void
911
ExecAppend(TupleTableSlot *slot,
912
		   ItemPointer tupleid,
913
		   EState *estate)
914
{
915 916 917 918 919
	HeapTuple	tuple;
	RelationInfo *resultRelationInfo;
	Relation	resultRelationDesc;
	int			numIndices;
	Oid			newId;
920

921
	/******************
922
	 *	get the heap tuple out of the tuple table slot
923
	 ******************
924 925 926
	 */
	tuple = slot->val;

927
	/******************
928
	 *	get information on the result relation
929
	 ******************
930 931 932 933
	 */
	resultRelationInfo = estate->es_result_relation_info;
	resultRelationDesc = resultRelationInfo->ri_RelationDesc;

934
	/******************
935 936
	 *	have to add code to preform unique checking here.
	 *	cim -12/1/89
937
	 ******************
938 939 940 941 942 943
	 */

	/* BEFORE ROW INSERT Triggers */
	if (resultRelationDesc->trigdesc &&
	resultRelationDesc->trigdesc->n_before_row[TRIGGER_EVENT_INSERT] > 0)
	{
944
		HeapTuple	newtuple;
945 946 947 948 949 950 951 952 953 954 955 956 957 958

		newtuple = ExecBRInsertTriggers(resultRelationDesc, tuple);

		if (newtuple == NULL)	/* "do nothing" */
			return;

		if (newtuple != tuple)	/* modified by Trigger(s) */
		{
			Assert(slot->ttc_shouldFree);
			pfree(tuple);
			slot->val = tuple = newtuple;
		}
	}

959
	/******************
960
	 * Check the constraints of a tuple
961
	 ******************
962 963 964 965
	 */

	if (resultRelationDesc->rd_att->constr)
	{
966
		ExecConstraints("ExecAppend", resultRelationDesc, tuple);
967 968
	}

969
	/******************
970
	 *	insert the tuple
971
	 ******************
972 973 974 975 976
	 */
	newId = heap_insert(resultRelationDesc,		/* relation desc */
						tuple); /* heap tuple */
	IncrAppended();

977
	/******************
978 979 980 981 982
	 *	process indices
	 *
	 *	Note: heap_insert adds a new tuple to a relation.  As a side
	 *	effect, the tupleid of the new tuple is placed in the new
	 *	tuple's t_ctid field.
983
	 ******************
984 985 986
	 */
	numIndices = resultRelationInfo->ri_NumIndices;
	if (numIndices > 0)
987
		ExecInsertIndexTuples(slot, &(tuple->t_self), estate, false);
988 989 990 991 992 993 994
	(estate->es_processed)++;
	estate->es_lastoid = newId;

	/* AFTER ROW INSERT Triggers */
	if (resultRelationDesc->trigdesc &&
	 resultRelationDesc->trigdesc->n_after_row[TRIGGER_EVENT_INSERT] > 0)
		ExecARInsertTriggers(resultRelationDesc, tuple);
995 996 997
}

/* ----------------------------------------------------------------
998
 *		ExecDelete
999
 *
1000 1001
 *		DELETE is like append, we delete the tuple and its
 *		index tuples.
1002 1003 1004
 * ----------------------------------------------------------------
 */
static void
1005
ExecDelete(TupleTableSlot *slot,
1006
		   ItemPointer tupleid,
1007
		   EState *estate)
1008
{
1009 1010
	RelationInfo *resultRelationInfo;
	Relation	resultRelationDesc;
1011

1012
	/******************
1013
	 *	get the result relation information
1014
	 ******************
1015 1016 1017 1018 1019 1020 1021 1022
	 */
	resultRelationInfo = estate->es_result_relation_info;
	resultRelationDesc = resultRelationInfo->ri_RelationDesc;

	/* BEFORE ROW DELETE Triggers */
	if (resultRelationDesc->trigdesc &&
	resultRelationDesc->trigdesc->n_before_row[TRIGGER_EVENT_DELETE] > 0)
	{
1023
		bool		dodelete;
1024 1025 1026 1027 1028 1029 1030

		dodelete = ExecBRDeleteTriggers(resultRelationDesc, tupleid);

		if (!dodelete)			/* "do nothing" */
			return;
	}

1031
	/******************
1032
	 *	delete the tuple
1033
	 ******************
1034 1035 1036 1037 1038 1039 1040 1041
	 */
	if (heap_delete(resultRelationDesc, /* relation desc */
					tupleid))	/* item pointer to tuple */
		return;

	IncrDeleted();
	(estate->es_processed)++;

1042
	/******************
1043 1044 1045 1046 1047 1048 1049 1050
	 *	Note: Normally one would think that we have to
	 *		  delete index tuples associated with the
	 *		  heap tuple now..
	 *
	 *		  ... but in POSTGRES, we have no need to do this
	 *		  because the vacuum daemon automatically
	 *		  opens an index scan and deletes index tuples
	 *		  when it finds deleted heap tuples. -cim 9/27/89
1051
	 ******************
1052 1053 1054 1055 1056 1057
	 */

	/* AFTER ROW DELETE Triggers */
	if (resultRelationDesc->trigdesc &&
	 resultRelationDesc->trigdesc->n_after_row[TRIGGER_EVENT_DELETE] > 0)
		ExecARDeleteTriggers(resultRelationDesc, tupleid);
1058 1059 1060 1061

}

/* ----------------------------------------------------------------
1062
 *		ExecReplace
1063
 *
1064 1065 1066 1067 1068 1069
 *		note: we can't run replace queries with transactions
 *		off because replaces are actually appends and our
 *		scan will mistakenly loop forever, replacing the tuple
 *		it just appended..	This should be fixed but until it
 *		is, we don't want to get stuck in an infinite loop
 *		which corrupts your database..
1070 1071 1072
 * ----------------------------------------------------------------
 */
static void
1073
ExecReplace(TupleTableSlot *slot,
1074
			ItemPointer tupleid,
1075 1076
			EState *estate,
			Query *parseTree)
1077
{
1078 1079 1080 1081
	HeapTuple	tuple;
	RelationInfo *resultRelationInfo;
	Relation	resultRelationDesc;
	int			numIndices;
1082

1083
	/******************
1084
	 *	abort the operation if not running transactions
1085
	 ******************
1086 1087 1088 1089 1090 1091 1092
	 */
	if (IsBootstrapProcessingMode())
	{
		elog(DEBUG, "ExecReplace: replace can't run without transactions");
		return;
	}

1093
	/******************
1094
	 *	get the heap tuple out of the tuple table slot
1095
	 ******************
1096 1097 1098
	 */
	tuple = slot->val;

1099
	/******************
1100
	 *	get the result relation information
1101
	 ******************
1102 1103 1104 1105
	 */
	resultRelationInfo = estate->es_result_relation_info;
	resultRelationDesc = resultRelationInfo->ri_RelationDesc;

1106
	/******************
1107 1108 1109 1110
	 *	have to add code to preform unique checking here.
	 *	in the event of unique tuples, this becomes a deletion
	 *	of the original tuple affected by the replace.
	 *	cim -12/1/89
1111
	 ******************
1112 1113 1114 1115 1116 1117
	 */

	/* BEFORE ROW UPDATE Triggers */
	if (resultRelationDesc->trigdesc &&
	resultRelationDesc->trigdesc->n_before_row[TRIGGER_EVENT_UPDATE] > 0)
	{
1118
		HeapTuple	newtuple;
1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132

		newtuple = ExecBRUpdateTriggers(resultRelationDesc, tupleid, tuple);

		if (newtuple == NULL)	/* "do nothing" */
			return;

		if (newtuple != tuple)	/* modified by Trigger(s) */
		{
			Assert(slot->ttc_shouldFree);
			pfree(tuple);
			slot->val = tuple = newtuple;
		}
	}

1133
	/******************
1134
	 * Check the constraints of a tuple
1135
	 ******************
1136 1137 1138 1139
	 */

	if (resultRelationDesc->rd_att->constr)
	{
1140
		ExecConstraints("ExecReplace", resultRelationDesc, tuple);
1141 1142
	}

1143
	/******************
1144 1145 1146 1147 1148
	 *	replace the heap tuple
	 *
	 * Don't want to continue if our heap_replace didn't actually
	 * do a replace. This would be the case if heap_replace
	 * detected a non-functional update. -kw 12/30/93
1149
	 ******************
1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160
	 */
	if (heap_replace(resultRelationDesc,		/* relation desc */
					 tupleid,	/* item ptr of tuple to replace */
					 tuple))
	{							/* replacement heap tuple */
		return;
	}

	IncrReplaced();
	(estate->es_processed)++;

1161
	/******************
1162 1163 1164 1165 1166 1167 1168
	 *	Note: instead of having to update the old index tuples
	 *		  associated with the heap tuple, all we do is form
	 *		  and insert new index tuples..  This is because
	 *		  replaces are actually deletes and inserts and
	 *		  index tuple deletion is done automagically by
	 *		  the vaccuum deamon.. All we do is insert new
	 *		  index tuples.  -cim 9/27/89
1169
	 ******************
1170 1171
	 */

1172
	/******************
1173 1174 1175 1176 1177 1178 1179
	 *	process indices
	 *
	 *	heap_replace updates a tuple in the base relation by invalidating
	 *	it and then appending a new tuple to the relation.	As a side
	 *	effect, the tupleid of the new tuple is placed in the new
	 *	tuple's t_ctid field.  So we now insert index tuples using
	 *	the new tupleid stored there.
1180
	 ******************
1181 1182 1183 1184
	 */

	numIndices = resultRelationInfo->ri_NumIndices;
	if (numIndices > 0)
1185
		ExecInsertIndexTuples(slot, &(tuple->t_self), estate, true);
1186 1187 1188 1189 1190

	/* AFTER ROW UPDATE Triggers */
	if (resultRelationDesc->trigdesc &&
	 resultRelationDesc->trigdesc->n_after_row[TRIGGER_EVENT_UPDATE] > 0)
		ExecARUpdateTriggers(resultRelationDesc, tupleid, tuple);
1191
}
V
Vadim B. Mikheev 已提交
1192

1193
#if 0
1194
static HeapTuple
1195
ExecAttrDefault(Relation rel, HeapTuple tuple)
V
Vadim B. Mikheev 已提交
1196
{
1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208
	int			ndef = rel->rd_att->constr->num_defval;
	AttrDefault *attrdef = rel->rd_att->constr->defval;
	ExprContext *econtext = makeNode(ExprContext);
	HeapTuple	newtuple;
	Node	   *expr;
	bool		isnull;
	bool		isdone;
	Datum		val;
	Datum	   *replValue = NULL;
	char	   *replNull = NULL;
	char	   *repl = NULL;
	int			i;
1209 1210 1211 1212 1213 1214

	econtext->ecxt_scantuple = NULL;	/* scan tuple slot */
	econtext->ecxt_innertuple = NULL;	/* inner tuple slot */
	econtext->ecxt_outertuple = NULL;	/* outer tuple slot */
	econtext->ecxt_relation = NULL;		/* relation */
	econtext->ecxt_relid = 0;	/* relid */
1215 1216
	econtext->ecxt_param_list_info = NULL;		/* param list info */
	econtext->ecxt_param_exec_vals = NULL;		/* exec param values */
1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235
	econtext->ecxt_range_table = NULL;	/* range table */
	for (i = 0; i < ndef; i++)
	{
		if (!heap_attisnull(tuple, attrdef[i].adnum))
			continue;
		expr = (Node *) stringToNode(attrdef[i].adbin);

		val = ExecEvalExpr(expr, econtext, &isnull, &isdone);

		pfree(expr);

		if (isnull)
			continue;

		if (repl == NULL)
		{
			repl = (char *) palloc(rel->rd_att->natts * sizeof(char));
			replNull = (char *) palloc(rel->rd_att->natts * sizeof(char));
			replValue = (Datum *) palloc(rel->rd_att->natts * sizeof(Datum));
B
Bruce Momjian 已提交
1236
			MemSet(repl, ' ', rel->rd_att->natts * sizeof(char));
1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247
		}

		repl[attrdef[i].adnum - 1] = 'r';
		replNull[attrdef[i].adnum - 1] = ' ';
		replValue[attrdef[i].adnum - 1] = val;

	}

	pfree(econtext);

	if (repl == NULL)
1248
		return tuple;
1249

1250
	newtuple = heap_modifytuple(tuple, rel, replValue, replNull, repl);
1251 1252

	pfree(repl);
1253
	pfree(tuple);
1254 1255 1256
	pfree(replNull);
	pfree(replValue);

1257
	return newtuple;
1258

V
Vadim B. Mikheev 已提交
1259
}
1260

1261
#endif
V
Vadim B. Mikheev 已提交
1262

1263
static char *
1264
ExecRelCheck(Relation rel, HeapTuple tuple)
V
Vadim B. Mikheev 已提交
1265
{
1266 1267 1268
	int			ncheck = rel->rd_att->constr->num_check;
	ConstrCheck *check = rel->rd_att->constr->check;
	ExprContext *econtext = makeNode(ExprContext);
1269
	TupleTableSlot *slot = makeNode(TupleTableSlot);
1270 1271 1272 1273 1274
	RangeTblEntry *rte = makeNode(RangeTblEntry);
	List	   *rtlist;
	List	   *qual;
	bool		res;
	int			i;
1275 1276 1277 1278 1279 1280 1281 1282 1283

	slot->val = tuple;
	slot->ttc_shouldFree = false;
	slot->ttc_descIsNew = true;
	slot->ttc_tupleDescriptor = rel->rd_att;
	slot->ttc_buffer = InvalidBuffer;
	slot->ttc_whichplan = -1;
	rte->relname = nameout(&(rel->rd_rel->relname));
	rte->refname = rte->relname;
1284
	rte->relid = RelationGetRelid(rel);
1285 1286 1287 1288 1289 1290 1291 1292 1293
	rte->inh = false;
	rte->inFromCl = true;
	rtlist = lcons(rte, NIL);
	econtext->ecxt_scantuple = slot;	/* scan tuple slot */
	econtext->ecxt_innertuple = NULL;	/* inner tuple slot */
	econtext->ecxt_outertuple = NULL;	/* outer tuple slot */
	econtext->ecxt_relation = rel;		/* relation */
	econtext->ecxt_relid = 0;	/* relid */
	econtext->ecxt_param_list_info = NULL;		/* param list info */
V
Vadim B. Mikheev 已提交
1294
	econtext->ecxt_param_exec_vals = NULL;		/* exec param values */
1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305
	econtext->ecxt_range_table = rtlist;		/* range table */

	for (i = 0; i < ncheck; i++)
	{
		qual = (List *) stringToNode(check[i].ccbin);

		res = ExecQual(qual, econtext);

		pfree(qual);

		if (!res)
1306
			return check[i].ccname;
1307 1308 1309 1310 1311 1312 1313 1314
	}

	pfree(slot);
	pfree(rte->relname);
	pfree(rte);
	pfree(rtlist);
	pfree(econtext);

1315
	return (char *) NULL;
1316

V
Vadim B. Mikheev 已提交
1317 1318
}

1319
void
1320
ExecConstraints(char *caller, Relation rel, HeapTuple tuple)
V
Vadim B. Mikheev 已提交
1321
{
1322 1323 1324 1325

	Assert(rel->rd_att->constr);

	if (rel->rd_att->constr->has_not_null)
V
Vadim B. Mikheev 已提交
1326
	{
1327
		int			attrChk;
1328 1329 1330 1331

		for (attrChk = 1; attrChk <= rel->rd_att->natts; attrChk++)
		{
			if (rel->rd_att->attrs[attrChk - 1]->attnotnull && heap_attisnull(tuple, attrChk))
1332
				elog(ERROR, "%s: Fail to add null value in not null attribute %s",
1333 1334 1335 1336 1337 1338
				  caller, rel->rd_att->attrs[attrChk - 1]->attname.data);
		}
	}

	if (rel->rd_att->constr->num_check > 0)
	{
1339
		char	   *failed;
1340 1341

		if ((failed = ExecRelCheck(rel, tuple)) != NULL)
1342
			elog(ERROR, "%s: rejected due to CHECK constraint %s", caller, failed);
1343 1344
	}

1345
	return;
V
Vadim B. Mikheev 已提交
1346
}