/*-------------------------------------------------------------------------
 *
 * execMain.c--
 *	  top level executor interface routines
 *
 * INTERFACE ROUTINES
 *	ExecutorStart()
 *	ExecutorRun()
 *	ExecutorEnd()
 *
 *	The old ExecutorMain() has been replaced by ExecutorStart(),
 *	ExecutorRun() and ExecutorEnd()
 *
 *	These three procedures are the external interfaces to the executor.
 *	In each case, the query descriptor and the execution state are required
 *	as arguments
 *
 *	ExecutorStart() must be called at the beginning of any execution of any
 *	query plan and ExecutorEnd() should always be called at the end of
 *	execution of a plan.
 *
 *	ExecutorRun accepts 'feature' and 'count' arguments that specify whether
 *	the plan is to be executed forwards, backwards, and for how many tuples.
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/executor/execMain.c,v 1.56 1998/09/25 13:38:30 thomas Exp $
 *
 *-------------------------------------------------------------------------
 */
33
#include <string.h>
34
#include "postgres.h"
35
#include "miscadmin.h"
36

37
#include "executor/executor.h"
38 39
#include "executor/execdefs.h"
#include "executor/execdebug.h"
40
#include "executor/nodeIndexscan.h"
41 42 43
#include "utils/builtins.h"
#include "utils/palloc.h"
#include "utils/acl.h"
44
#include "utils/syscache.h"
45
#include "utils/tqual.h"
46
#include "parser/parsetree.h"	/* rt_fetch() */
47
#include "storage/bufmgr.h"
M
Marc G. Fournier 已提交
48
#include "storage/lmgr.h"
49
#include "storage/smgr.h"
50 51 52
#include "commands/async.h"
/* #include "access/localam.h" */
#include "optimizer/var.h"
53 54
#include "access/heapam.h"
#include "catalog/heap.h"
55
#include "commands/trigger.h"
56

57
void ExecCheckPerms(CmdType operation, int resultRelation, List *rangeTable,
58
			   Query *parseTree);
59 60 61


/* decls for local routines only used within this module */
62
static TupleDesc InitPlan(CmdType operation, Query *parseTree,
63 64
		 Plan *plan, EState *estate);
static void EndPlan(Plan *plan, EState *estate);
65
static TupleTableSlot *ExecutePlan(EState *estate, Plan *plan,
66
			Query *parseTree, CmdType operation,
67 68
			int numberTuples, ScanDirection direction,
			void (*printfunc) ());
69 70
static void ExecRetrieve(TupleTableSlot *slot, void (*printfunc) (),
									 EState *estate);
71
static void ExecAppend(TupleTableSlot *slot, ItemPointer tupleid,
72
		   EState *estate);
73
static void ExecDelete(TupleTableSlot *slot, ItemPointer tupleid,
74
		   EState *estate);
75
static void ExecReplace(TupleTableSlot *slot, ItemPointer tupleid,
76
			EState *estate, Query *parseTree);
77 78 79

/* end of local decls */

M
Marc G. Fournier 已提交
80
#ifdef QUERY_LIMIT
/*
 * Current tuple limit.  It starts out as the original value of ALL_TUPLES;
 * ALL_TUPLES is then redefined to read this variable, so every later use of
 * ALL_TUPLES in this file picks up whatever ExecutorLimit() last stored.
 */
static int	queryLimit = ALL_TUPLES;

#undef ALL_TUPLES
#define ALL_TUPLES queryLimit

int			ExecutorLimit(int limit);

/*
 * ExecutorLimit
 *
 *		Store 'limit' as the new query limit and return the stored value.
 */
int
ExecutorLimit(int limit)
{
	queryLimit = limit;
	return queryLimit;
}

#endif   /* QUERY_LIMIT */
M
Marc G. Fournier 已提交
95

96
/* ----------------------------------------------------------------
97 98 99 100 101 102 103
 *		ExecutorStart
 *
 *		This routine must be called at the beginning of any execution of any
 *		query plan
 *
 *		returns (AttrInfo*) which describes the attributes of the tuples to
 *		be returned by the query.
104 105 106 107
 *
 * ----------------------------------------------------------------
 */
TupleDesc
108
ExecutorStart(QueryDesc *queryDesc, EState *estate)
109
{
110
	TupleDesc	result;
111 112 113

	/* sanity checks */
	Assert(queryDesc != NULL);
114

V
Vadim B. Mikheev 已提交
115 116
	if (queryDesc->plantree->nParamExec > 0)
	{
117 118 119
		estate->es_param_exec_vals = (ParamExecData *)
			palloc(queryDesc->plantree->nParamExec * sizeof(ParamExecData));
		memset(estate->es_param_exec_vals, 0, queryDesc->plantree->nParamExec * sizeof(ParamExecData));
V
Vadim B. Mikheev 已提交
120
	}
121

122
	estate->es_snapshot = SnapshotNow;
123

124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
	result = InitPlan(queryDesc->operation,
					  queryDesc->parsetree,
					  queryDesc->plantree,
					  estate);

	/*
	 * reset buffer refcount.  the current refcounts are saved and will be
	 * restored when ExecutorEnd is called
	 *
	 * this makes sure that when ExecutorRun's are called recursively as for
	 * postquel functions, the buffers pinned by one ExecutorRun will not
	 * be unpinned by another ExecutorRun.
	 */
	BufferRefCountReset(estate->es_refcount);

	return result;
140 141 142
}

/* ----------------------------------------------------------------
143 144 145 146 147 148 149
 *		ExecutorRun
 *
 *		This is the main routine of the executor module. It accepts
 *		the query descriptor from the traffic cop and executes the
 *		query plan.
 *
 *		ExecutorStart must have been called already.
150
 *
151 152 153 154 155 156
 *		the different features supported are:
 *			 EXEC_RUN:	retrieve all tuples in the forward direction
 *			 EXEC_FOR:	retrieve 'count' number of tuples in the forward dir
 *			 EXEC_BACK: retrieve 'count' number of tuples in the backward dir
 *			 EXEC_RETONE: return one tuple but don't 'retrieve' it
 *						   used in postquel function processing
157 158 159 160
 *
 *
 * ----------------------------------------------------------------
 */
161
TupleTableSlot *
162
ExecutorRun(QueryDesc *queryDesc, EState *estate, int feature, int count)
163
{
164 165 166
	CmdType		operation;
	Query	   *parseTree;
	Plan	   *plan;
167
	TupleTableSlot *result;
168 169
	CommandDest dest;
	void		(*destination) ();
170

171
	/******************
172
	 *	sanity checks
173
	 ******************
174
	 */
175 176
	Assert(queryDesc != NULL);

177
	/******************
178 179
	 *	extract information from the query descriptor
	 *	and the query feature.
180
	 ******************
181
	 */
182 183 184 185 186 187 188 189 190 191 192
	operation = queryDesc->operation;
	parseTree = queryDesc->parsetree;
	plan = queryDesc->plantree;
	dest = queryDesc->dest;
	destination = (void (*) ()) DestToFunction(dest);
	estate->es_processed = 0;
	estate->es_lastoid = InvalidOid;

	switch (feature)
	{

193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
		case EXEC_RUN:
			result = ExecutePlan(estate,
								 plan,
								 parseTree,
								 operation,
								 ALL_TUPLES,
								 ForwardScanDirection,
								 destination);
			break;
		case EXEC_FOR:
			result = ExecutePlan(estate,
								 plan,
								 parseTree,
								 operation,
								 count,
								 ForwardScanDirection,
								 destination);
			break;
211

212
			/******************
213
			 *		retrieve next n "backward" tuples
214
			 ******************
215 216 217 218 219 220 221 222 223 224
			 */
		case EXEC_BACK:
			result = ExecutePlan(estate,
								 plan,
								 parseTree,
								 operation,
								 count,
								 BackwardScanDirection,
								 destination);
			break;
225

226
			/******************
227 228
			 *		return one tuple but don't "retrieve" it.
			 *		(this is used by the rule manager..) -cim 9/14/89
229
			 ******************
230 231 232 233 234 235 236 237 238 239 240 241 242 243
			 */
		case EXEC_RETONE:
			result = ExecutePlan(estate,
								 plan,
								 parseTree,
								 operation,
								 ONE_TUPLE,
								 ForwardScanDirection,
								 destination);
			break;
		default:
			result = NULL;
			elog(DEBUG, "ExecutorRun: Unknown feature %d", feature);
			break;
244 245 246
	}

	return result;
247 248 249
}

/* ----------------------------------------------------------------
250 251 252 253 254 255 256
 *		ExecutorEnd
 *
 *		This routine must be called at the end of any execution of any
 *		query plan
 *
 *		returns (AttrInfo*) which describes the attributes of the tuples to
 *		be returned by the query.
257 258 259 260
 *
 * ----------------------------------------------------------------
 */
void
261
ExecutorEnd(QueryDesc *queryDesc, EState *estate)
262
{
263 264
	/* sanity checks */
	Assert(queryDesc != NULL);
265

266
	EndPlan(queryDesc->plantree, estate);
267

268 269
	/* restore saved refcounts. */
	BufferRefCountRestore(estate->es_refcount);
270 271
}

272
void
273
ExecCheckPerms(CmdType operation,
274
			   int resultRelation,
275 276
			   List *rangeTable,
			   Query *parseTree)
277
{
278 279
	int			i = 1;
	Oid			relid;
280
	HeapTuple	htup;
281 282 283 284 285 286 287 288
	List	   *lp;
	List	   *qvars,
			   *tvars;
	int32		ok = 1,
				aclcheck_result = -1;
	char	   *opstr;
	NameData	rname;
	char	   *userName;
289 290 291 292 293 294 295

#define CHECK(MODE)		pg_aclcheck(rname.data, userName, MODE)

	userName = GetPgUserName();

	foreach(lp, rangeTable)
	{
296
		RangeTblEntry *rte = lfirst(lp);
297

M
Marc G. Fournier 已提交
298 299
		if (rte->skipAcl)
		{
300

M
Marc G. Fournier 已提交
301
			/*
302 303 304 305
			 * This happens if the access to this table is due to a view
			 * query rewriting - the rewrite handler checked the
			 * permissions against the view owner, so we just skip this
			 * entry.
M
Marc G. Fournier 已提交
306 307 308 309
			 */
			continue;
		}

310
		relid = rte->relid;
311
		htup = SearchSysCacheTuple(RELOID,
312 313
								   ObjectIdGetDatum(relid),
								   0, 0, 0);
314
		if (!HeapTupleIsValid(htup))
315
			elog(ERROR, "ExecCheckPerms: bogus RT relid: %d",
316
				 relid);
317
		StrNCpy(rname.data,
318
				((Form_pg_class) GETSTRUCT(htup))->relname.data,
319
				NAMEDATALEN);
320 321 322 323 324 325 326 327 328 329 330 331 332 333 334
		if (i == resultRelation)
		{						/* this is the result relation */
			qvars = pull_varnos(parseTree->qual);
			tvars = pull_varnos((Node *) parseTree->targetList);
			if (intMember(resultRelation, qvars) ||
				intMember(resultRelation, tvars))
			{
				/* result relation is scanned */
				ok = ((aclcheck_result = CHECK(ACL_RD)) == ACLCHECK_OK);
				opstr = "read";
				if (!ok)
					break;
			}
			switch (operation)
			{
335 336 337 338 339 340 341 342 343 344 345 346 347
				case CMD_INSERT:
					ok = ((aclcheck_result = CHECK(ACL_AP)) == ACLCHECK_OK) ||
						((aclcheck_result = CHECK(ACL_WR)) == ACLCHECK_OK);
					opstr = "append";
					break;
				case CMD_NOTIFY:		/* what does this mean?? -- jw,
										 * 1/6/94 */
				case CMD_DELETE:
				case CMD_UPDATE:
					ok = ((aclcheck_result = CHECK(ACL_WR)) == ACLCHECK_OK);
					opstr = "write";
					break;
				default:
348
					elog(ERROR, "ExecCheckPerms: bogus operation %d",
349
						 operation);
350 351 352 353 354 355 356 357
			}
		}
		else
		{
			/* XXX NOTIFY?? */
			ok = ((aclcheck_result = CHECK(ACL_RD)) == ACLCHECK_OK);
			opstr = "read";
		}
358
		if (!ok)
359 360
			break;
		++i;
361 362
	}
	if (!ok)
363
		elog(ERROR, "%s: %s", rname.data, aclcheck_error_strings[aclcheck_result]);
364 365
}

366 367 368 369 370 371 372
/* ===============================================================
 * ===============================================================
						 static routines follow
 * ===============================================================
 * ===============================================================
 */

373 374

/* ----------------------------------------------------------------
375 376 377 378
 *		InitPlan
 *
 *		Initializes the query plan: open files, allocate storage
 *		and start up the rule manager
379 380
 * ----------------------------------------------------------------
 */
381
static TupleDesc
382
InitPlan(CmdType operation, Query *parseTree, Plan *plan, EState *estate)
383
{
384 385 386
	List	   *rangeTable;
	int			resultRelation;
	Relation	intoRelationDesc;
387

388 389 390
	TupleDesc	tupType;
	List	   *targetList;
	int			len;
391

392
	/******************
393
	 *	get information from query descriptor
394
	 ******************
395
	 */
396 397
	rangeTable = parseTree->rtable;
	resultRelation = parseTree->resultRelation;
398

399
	/******************
400
	 *	initialize the node's execution state
401
	 ******************
402
	 */
403 404
	estate->es_range_table = rangeTable;

405
	/******************
406 407 408 409
	 *	initialize the BaseId counter so node base_id's
	 *	are assigned correctly.  Someday baseid's will have to
	 *	be stored someplace other than estate because they
	 *	should be unique per query planned.
410
	 ******************
411
	 */
412
	estate->es_BaseId = 1;
413

414
	/******************
415
	 *	initialize result relation stuff
416
	 ******************
417
	 */
418

419 420
	if (resultRelation != 0 && operation != CMD_SELECT)
	{
421
		/******************
422 423 424
		 *	  if we have a result relation, open it and

		 *	  initialize the result relation info stuff.
425
		 ******************
426
		 */
427 428 429 430 431
		RelationInfo *resultRelationInfo;
		Index		resultRelationIndex;
		RangeTblEntry *rtentry;
		Oid			resultRelationOid;
		Relation	resultRelationDesc;
432 433 434 435 436 437 438

		resultRelationIndex = resultRelation;
		rtentry = rt_fetch(resultRelationIndex, rangeTable);
		resultRelationOid = rtentry->relid;
		resultRelationDesc = heap_open(resultRelationOid);

		if (resultRelationDesc->rd_rel->relkind == RELKIND_SEQUENCE)
439
			elog(ERROR, "You can't change sequence relation %s",
440 441
				 resultRelationDesc->rd_rel->relname.data);

V
Vadim B. Mikheev 已提交
442
		/*
443 444 445 446 447
		 * Write-lock the result relation right away: if the relation is
		 * used in a subsequent scan, we won't have to elevate the
		 * read-lock set by heap_beginscan to a write-lock (needed by
		 * heap_insert, heap_delete and heap_replace). This will hopefully
		 * prevent some deadlocks.	- 01/24/94
V
Vadim B. Mikheev 已提交
448
		 */
449 450 451 452 453 454 455 456
		RelationSetLockForWrite(resultRelationDesc);

		resultRelationInfo = makeNode(RelationInfo);
		resultRelationInfo->ri_RangeTableIndex = resultRelationIndex;
		resultRelationInfo->ri_RelationDesc = resultRelationDesc;
		resultRelationInfo->ri_NumIndices = 0;
		resultRelationInfo->ri_IndexRelationDescs = NULL;
		resultRelationInfo->ri_IndexRelationInfo = NULL;
457

458
		/******************
459 460
		 *	open indices on result relation and save descriptors
		 *	in the result relation information..
461
		 ******************
462
		 */
463 464 465
		ExecOpenIndices(resultRelationOid, resultRelationInfo);

		estate->es_result_relation_info = resultRelationInfo;
466
	}
467 468
	else
	{
469
		/******************
470
		 *		if no result relation, then set state appropriately
471
		 ******************
472 473 474 475 476 477 478 479
		 */
		estate->es_result_relation_info = NULL;
	}

#ifndef NO_SECURITY
	ExecCheckPerms(operation, resultRelation, rangeTable, parseTree);
#endif

480
	/******************
481
	 *	  initialize the executor "tuple" table.
482
	 ******************
483 484
	 */
	{
485 486
		int			nSlots = ExecCountSlotsNode(plan);
		TupleTable	tupleTable = ExecCreateTupleTable(nSlots + 10);		/* why add ten? - jolly */
487

488 489
		estate->es_tupleTable = tupleTable;
	}
490

491
	/******************
492 493 494 495
	 *	   initialize the private state information for
	 *	   all the nodes in the query tree.  This opens
	 *	   files, allocates storage and leaves us ready
	 *	   to start processing tuples..
496
	 ******************
497 498 499
	 */
	ExecInitNode(plan, estate, NULL);

500
	/******************
501 502 503
	 *	   get the tuple descriptor describing the type
	 *	   of tuples to return.. (this is especially important
	 *	   if we are creating a relation with "retrieve into")
504
	 ******************
505 506 507 508 509
	 */
	tupType = ExecGetTupType(plan);		/* tuple descriptor */
	targetList = plan->targetlist;
	len = ExecTargetListLength(targetList);		/* number of attributes */

510
	/******************
511 512 513 514 515 516
	 *	  now that we have the target list, initialize the junk filter
	 *	  if this is a REPLACE or a DELETE query.
	 *	  We also init the junk filter if this is an append query
	 *	  (there might be some rule lock info there...)
	 *	  NOTE: in the future we might want to initialize the junk
	 *	  filter for all queries.
517
	 ******************
518 519
	 *		  SELECT added by daveh@insightdist.com  5/20/98 to allow
	 *		  ORDER/GROUP BY have an identifier missing from the target.
520 521
	 */
	{
522 523 524
		bool		junk_filter_needed = false;
		List	   *tlist;

525
		if (operation == CMD_SELECT)
526 527 528
		{
			foreach(tlist, targetList)
			{
529 530
				TargetEntry *tle = lfirst(tlist);

531 532 533 534 535 536 537 538 539 540 541 542 543
				if (tle->resdom->resjunk)
				{
					junk_filter_needed = true;
					break;
				}
			}
		}

		if (operation == CMD_UPDATE || operation == CMD_DELETE ||
			operation == CMD_INSERT ||
			(operation == CMD_SELECT && junk_filter_needed))
		{
			JunkFilter *j = (JunkFilter *) ExecInitJunkFilter(targetList);
544

545
			estate->es_junkFilter = j;
546

547 548 549 550 551 552
			if (operation == CMD_SELECT)
				tupType = j->jf_cleanTupType;
		}
		else
			estate->es_junkFilter = NULL;
	}
553

554
	/******************
555
	 *	initialize the "into" relation
556
	 ******************
557 558 559 560 561
	 */
	intoRelationDesc = (Relation) NULL;

	if (operation == CMD_SELECT)
	{
562 563 564
		char	   *intoName;
		Oid			intoRelationId;
		TupleDesc	tupdesc;
565 566 567 568 569 570 571 572 573

		if (!parseTree->isPortal)
		{

			/*
			 * a select into table
			 */
			if (parseTree->into != NULL)
			{
574
				/******************
575
				 *	create the "into" relation
576
				 ******************
577 578 579 580 581 582 583 584
				 */
				intoName = parseTree->into;

				/*
				 * have to copy tupType to get rid of constraints
				 */
				tupdesc = CreateTupleDescCopy(tupType);

585
				intoRelationId = heap_create_with_catalog(intoName,
586
											  tupdesc, RELKIND_RELATION);
587

588 589
				FreeTupleDesc(tupdesc);

590
				/******************
591 592 593
				 *	XXX rather than having to call setheapoverride(true)
				 *		and then back to false, we should change the
				 *		arguments to heap_open() instead..
594
				 ******************
595 596 597 598 599 600 601 602 603 604 605 606 607
				 */
				setheapoverride(true);

				intoRelationDesc = heap_open(intoRelationId);

				setheapoverride(false);
			}
		}
	}

	estate->es_into_relation_descriptor = intoRelationDesc;

	return tupType;
608 609 610
}

/* ----------------------------------------------------------------
611 612 613
 *		EndPlan
 *
 *		Cleans up the query plan -- closes files and free up storages
614 615 616
 * ----------------------------------------------------------------
 */
static void
617
EndPlan(Plan *plan, EState *estate)
618
{
619 620
	RelationInfo *resultRelationInfo;
	Relation	intoRelationDesc;
621

622
	/******************
623
	 *	get information from state
624
	 ******************
625
	 */
626 627 628
	resultRelationInfo = estate->es_result_relation_info;
	intoRelationDesc = estate->es_into_relation_descriptor;

629
	/******************
630
	 *	 shut down the query
631
	 ******************
632 633 634
	 */
	ExecEndNode(plan, plan);

635
	/******************
636
	 *	  destroy the executor "tuple" table.
637
	 ******************
638 639
	 */
	{
640
		TupleTable	tupleTable = (TupleTable) estate->es_tupleTable;
641 642 643 644 645

		ExecDestroyTupleTable(tupleTable, true);		/* was missing last arg */
		estate->es_tupleTable = NULL;
	}

646
	/******************
647
	 *	 close the result relations if necessary
648
	 ******************
649 650 651
	 */
	if (resultRelationInfo != NULL)
	{
652
		Relation	resultRelationDesc;
653 654 655 656

		resultRelationDesc = resultRelationInfo->ri_RelationDesc;
		heap_close(resultRelationDesc);

657
		/******************
658
		 *	close indices on the result relation
659
		 ******************
660 661 662 663
		 */
		ExecCloseIndices(resultRelationInfo);
	}

664
	/******************
665
	 *	 close the "into" relation if necessary
666
	 ******************
667 668 669
	 */
	if (intoRelationDesc != NULL)
		heap_close(intoRelationDesc);
670 671 672
}

/* ----------------------------------------------------------------
673 674 675 676 677 678 679 680
 *		ExecutePlan
 *
 *		processes the query plan to retrieve 'tupleCount' tuples in the
 *		direction specified.
 *		Retrieves all tuples if tupleCount is 0
 *
 *		result is either a slot containing a tuple in the case
 *		of a RETRIEVE or NULL otherwise.
681 682 683 684 685 686 687 688
 *
 * ----------------------------------------------------------------
 */

/* the ctid attribute is a 'junk' attribute that is removed before the
   user can see it*/

static TupleTableSlot *
689 690 691
ExecutePlan(EState *estate,
			Plan *plan,
			Query *parseTree,
692 693 694 695
			CmdType operation,
			int numberTuples,
			ScanDirection direction,
			void (*printfunc) ())
696
{
697
	JunkFilter *junkfilter;
698 699

	TupleTableSlot *slot;
700
	ItemPointer tupleid = NULL;
701
	ItemPointerData tuple_ctid;
702
	int			current_tuple_count;
703 704
	TupleTableSlot *result;

705
	/******************
706
	 *	initialize local variables
707
	 ******************
708
	 */
709 710 711 712
	slot = NULL;
	current_tuple_count = 0;
	result = NULL;

713
	/******************
714
	 *	Set the direction.
715
	 ******************
716
	 */
717 718
	estate->es_direction = direction;

719
	/******************
720 721
	 *	Loop until we've processed the proper number
	 *	of tuples from the plan..
722
	 ******************
723 724 725 726 727 728
	 */

	for (;;)
	{
		if (operation != CMD_NOTIFY)
		{
729
			/******************
730
			 *	Execute the plan and obtain a tuple
731
			 ******************
732 733 734 735
			 */
			/* at the top level, the parent of a plan (2nd arg) is itself */
			slot = ExecProcNode(plan, plan);

736
			/******************
737 738 739
			 *	if the tuple is null, then we assume
			 *	there is nothing more to process so
			 *	we just return null...
740
			 ******************
741 742 743 744 745 746 747 748
			 */
			if (TupIsNull(slot))
			{
				result = NULL;
				break;
			}
		}

749
		/******************
750 751 752 753 754 755
		 *		if we have a junk filter, then project a new
		 *		tuple with the junk removed.
		 *
		 *		Store this new "clean" tuple in the place of the
		 *		original tuple.
		 *
756 757
		 *		Also, extract all the junk information we need.
		 ******************
758 759 760
		 */
		if ((junkfilter = estate->es_junkFilter) != (JunkFilter *) NULL)
		{
761
			Datum		datum;
762 763

/*			NameData	attrName; */
764 765
			HeapTuple	newTuple;
			bool		isNull;
766

767
			/******************
768
			 * extract the 'ctid' junk attribute.
769
			 ******************
770 771 772 773 774 775 776 777
			 */
			if (operation == CMD_UPDATE || operation == CMD_DELETE)
			{
				if (!ExecGetJunkAttribute(junkfilter,
										  slot,
										  "ctid",
										  &datum,
										  &isNull))
778
					elog(ERROR, "ExecutePlan: NO (junk) `ctid' was found!");
779 780

				if (isNull)
781
					elog(ERROR, "ExecutePlan: (junk) `ctid' is NULL!");
782 783 784 785 786 787 788

				tupleid = (ItemPointer) DatumGetPointer(datum);
				tuple_ctid = *tupleid;	/* make sure we don't free the
										 * ctid!! */
				tupleid = &tuple_ctid;
			}

789
			/******************
790 791
			 * Finally create a new "clean" tuple with all junk attributes
			 * removed
792
			 ******************
793 794 795 796 797 798 799 800 801 802
			 */
			newTuple = ExecRemoveJunk(junkfilter, slot);

			slot = ExecStoreTuple(newTuple,		/* tuple to store */
								  slot, /* destination slot */
								  InvalidBuffer,		/* this tuple has no
														 * buffer */
								  true);		/* tuple should be pfreed */
		}						/* if (junkfilter... */

803
		/******************
804 805 806 807
		 *		now that we have a tuple, do the appropriate thing
		 *		with it.. either return it to the user, add
		 *		it to a relation someplace, delete it from a
		 *		relation, or modify some of it's attributes.
808
		 ******************
809 810 811 812
		 */

		switch (operation)
		{
813 814 815 816 817 818
			case CMD_SELECT:
				ExecRetrieve(slot,		/* slot containing tuple */
							 printfunc, /* print function */
							 estate);	/* */
				result = slot;
				break;
819

820 821 822 823
			case CMD_INSERT:
				ExecAppend(slot, tupleid, estate);
				result = NULL;
				break;
824

825 826 827 828
			case CMD_DELETE:
				ExecDelete(slot, tupleid, estate);
				result = NULL;
				break;
829

830 831 832 833
			case CMD_UPDATE:
				ExecReplace(slot, tupleid, estate, parseTree);
				result = NULL;
				break;
834

835 836
				/*
				 * Total hack. I'm ignoring any accessor functions for
837
				 * Relation, RelationForm, NameData. Assuming that
838 839 840 841 842 843 844 845 846 847 848 849 850 851
				 * NameData.data has offset 0.
				 */
			case CMD_NOTIFY:
				{
					RelationInfo *rInfo = estate->es_result_relation_info;
					Relation	rDesc = rInfo->ri_RelationDesc;

					Async_Notify(rDesc->rd_rel->relname.data);
					result = NULL;
					current_tuple_count = 0;
					numberTuples = 1;
					elog(DEBUG, "ExecNotify %s", &rDesc->rd_rel->relname);
				}
				break;
852

853 854
			default:
				elog(DEBUG, "ExecutePlan: unknown operation in queryDesc");
855
				result = NULL;
856
				break;
857
		}
858
		/******************
859 860 861
		 *		check our tuple count.. if we've returned the
		 *		proper number then return, else loop again and
		 *		process more tuples..
862
		 ******************
863 864 865 866
		 */
		current_tuple_count += 1;
		if (numberTuples == current_tuple_count)
			break;
867
	}
868

869
	/******************
870 871
	 *	here, result is either a slot containing a tuple in the case
	 *	of a RETRIEVE or NULL otherwise.
872
	 ******************
873
	 */
874
	return result;
875 876 877
}

/* ----------------------------------------------------------------
878
 *		ExecRetrieve
879
 *
880 881 882 883 884
 *		RETRIEVEs are easy.. we just pass the tuple to the appropriate
 *		print function.  The only complexity is when we do a
 *		"retrieve into", in which case we insert the tuple into
 *		the appropriate relation (note: this is a newly created relation
 *		so we don't need to worry about indices or locks.)
885 886 887
 * ----------------------------------------------------------------
 */
static void
888
ExecRetrieve(TupleTableSlot *slot,
889
			 void (*printfunc) (),
890
			 EState *estate)
891
{
892 893
	HeapTuple	tuple;
	TupleDesc	attrtype;
894

895
	/******************
896
	 *	get the heap tuple out of the tuple table slot
897
	 ******************
898 899 900 901
	 */
	tuple = slot->val;
	attrtype = slot->ttc_tupleDescriptor;

902
	/******************
903
	 *	insert the tuple into the "into relation"
904
	 ******************
905 906 907 908 909 910 911
	 */
	if (estate->es_into_relation_descriptor != NULL)
	{
		heap_insert(estate->es_into_relation_descriptor, tuple);
		IncrAppended();
	}

912
	/******************
913
	 *	send the tuple to the front end (or the screen)
914
	 ******************
915 916 917 918
	 */
	(*printfunc) (tuple, attrtype);
	IncrRetrieved();
	(estate->es_processed)++;
919 920 921
}

/* ----------------------------------------------------------------
922
 *		ExecAppend
923
 *
924 925 926
 *		APPENDs are trickier.. we have to insert the tuple into
 *		the base relation and insert appropriate tuples into the
 *		index relations.
927 928 929 930
 * ----------------------------------------------------------------
 */

static void
931
ExecAppend(TupleTableSlot *slot,
932
		   ItemPointer tupleid,
933
		   EState *estate)
934
{
935 936 937 938 939
	HeapTuple	tuple;
	RelationInfo *resultRelationInfo;
	Relation	resultRelationDesc;
	int			numIndices;
	Oid			newId;
940

941
	/******************
942
	 *	get the heap tuple out of the tuple table slot
943
	 ******************
944 945 946
	 */
	tuple = slot->val;

947
	/******************
948
	 *	get information on the result relation
949
	 ******************
950 951 952 953
	 */
	resultRelationInfo = estate->es_result_relation_info;
	resultRelationDesc = resultRelationInfo->ri_RelationDesc;

954
	/******************
955 956
	 *	have to add code to preform unique checking here.
	 *	cim -12/1/89
957
	 ******************
958 959 960 961 962 963
	 */

	/* BEFORE ROW INSERT Triggers */
	if (resultRelationDesc->trigdesc &&
	resultRelationDesc->trigdesc->n_before_row[TRIGGER_EVENT_INSERT] > 0)
	{
964
		HeapTuple	newtuple;
965 966 967 968 969 970 971 972 973 974 975 976 977 978

		newtuple = ExecBRInsertTriggers(resultRelationDesc, tuple);

		if (newtuple == NULL)	/* "do nothing" */
			return;

		if (newtuple != tuple)	/* modified by Trigger(s) */
		{
			Assert(slot->ttc_shouldFree);
			pfree(tuple);
			slot->val = tuple = newtuple;
		}
	}

979
	/******************
980
	 * Check the constraints of a tuple
981
	 ******************
982 983 984 985
	 */

	if (resultRelationDesc->rd_att->constr)
	{
986
		HeapTuple	newtuple;
987 988 989 990 991 992 993 994 995 996 997

		newtuple = ExecConstraints("ExecAppend", resultRelationDesc, tuple);

		if (newtuple != tuple)	/* modified by DEFAULT */
		{
			Assert(slot->ttc_shouldFree);
			pfree(tuple);
			slot->val = tuple = newtuple;
		}
	}

998
	/******************
999
	 *	insert the tuple
1000
	 ******************
1001 1002 1003 1004 1005
	 */
	newId = heap_insert(resultRelationDesc,		/* relation desc */
						tuple); /* heap tuple */
	IncrAppended();

1006
	/******************
1007 1008 1009 1010 1011
	 *	process indices
	 *
	 *	Note: heap_insert adds a new tuple to a relation.  As a side
	 *	effect, the tupleid of the new tuple is placed in the new
	 *	tuple's t_ctid field.
1012
	 ******************
1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023
	 */
	numIndices = resultRelationInfo->ri_NumIndices;
	if (numIndices > 0)
		ExecInsertIndexTuples(slot, &(tuple->t_ctid), estate, false);
	(estate->es_processed)++;
	estate->es_lastoid = newId;

	/* AFTER ROW INSERT Triggers */
	if (resultRelationDesc->trigdesc &&
	 resultRelationDesc->trigdesc->n_after_row[TRIGGER_EVENT_INSERT] > 0)
		ExecARInsertTriggers(resultRelationDesc, tuple);
1024 1025 1026
}

/* ----------------------------------------------------------------
1027
 *		ExecDelete
1028
 *
1029 1030
 *		DELETE is like append, we delete the tuple and its
 *		index tuples.
1031 1032 1033
 * ----------------------------------------------------------------
 */
static void
1034
ExecDelete(TupleTableSlot *slot,
1035
		   ItemPointer tupleid,
1036
		   EState *estate)
1037
{
1038 1039
	RelationInfo *resultRelationInfo;
	Relation	resultRelationDesc;
1040

1041
	/******************
1042
	 *	get the result relation information
1043
	 ******************
1044 1045 1046 1047 1048 1049 1050 1051
	 */
	resultRelationInfo = estate->es_result_relation_info;
	resultRelationDesc = resultRelationInfo->ri_RelationDesc;

	/* BEFORE ROW DELETE Triggers */
	if (resultRelationDesc->trigdesc &&
	resultRelationDesc->trigdesc->n_before_row[TRIGGER_EVENT_DELETE] > 0)
	{
1052
		bool		dodelete;
1053 1054 1055 1056 1057 1058 1059

		dodelete = ExecBRDeleteTriggers(resultRelationDesc, tupleid);

		if (!dodelete)			/* "do nothing" */
			return;
	}

1060
	/******************
1061
	 *	delete the tuple
1062
	 ******************
1063 1064 1065 1066 1067 1068 1069 1070
	 */
	if (heap_delete(resultRelationDesc, /* relation desc */
					tupleid))	/* item pointer to tuple */
		return;

	IncrDeleted();
	(estate->es_processed)++;

1071
	/******************
1072 1073 1074 1075 1076 1077 1078 1079
	 *	Note: Normally one would think that we have to
	 *		  delete index tuples associated with the
	 *		  heap tuple now..
	 *
	 *		  ... but in POSTGRES, we have no need to do this
	 *		  because the vacuum daemon automatically
	 *		  opens an index scan and deletes index tuples
	 *		  when it finds deleted heap tuples. -cim 9/27/89
1080
	 ******************
1081 1082 1083 1084 1085 1086
	 */

	/* AFTER ROW DELETE Triggers */
	if (resultRelationDesc->trigdesc &&
	 resultRelationDesc->trigdesc->n_after_row[TRIGGER_EVENT_DELETE] > 0)
		ExecARDeleteTriggers(resultRelationDesc, tupleid);
1087 1088 1089 1090

}

/* ----------------------------------------------------------------
1091
 *		ExecReplace
1092
 *
1093 1094 1095 1096 1097 1098
 *		note: we can't run replace queries with transactions
 *		off because replaces are actually appends and our
 *		scan will mistakenly loop forever, replacing the tuple
 *		it just appended..	This should be fixed but until it
 *		is, we don't want to get stuck in an infinite loop
 *		which corrupts your database..
1099 1100 1101
 * ----------------------------------------------------------------
 */
static void
1102
ExecReplace(TupleTableSlot *slot,
1103
			ItemPointer tupleid,
1104 1105
			EState *estate,
			Query *parseTree)
1106
{
1107 1108 1109 1110
	HeapTuple	tuple;
	RelationInfo *resultRelationInfo;
	Relation	resultRelationDesc;
	int			numIndices;
1111

1112
	/******************
1113
	 *	abort the operation if not running transactions
1114
	 ******************
1115 1116 1117 1118 1119 1120 1121
	 */
	if (IsBootstrapProcessingMode())
	{
		elog(DEBUG, "ExecReplace: replace can't run without transactions");
		return;
	}

1122
	/******************
1123
	 *	get the heap tuple out of the tuple table slot
1124
	 ******************
1125 1126 1127
	 */
	tuple = slot->val;

1128
	/******************
1129
	 *	get the result relation information
1130
	 ******************
1131 1132 1133 1134
	 */
	resultRelationInfo = estate->es_result_relation_info;
	resultRelationDesc = resultRelationInfo->ri_RelationDesc;

1135
	/******************
1136 1137 1138 1139
	 *	have to add code to preform unique checking here.
	 *	in the event of unique tuples, this becomes a deletion
	 *	of the original tuple affected by the replace.
	 *	cim -12/1/89
1140
	 ******************
1141 1142 1143 1144 1145 1146
	 */

	/* BEFORE ROW UPDATE Triggers */
	if (resultRelationDesc->trigdesc &&
	resultRelationDesc->trigdesc->n_before_row[TRIGGER_EVENT_UPDATE] > 0)
	{
1147
		HeapTuple	newtuple;
1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161

		newtuple = ExecBRUpdateTriggers(resultRelationDesc, tupleid, tuple);

		if (newtuple == NULL)	/* "do nothing" */
			return;

		if (newtuple != tuple)	/* modified by Trigger(s) */
		{
			Assert(slot->ttc_shouldFree);
			pfree(tuple);
			slot->val = tuple = newtuple;
		}
	}

1162
	/******************
1163
	 * Check the constraints of a tuple
1164
	 ******************
1165 1166 1167 1168
	 */

	if (resultRelationDesc->rd_att->constr)
	{
1169
		HeapTuple	newtuple;
1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180

		newtuple = ExecConstraints("ExecReplace", resultRelationDesc, tuple);

		if (newtuple != tuple)	/* modified by DEFAULT */
		{
			Assert(slot->ttc_shouldFree);
			pfree(tuple);
			slot->val = tuple = newtuple;
		}
	}

1181
	/******************
1182 1183 1184 1185 1186
	 *	replace the heap tuple
	 *
	 * Don't want to continue if our heap_replace didn't actually
	 * do a replace. This would be the case if heap_replace
	 * detected a non-functional update. -kw 12/30/93
1187
	 ******************
1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198
	 */
	if (heap_replace(resultRelationDesc,		/* relation desc */
					 tupleid,	/* item ptr of tuple to replace */
					 tuple))
	{							/* replacement heap tuple */
		return;
	}

	IncrReplaced();
	(estate->es_processed)++;

1199
	/******************
1200 1201 1202 1203 1204 1205 1206
	 *	Note: instead of having to update the old index tuples
	 *		  associated with the heap tuple, all we do is form
	 *		  and insert new index tuples..  This is because
	 *		  replaces are actually deletes and inserts and
	 *		  index tuple deletion is done automagically by
	 *		  the vaccuum deamon.. All we do is insert new
	 *		  index tuples.  -cim 9/27/89
1207
	 ******************
1208 1209
	 */

1210
	/******************
1211 1212 1213 1214 1215 1216 1217
	 *	process indices
	 *
	 *	heap_replace updates a tuple in the base relation by invalidating
	 *	it and then appending a new tuple to the relation.	As a side
	 *	effect, the tupleid of the new tuple is placed in the new
	 *	tuple's t_ctid field.  So we now insert index tuples using
	 *	the new tupleid stored there.
1218
	 ******************
1219 1220 1221 1222 1223 1224 1225 1226 1227 1228
	 */

	numIndices = resultRelationInfo->ri_NumIndices;
	if (numIndices > 0)
		ExecInsertIndexTuples(slot, &(tuple->t_ctid), estate, true);

	/* AFTER ROW UPDATE Triggers */
	if (resultRelationDesc->trigdesc &&
	 resultRelationDesc->trigdesc->n_after_row[TRIGGER_EVENT_UPDATE] > 0)
		ExecARUpdateTriggers(resultRelationDesc, tupleid, tuple);
1229
}
V
Vadim B. Mikheev 已提交
1230

1231
#if 0
/*
 * ExecAttrDefault (currently compiled out)
 *
 *		For every attribute default recorded in rel's constraint data
 *		whose attribute is null in 'tuple', evaluate the stored default
 *		expression and, when it yields a non-null value, schedule that
 *		attribute for replacement.
 *
 *		Returns 'tuple' itself when nothing had to be replaced.
 *		Otherwise returns the tuple built by heap_modifytuple() and
 *		pfree's the input tuple.
 */
static HeapTuple
ExecAttrDefault(Relation rel, HeapTuple tuple)
{
	int			natts = rel->rd_att->natts;
	int			numDefaults = rel->rd_att->constr->num_defval;
	AttrDefault *defaults = rel->rd_att->constr->defval;
	ExprContext *econtext = makeNode(ExprContext);
	Datum	   *newValues = NULL;
	char	   *newNulls = NULL;
	char	   *replaceFlags = NULL;
	HeapTuple	result;
	int			defno;

	/* The expression context carries no tuples, relation or parameters. */
	econtext->ecxt_scantuple = NULL;	/* scan tuple slot */
	econtext->ecxt_innertuple = NULL;	/* inner tuple slot */
	econtext->ecxt_outertuple = NULL;	/* outer tuple slot */
	econtext->ecxt_relation = NULL;		/* relation */
	econtext->ecxt_relid = 0;	/* relid */
	econtext->ecxt_param_list_info = NULL;		/* param list info */
	econtext->ecxt_param_exec_vals = NULL;		/* exec param values */
	econtext->ecxt_range_table = NULL;	/* range table */

	for (defno = 0; defno < numDefaults; defno++)
	{
		int			attoff = defaults[defno].adnum - 1;
		Node	   *expr;
		Datum		val;
		bool		isnull;
		bool		isdone;

		/* Only attributes that are null in the tuple get a default. */
		if (!heap_attisnull(tuple, defaults[defno].adnum))
			continue;

		expr = (Node *) stringToNode(defaults[defno].adbin);
		val = ExecEvalExpr(expr, econtext, &isnull, &isdone);
		pfree(expr);

		/* A null default leaves the attribute as it is. */
		if (isnull)
			continue;

		/* Allocate the work arrays on the first actual replacement. */
		if (replaceFlags == NULL)
		{
			replaceFlags = (char *) palloc(natts * sizeof(char));
			newNulls = (char *) palloc(natts * sizeof(char));
			newValues = (Datum *) palloc(natts * sizeof(Datum));
			MemSet(replaceFlags, ' ', natts * sizeof(char));
		}

		replaceFlags[attoff] = 'r';
		newNulls[attoff] = ' ';
		newValues[attoff] = val;
	}

	pfree(econtext);

	/* No attribute was marked for replacement: hand back the input. */
	if (replaceFlags == NULL)
		return tuple;

	result = heap_modifytuple(tuple, rel, newValues, newNulls, replaceFlags);

	pfree(replaceFlags);
	pfree(tuple);
	pfree(newNulls);
	pfree(newValues);

	return result;
}

#endif
V
Vadim B. Mikheev 已提交
1300

1301
static char *
1302
ExecRelCheck(Relation rel, HeapTuple tuple)
V
Vadim B. Mikheev 已提交
1303
{
1304 1305 1306
	int			ncheck = rel->rd_att->constr->num_check;
	ConstrCheck *check = rel->rd_att->constr->check;
	ExprContext *econtext = makeNode(ExprContext);
1307
	TupleTableSlot *slot = makeNode(TupleTableSlot);
1308 1309 1310 1311 1312
	RangeTblEntry *rte = makeNode(RangeTblEntry);
	List	   *rtlist;
	List	   *qual;
	bool		res;
	int			i;
1313 1314 1315 1316 1317 1318 1319 1320 1321

	slot->val = tuple;
	slot->ttc_shouldFree = false;
	slot->ttc_descIsNew = true;
	slot->ttc_tupleDescriptor = rel->rd_att;
	slot->ttc_buffer = InvalidBuffer;
	slot->ttc_whichplan = -1;
	rte->relname = nameout(&(rel->rd_rel->relname));
	rte->refname = rte->relname;
1322
	rte->relid = RelationGetRelid(rel);
1323 1324 1325 1326 1327 1328 1329 1330 1331
	rte->inh = false;
	rte->inFromCl = true;
	rtlist = lcons(rte, NIL);
	econtext->ecxt_scantuple = slot;	/* scan tuple slot */
	econtext->ecxt_innertuple = NULL;	/* inner tuple slot */
	econtext->ecxt_outertuple = NULL;	/* outer tuple slot */
	econtext->ecxt_relation = rel;		/* relation */
	econtext->ecxt_relid = 0;	/* relid */
	econtext->ecxt_param_list_info = NULL;		/* param list info */
V
Vadim B. Mikheev 已提交
1332
	econtext->ecxt_param_exec_vals = NULL;		/* exec param values */
1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343
	econtext->ecxt_range_table = rtlist;		/* range table */

	for (i = 0; i < ncheck; i++)
	{
		qual = (List *) stringToNode(check[i].ccbin);

		res = ExecQual(qual, econtext);

		pfree(qual);

		if (!res)
1344
			return check[i].ccname;
1345 1346 1347 1348 1349 1350 1351 1352
	}

	pfree(slot);
	pfree(rte->relname);
	pfree(rte);
	pfree(rtlist);
	pfree(econtext);

1353
	return (char *) NULL;
1354

V
Vadim B. Mikheev 已提交
1355 1356 1357
}

HeapTuple
1358
ExecConstraints(char *caller, Relation rel, HeapTuple tuple)
V
Vadim B. Mikheev 已提交
1359
{
1360
	HeapTuple	newtuple = tuple;
1361 1362 1363

	Assert(rel->rd_att->constr);

1364
#if 0
1365 1366
	if (rel->rd_att->constr->num_defval > 0)
		newtuple = tuple = ExecAttrDefault(rel, tuple);
1367
#endif
1368 1369

	if (rel->rd_att->constr->has_not_null)
V
Vadim B. Mikheev 已提交
1370
	{
1371
		int			attrChk;
1372 1373 1374 1375

		for (attrChk = 1; attrChk <= rel->rd_att->natts; attrChk++)
		{
			if (rel->rd_att->attrs[attrChk - 1]->attnotnull && heap_attisnull(tuple, attrChk))
1376
				elog(ERROR, "%s: Fail to add null value in not null attribute %s",
1377 1378 1379 1380 1381 1382
				  caller, rel->rd_att->attrs[attrChk - 1]->attname.data);
		}
	}

	if (rel->rd_att->constr->num_check > 0)
	{
1383
		char	   *failed;
1384 1385

		if ((failed = ExecRelCheck(rel, tuple)) != NULL)
1386
			elog(ERROR, "%s: rejected due to CHECK constraint %s", caller, failed);
1387 1388
	}

1389
	return newtuple;
V
Vadim B. Mikheev 已提交
1390
}