execMain.c 33.7 KB
Newer Older
1 2 3
/*-------------------------------------------------------------------------
 *
 * execMain.c--
4
 *	  top level executor interface routines
5 6
 *
 * INTERFACE ROUTINES
7 8 9
 *	ExecutorStart()
 *	ExecutorRun()
 *	ExecutorEnd()
10
 *
11 12 13 14 15 16 17 18 19 20 21 22 23
 *	The old ExecutorMain() has been replaced by ExecutorStart(),
 *	ExecutorRun() and ExecutorEnd()
 *
 *	These three procedures are the external interfaces to the executor.
 *	In each case, the query descriptor and the execution state is required
 *	 as arguments
 *
 *	ExecutorStart() must be called at the beginning of any execution of any
 *	query plan and ExecutorEnd() should always be called at the end of
 *	execution of a plan.
 *
 *	ExecutorRun accepts 'feature' and 'count' arguments that specify whether
 *	the plan is to be executed forwards, backwards, and for how many tuples.
24 25 26 27 28
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
V
Vadim B. Mikheev 已提交
29
 *	  $Header: /cvsroot/pgsql/src/backend/executor/execMain.c,v 1.61 1998/12/16 11:53:45 vadim Exp $
30 31 32
 *
 *-------------------------------------------------------------------------
 */
33
#include <string.h>
34
#include "postgres.h"
35
#include "miscadmin.h"
36

37
#include "executor/executor.h"
38 39
#include "executor/execdefs.h"
#include "executor/execdebug.h"
40
#include "executor/nodeIndexscan.h"
41 42 43
#include "utils/builtins.h"
#include "utils/palloc.h"
#include "utils/acl.h"
44
#include "utils/syscache.h"
45
#include "utils/tqual.h"
46
#include "parser/parsetree.h"	/* rt_fetch() */
47
#include "storage/bufmgr.h"
M
Marc G. Fournier 已提交
48
#include "storage/lmgr.h"
49
#include "storage/smgr.h"
50 51 52
#include "commands/async.h"
/* #include "access/localam.h" */
#include "optimizer/var.h"
53
#include "access/heapam.h"
V
Vadim B. Mikheev 已提交
54
#include "access/xact.h"
55
#include "catalog/heap.h"
56
#include "commands/trigger.h"
57

58
void ExecCheckPerms(CmdType operation, int resultRelation, List *rangeTable,
59
			   Query *parseTree);
60 61 62


/* decls for local routines only used within this module */
63
static TupleDesc InitPlan(CmdType operation, Query *parseTree,
64 65
		 Plan *plan, EState *estate);
static void EndPlan(Plan *plan, EState *estate);
66
static TupleTableSlot *ExecutePlan(EState *estate, Plan *plan,
67
			Query *parseTree, CmdType operation,
68 69
			int numberTuples, ScanDirection direction,
			void (*printfunc) ());
70 71
static void ExecRetrieve(TupleTableSlot *slot, void (*printfunc) (),
									 EState *estate);
72
static void ExecAppend(TupleTableSlot *slot, ItemPointer tupleid,
73
		   EState *estate);
74
static void ExecDelete(TupleTableSlot *slot, ItemPointer tupleid,
75
		   EState *estate);
76
static void ExecReplace(TupleTableSlot *slot, ItemPointer tupleid,
77
			EState *estate, Query *parseTree);
78 79 80

/* end of local decls */

M
Marc G. Fournier 已提交
81
#ifdef QUERY_LIMIT
82
static int	queryLimit = ALL_TUPLES;
83

M
Marc G. Fournier 已提交
84 85 86 87 88 89
#undef ALL_TUPLES
#define ALL_TUPLES queryLimit

int
ExecutorLimit(int limit)
{
90
	return queryLimit = limit;
M
Marc G. Fournier 已提交
91
}
92

B
Bruce Momjian 已提交
93 94 95 96 97 98
int
ExecutorGetLimit()
{
	return queryLimit;
}

99
#endif
M
Marc G. Fournier 已提交
100

101
/* ----------------------------------------------------------------
102 103 104 105 106 107 108
 *		ExecutorStart
 *
 *		This routine must be called at the beginning of any execution of any
 *		query plan
 *
 *		returns (AttrInfo*) which describes the attributes of the tuples to
 *		be returned by the query.
109 110 111 112
 *
 * ----------------------------------------------------------------
 */
TupleDesc
113
ExecutorStart(QueryDesc *queryDesc, EState *estate)
114
{
115
	TupleDesc	result;
116 117 118

	/* sanity checks */
	Assert(queryDesc != NULL);
119

V
Vadim B. Mikheev 已提交
120 121
	if (queryDesc->plantree->nParamExec > 0)
	{
122 123 124
		estate->es_param_exec_vals = (ParamExecData *)
			palloc(queryDesc->plantree->nParamExec * sizeof(ParamExecData));
		memset(estate->es_param_exec_vals, 0, queryDesc->plantree->nParamExec * sizeof(ParamExecData));
V
Vadim B. Mikheev 已提交
125
	}
126

V
Vadim B. Mikheev 已提交
127
	estate->es_snapshot = QuerySnapshot;
128

129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
	result = InitPlan(queryDesc->operation,
					  queryDesc->parsetree,
					  queryDesc->plantree,
					  estate);

	/*
	 * reset buffer refcount.  the current refcounts are saved and will be
	 * restored when ExecutorEnd is called
	 *
	 * this makes sure that when ExecutorRun's are called recursively as for
	 * postquel functions, the buffers pinned by one ExecutorRun will not
	 * be unpinned by another ExecutorRun.
	 */
	BufferRefCountReset(estate->es_refcount);

	return result;
145 146 147
}

/* ----------------------------------------------------------------
148 149 150 151 152 153 154
 *		ExecutorRun
 *
 *		This is the main routine of the executor module. It accepts
 *		the query descriptor from the traffic cop and executes the
 *		query plan.
 *
 *		ExecutorStart must have been called already.
155
 *
156 157 158 159 160 161
 *		the different features supported are:
 *			 EXEC_RUN:	retrieve all tuples in the forward direction
 *			 EXEC_FOR:	retrieve 'count' number of tuples in the forward dir
 *			 EXEC_BACK: retrieve 'count' number of tuples in the backward dir
 *			 EXEC_RETONE: return one tuple but don't 'retrieve' it
 *						   used in postquel function processing
162 163 164 165
 *
 *
 * ----------------------------------------------------------------
 */
166
TupleTableSlot *
167
ExecutorRun(QueryDesc *queryDesc, EState *estate, int feature, int count)
168
{
169 170 171
	CmdType		operation;
	Query	   *parseTree;
	Plan	   *plan;
172
	TupleTableSlot *result;
173 174
	CommandDest dest;
	void		(*destination) ();
175

176
	/******************
177
	 *	sanity checks
178
	 ******************
179
	 */
180 181
	Assert(queryDesc != NULL);

182
	/******************
183 184
	 *	extract information from the query descriptor
	 *	and the query feature.
185
	 ******************
186
	 */
187 188 189 190 191 192 193 194 195 196 197
	operation = queryDesc->operation;
	parseTree = queryDesc->parsetree;
	plan = queryDesc->plantree;
	dest = queryDesc->dest;
	destination = (void (*) ()) DestToFunction(dest);
	estate->es_processed = 0;
	estate->es_lastoid = InvalidOid;

	switch (feature)
	{

198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
		case EXEC_RUN:
			result = ExecutePlan(estate,
								 plan,
								 parseTree,
								 operation,
								 ALL_TUPLES,
								 ForwardScanDirection,
								 destination);
			break;
		case EXEC_FOR:
			result = ExecutePlan(estate,
								 plan,
								 parseTree,
								 operation,
								 count,
								 ForwardScanDirection,
								 destination);
			break;
216

217
			/******************
218
			 *		retrieve next n "backward" tuples
219
			 ******************
220 221 222 223 224 225 226 227 228 229
			 */
		case EXEC_BACK:
			result = ExecutePlan(estate,
								 plan,
								 parseTree,
								 operation,
								 count,
								 BackwardScanDirection,
								 destination);
			break;
230

231
			/******************
232 233
			 *		return one tuple but don't "retrieve" it.
			 *		(this is used by the rule manager..) -cim 9/14/89
234
			 ******************
235 236 237 238 239 240 241 242 243 244 245 246 247 248
			 */
		case EXEC_RETONE:
			result = ExecutePlan(estate,
								 plan,
								 parseTree,
								 operation,
								 ONE_TUPLE,
								 ForwardScanDirection,
								 destination);
			break;
		default:
			result = NULL;
			elog(DEBUG, "ExecutorRun: Unknown feature %d", feature);
			break;
249 250 251
	}

	return result;
252 253 254
}

/* ----------------------------------------------------------------
255 256 257 258 259 260 261
 *		ExecutorEnd
 *
 *		This routine must be called at the end of any execution of any
 *		query plan
 *
 *		returns (AttrInfo*) which describes the attributes of the tuples to
 *		be returned by the query.
262 263 264 265
 *
 * ----------------------------------------------------------------
 */
void
266
ExecutorEnd(QueryDesc *queryDesc, EState *estate)
267
{
268 269
	/* sanity checks */
	Assert(queryDesc != NULL);
270

271
	EndPlan(queryDesc->plantree, estate);
272

273 274
	/* restore saved refcounts. */
	BufferRefCountRestore(estate->es_refcount);
275 276
}

277
void
278
ExecCheckPerms(CmdType operation,
279
			   int resultRelation,
280 281
			   List *rangeTable,
			   Query *parseTree)
282
{
283 284
	int			i = 1;
	Oid			relid;
285
	HeapTuple	htup;
286 287 288 289 290 291 292 293
	List	   *lp;
	List	   *qvars,
			   *tvars;
	int32		ok = 1,
				aclcheck_result = -1;
	char	   *opstr;
	NameData	rname;
	char	   *userName;
294 295 296 297 298 299 300

#define CHECK(MODE)		pg_aclcheck(rname.data, userName, MODE)

	userName = GetPgUserName();

	foreach(lp, rangeTable)
	{
301
		RangeTblEntry *rte = lfirst(lp);
302

M
Marc G. Fournier 已提交
303 304
		if (rte->skipAcl)
		{
305

M
Marc G. Fournier 已提交
306
			/*
307 308 309 310
			 * This happens if the access to this table is due to a view
			 * query rewriting - the rewrite handler checked the
			 * permissions against the view owner, so we just skip this
			 * entry.
M
Marc G. Fournier 已提交
311 312 313 314
			 */
			continue;
		}

315
		relid = rte->relid;
316
		htup = SearchSysCacheTuple(RELOID,
317 318
								   ObjectIdGetDatum(relid),
								   0, 0, 0);
319
		if (!HeapTupleIsValid(htup))
320
			elog(ERROR, "ExecCheckPerms: bogus RT relid: %d",
321
				 relid);
322
		StrNCpy(rname.data,
323
				((Form_pg_class) GETSTRUCT(htup))->relname.data,
324
				NAMEDATALEN);
325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
		if (i == resultRelation)
		{						/* this is the result relation */
			qvars = pull_varnos(parseTree->qual);
			tvars = pull_varnos((Node *) parseTree->targetList);
			if (intMember(resultRelation, qvars) ||
				intMember(resultRelation, tvars))
			{
				/* result relation is scanned */
				ok = ((aclcheck_result = CHECK(ACL_RD)) == ACLCHECK_OK);
				opstr = "read";
				if (!ok)
					break;
			}
			switch (operation)
			{
340 341 342 343 344 345 346 347 348 349 350
				case CMD_INSERT:
					ok = ((aclcheck_result = CHECK(ACL_AP)) == ACLCHECK_OK) ||
						((aclcheck_result = CHECK(ACL_WR)) == ACLCHECK_OK);
					opstr = "append";
					break;
				case CMD_DELETE:
				case CMD_UPDATE:
					ok = ((aclcheck_result = CHECK(ACL_WR)) == ACLCHECK_OK);
					opstr = "write";
					break;
				default:
351
					elog(ERROR, "ExecCheckPerms: bogus operation %d",
352
						 operation);
353 354 355 356 357 358 359
			}
		}
		else
		{
			ok = ((aclcheck_result = CHECK(ACL_RD)) == ACLCHECK_OK);
			opstr = "read";
		}
360
		if (!ok)
361 362
			break;
		++i;
363 364
	}
	if (!ok)
365
		elog(ERROR, "%s: %s", rname.data, aclcheck_error_strings[aclcheck_result]);
366 367
}

368 369 370 371 372 373 374
/* ===============================================================
 * ===============================================================
						 static routines follow
 * ===============================================================
 * ===============================================================
 */

375 376

/* ----------------------------------------------------------------
377 378 379 380
 *		InitPlan
 *
 *		Initializes the query plan: open files, allocate storage
 *		and start up the rule manager
381 382
 * ----------------------------------------------------------------
 */
383
static TupleDesc
384
InitPlan(CmdType operation, Query *parseTree, Plan *plan, EState *estate)
385
{
386 387 388
	List	   *rangeTable;
	int			resultRelation;
	Relation	intoRelationDesc;
389

390 391 392
	TupleDesc	tupType;
	List	   *targetList;
	int			len;
393

394
	/******************
395
	 *	get information from query descriptor
396
	 ******************
397
	 */
398 399
	rangeTable = parseTree->rtable;
	resultRelation = parseTree->resultRelation;
400

401
	/******************
402
	 *	initialize the node's execution state
403
	 ******************
404
	 */
405 406
	estate->es_range_table = rangeTable;

407
	/******************
408 409 410 411
	 *	initialize the BaseId counter so node base_id's
	 *	are assigned correctly.  Someday baseid's will have to
	 *	be stored someplace other than estate because they
	 *	should be unique per query planned.
412
	 ******************
413
	 */
414
	estate->es_BaseId = 1;
415

416
	/******************
417
	 *	initialize result relation stuff
418
	 ******************
419
	 */
420

421 422
	if (resultRelation != 0 && operation != CMD_SELECT)
	{
423
		/******************
424 425
		 *	  if we have a result relation, open it and
		 *	  initialize the result relation info stuff.
426
		 ******************
427
		 */
428 429 430 431 432
		RelationInfo *resultRelationInfo;
		Index		resultRelationIndex;
		RangeTblEntry *rtentry;
		Oid			resultRelationOid;
		Relation	resultRelationDesc;
433 434 435 436 437 438 439

		resultRelationIndex = resultRelation;
		rtentry = rt_fetch(resultRelationIndex, rangeTable);
		resultRelationOid = rtentry->relid;
		resultRelationDesc = heap_open(resultRelationOid);

		if (resultRelationDesc->rd_rel->relkind == RELKIND_SEQUENCE)
440
			elog(ERROR, "You can't change sequence relation %s",
441 442
				 resultRelationDesc->rd_rel->relname.data);

V
Vadim B. Mikheev 已提交
443
		LockRelation(resultRelationDesc, RowExclusiveLock);
444 445 446 447 448 449 450

		resultRelationInfo = makeNode(RelationInfo);
		resultRelationInfo->ri_RangeTableIndex = resultRelationIndex;
		resultRelationInfo->ri_RelationDesc = resultRelationDesc;
		resultRelationInfo->ri_NumIndices = 0;
		resultRelationInfo->ri_IndexRelationDescs = NULL;
		resultRelationInfo->ri_IndexRelationInfo = NULL;
451

452
		/******************
453 454
		 *	open indices on result relation and save descriptors
		 *	in the result relation information..
455
		 ******************
456
		 */
V
Vadim B. Mikheev 已提交
457 458
		if (operation != CMD_DELETE)
			ExecOpenIndices(resultRelationOid, resultRelationInfo);
459 460

		estate->es_result_relation_info = resultRelationInfo;
461
	}
462 463
	else
	{
464
		/******************
465
		 *		if no result relation, then set state appropriately
466
		 ******************
467 468 469 470 471 472 473 474
		 */
		estate->es_result_relation_info = NULL;
	}

#ifndef NO_SECURITY
	ExecCheckPerms(operation, resultRelation, rangeTable, parseTree);
#endif

475
	/******************
476
	 *	  initialize the executor "tuple" table.
477
	 ******************
478 479
	 */
	{
480 481
		int			nSlots = ExecCountSlotsNode(plan);
		TupleTable	tupleTable = ExecCreateTupleTable(nSlots + 10);		/* why add ten? - jolly */
482

483 484
		estate->es_tupleTable = tupleTable;
	}
485

486
	/******************
487 488 489 490
	 *	   initialize the private state information for
	 *	   all the nodes in the query tree.  This opens
	 *	   files, allocates storage and leaves us ready
	 *	   to start processing tuples..
491
	 ******************
492 493 494
	 */
	ExecInitNode(plan, estate, NULL);

495
	/******************
496 497 498
	 *	   get the tuple descriptor describing the type
	 *	   of tuples to return.. (this is especially important
	 *	   if we are creating a relation with "retrieve into")
499
	 ******************
500 501 502 503 504
	 */
	tupType = ExecGetTupType(plan);		/* tuple descriptor */
	targetList = plan->targetlist;
	len = ExecTargetListLength(targetList);		/* number of attributes */

505
	/******************
506 507 508 509 510 511
	 *	  now that we have the target list, initialize the junk filter
	 *	  if this is a REPLACE or a DELETE query.
	 *	  We also init the junk filter if this is an append query
	 *	  (there might be some rule lock info there...)
	 *	  NOTE: in the future we might want to initialize the junk
	 *	  filter for all queries.
512
	 ******************
513 514
	 *		  SELECT added by daveh@insightdist.com  5/20/98 to allow
	 *		  ORDER/GROUP BY have an identifier missing from the target.
515 516
	 */
	{
517 518 519
		bool		junk_filter_needed = false;
		List	   *tlist;

520
		if (operation == CMD_SELECT)
521 522 523
		{
			foreach(tlist, targetList)
			{
524 525
				TargetEntry *tle = lfirst(tlist);

526 527 528 529 530 531 532 533 534 535 536 537 538
				if (tle->resdom->resjunk)
				{
					junk_filter_needed = true;
					break;
				}
			}
		}

		if (operation == CMD_UPDATE || operation == CMD_DELETE ||
			operation == CMD_INSERT ||
			(operation == CMD_SELECT && junk_filter_needed))
		{
			JunkFilter *j = (JunkFilter *) ExecInitJunkFilter(targetList);
539

540
			estate->es_junkFilter = j;
541

542 543 544 545 546 547
			if (operation == CMD_SELECT)
				tupType = j->jf_cleanTupType;
		}
		else
			estate->es_junkFilter = NULL;
	}
548

549
	/******************
550
	 *	initialize the "into" relation
551
	 ******************
552 553 554 555 556
	 */
	intoRelationDesc = (Relation) NULL;

	if (operation == CMD_SELECT)
	{
557 558 559
		char	   *intoName;
		Oid			intoRelationId;
		TupleDesc	tupdesc;
560 561 562 563 564 565 566 567 568

		if (!parseTree->isPortal)
		{

			/*
			 * a select into table
			 */
			if (parseTree->into != NULL)
			{
569
				/******************
570
				 *	create the "into" relation
571
				 ******************
572 573 574 575 576 577 578 579
				 */
				intoName = parseTree->into;

				/*
				 * have to copy tupType to get rid of constraints
				 */
				tupdesc = CreateTupleDescCopy(tupType);

580
				intoRelationId = heap_create_with_catalog(intoName,
581
											  tupdesc, RELKIND_RELATION);
582

583 584
				FreeTupleDesc(tupdesc);

585
				/******************
586 587 588
				 *	XXX rather than having to call setheapoverride(true)
				 *		and then back to false, we should change the
				 *		arguments to heap_open() instead..
589
				 ******************
590 591 592 593 594 595 596 597 598 599 600 601 602
				 */
				setheapoverride(true);

				intoRelationDesc = heap_open(intoRelationId);

				setheapoverride(false);
			}
		}
	}

	estate->es_into_relation_descriptor = intoRelationDesc;

	return tupType;
603 604 605
}

/* ----------------------------------------------------------------
606 607 608
 *		EndPlan
 *
 *		Cleans up the query plan -- closes files and free up storages
609 610 611
 * ----------------------------------------------------------------
 */
static void
612
EndPlan(Plan *plan, EState *estate)
613
{
614 615
	RelationInfo *resultRelationInfo;
	Relation	intoRelationDesc;
616

617
	/******************
618
	 *	get information from state
619
	 ******************
620
	 */
621 622 623
	resultRelationInfo = estate->es_result_relation_info;
	intoRelationDesc = estate->es_into_relation_descriptor;

624
	/******************
625
	 *	 shut down the query
626
	 ******************
627 628 629
	 */
	ExecEndNode(plan, plan);

630
	/******************
631
	 *	  destroy the executor "tuple" table.
632
	 ******************
633 634
	 */
	{
635
		TupleTable	tupleTable = (TupleTable) estate->es_tupleTable;
636 637 638 639 640

		ExecDestroyTupleTable(tupleTable, true);		/* was missing last arg */
		estate->es_tupleTable = NULL;
	}

641
	/******************
642
	 *	 close the result relations if necessary
643
	 ******************
644 645 646
	 */
	if (resultRelationInfo != NULL)
	{
647
		Relation	resultRelationDesc;
648 649 650 651

		resultRelationDesc = resultRelationInfo->ri_RelationDesc;
		heap_close(resultRelationDesc);

652
		/******************
653
		 *	close indices on the result relation
654
		 ******************
655 656 657 658
		 */
		ExecCloseIndices(resultRelationInfo);
	}

659
	/******************
660
	 *	 close the "into" relation if necessary
661
	 ******************
662 663 664
	 */
	if (intoRelationDesc != NULL)
		heap_close(intoRelationDesc);
665 666 667
}

/* ----------------------------------------------------------------
668 669 670 671 672 673 674 675
 *		ExecutePlan
 *
 *		processes the query plan to retrieve 'tupleCount' tuples in the
 *		direction specified.
 *		Retrieves all tuples if tupleCount is 0
 *
 *		result is either a slot containing a tuple in the case
 *		of a RETRIEVE or NULL otherwise.
676 677 678 679 680 681 682 683
 *
 * ----------------------------------------------------------------
 */

/* the ctid attribute is a 'junk' attribute that is removed before the
   user can see it*/

static TupleTableSlot *
684 685 686
ExecutePlan(EState *estate,
			Plan *plan,
			Query *parseTree,
687 688 689 690
			CmdType operation,
			int numberTuples,
			ScanDirection direction,
			void (*printfunc) ())
691
{
692
	JunkFilter *junkfilter;
693 694

	TupleTableSlot *slot;
695
	ItemPointer tupleid = NULL;
696
	ItemPointerData tuple_ctid;
697
	int			current_tuple_count;
698 699
	TupleTableSlot *result;

700
	/******************
701
	 *	initialize local variables
702
	 ******************
703
	 */
704 705 706 707
	slot = NULL;
	current_tuple_count = 0;
	result = NULL;

708
	/******************
709
	 *	Set the direction.
710
	 ******************
711
	 */
712 713
	estate->es_direction = direction;

714
	/******************
715 716
	 *	Loop until we've processed the proper number
	 *	of tuples from the plan..
717
	 ******************
718 719 720 721
	 */

	for (;;)
	{
722 723 724 725 726 727
		/******************
		 *	Execute the plan and obtain a tuple
		 ******************
		 */
		/* at the top level, the parent of a plan (2nd arg) is itself */
		slot = ExecProcNode(plan, plan);
728

729 730 731 732 733 734 735 736 737 738
		/******************
		 *	if the tuple is null, then we assume
		 *	there is nothing more to process so
		 *	we just return null...
		 ******************
		 */
		if (TupIsNull(slot))
		{
			result = NULL;
			break;
739 740
		}

741
		/******************
742 743 744 745 746 747
		 *		if we have a junk filter, then project a new
		 *		tuple with the junk removed.
		 *
		 *		Store this new "clean" tuple in the place of the
		 *		original tuple.
		 *
748 749
		 *		Also, extract all the junk information we need.
		 ******************
750 751 752
		 */
		if ((junkfilter = estate->es_junkFilter) != (JunkFilter *) NULL)
		{
753
			Datum		datum;
754 755

/*			NameData	attrName; */
756 757
			HeapTuple	newTuple;
			bool		isNull;
758

759
			/******************
760
			 * extract the 'ctid' junk attribute.
761
			 ******************
762 763 764 765 766 767 768 769
			 */
			if (operation == CMD_UPDATE || operation == CMD_DELETE)
			{
				if (!ExecGetJunkAttribute(junkfilter,
										  slot,
										  "ctid",
										  &datum,
										  &isNull))
770
					elog(ERROR, "ExecutePlan: NO (junk) `ctid' was found!");
771 772

				if (isNull)
773
					elog(ERROR, "ExecutePlan: (junk) `ctid' is NULL!");
774 775 776 777 778 779 780

				tupleid = (ItemPointer) DatumGetPointer(datum);
				tuple_ctid = *tupleid;	/* make sure we don't free the
										 * ctid!! */
				tupleid = &tuple_ctid;
			}

781
			/******************
782 783
			 * Finally create a new "clean" tuple with all junk attributes
			 * removed
784
			 ******************
785 786 787 788 789 790 791 792 793 794
			 */
			newTuple = ExecRemoveJunk(junkfilter, slot);

			slot = ExecStoreTuple(newTuple,		/* tuple to store */
								  slot, /* destination slot */
								  InvalidBuffer,		/* this tuple has no
														 * buffer */
								  true);		/* tuple should be pfreed */
		}						/* if (junkfilter... */

795
		/******************
796 797 798 799
		 *		now that we have a tuple, do the appropriate thing
		 *		with it.. either return it to the user, add
		 *		it to a relation someplace, delete it from a
		 *		relation, or modify some of it's attributes.
800
		 ******************
801 802 803 804
		 */

		switch (operation)
		{
805 806 807 808 809 810
			case CMD_SELECT:
				ExecRetrieve(slot,		/* slot containing tuple */
							 printfunc, /* print function */
							 estate);	/* */
				result = slot;
				break;
811

812 813 814 815
			case CMD_INSERT:
				ExecAppend(slot, tupleid, estate);
				result = NULL;
				break;
816

817 818 819 820
			case CMD_DELETE:
				ExecDelete(slot, tupleid, estate);
				result = NULL;
				break;
821

822 823 824 825
			case CMD_UPDATE:
				ExecReplace(slot, tupleid, estate, parseTree);
				result = NULL;
				break;
826

827 828
			default:
				elog(DEBUG, "ExecutePlan: unknown operation in queryDesc");
829
				result = NULL;
830
				break;
831
		}
832
		/******************
833 834 835
		 *		check our tuple count.. if we've returned the
		 *		proper number then return, else loop again and
		 *		process more tuples..
836
		 ******************
837 838 839 840
		 */
		current_tuple_count += 1;
		if (numberTuples == current_tuple_count)
			break;
841
	}
842

843
	/******************
844 845
	 *	here, result is either a slot containing a tuple in the case
	 *	of a RETRIEVE or NULL otherwise.
846
	 ******************
847
	 */
848
	return result;
849 850 851
}

/* ----------------------------------------------------------------
852
 *		ExecRetrieve
853
 *
854 855 856 857 858
 *		RETRIEVEs are easy.. we just pass the tuple to the appropriate
 *		print function.  The only complexity is when we do a
 *		"retrieve into", in which case we insert the tuple into
 *		the appropriate relation (note: this is a newly created relation
 *		so we don't need to worry about indices or locks.)
859 860 861
 * ----------------------------------------------------------------
 */
static void
862
ExecRetrieve(TupleTableSlot *slot,
863
			 void (*printfunc) (),
864
			 EState *estate)
865
{
866 867
	HeapTuple	tuple;
	TupleDesc	attrtype;
868

869
	/******************
870
	 *	get the heap tuple out of the tuple table slot
871
	 ******************
872 873 874 875
	 */
	tuple = slot->val;
	attrtype = slot->ttc_tupleDescriptor;

876
	/******************
877
	 *	insert the tuple into the "into relation"
878
	 ******************
879 880 881 882 883 884 885
	 */
	if (estate->es_into_relation_descriptor != NULL)
	{
		heap_insert(estate->es_into_relation_descriptor, tuple);
		IncrAppended();
	}

886
	/******************
887
	 *	send the tuple to the front end (or the screen)
888
	 ******************
889 890 891 892
	 */
	(*printfunc) (tuple, attrtype);
	IncrRetrieved();
	(estate->es_processed)++;
893 894 895
}

/* ----------------------------------------------------------------
896
 *		ExecAppend
897
 *
898 899 900
 *		APPENDs are trickier.. we have to insert the tuple into
 *		the base relation and insert appropriate tuples into the
 *		index relations.
901 902 903 904
 * ----------------------------------------------------------------
 */

static void
905
ExecAppend(TupleTableSlot *slot,
906
		   ItemPointer tupleid,
907
		   EState *estate)
908
{
909 910 911 912 913
	HeapTuple	tuple;
	RelationInfo *resultRelationInfo;
	Relation	resultRelationDesc;
	int			numIndices;
	Oid			newId;
914

915
	/******************
916
	 *	get the heap tuple out of the tuple table slot
917
	 ******************
918 919 920
	 */
	tuple = slot->val;

921
	/******************
922
	 *	get information on the result relation
923
	 ******************
924 925 926 927
	 */
	resultRelationInfo = estate->es_result_relation_info;
	resultRelationDesc = resultRelationInfo->ri_RelationDesc;

928
	/******************
929 930
	 *	have to add code to preform unique checking here.
	 *	cim -12/1/89
931
	 ******************
932 933 934 935 936 937
	 */

	/* BEFORE ROW INSERT Triggers */
	if (resultRelationDesc->trigdesc &&
	resultRelationDesc->trigdesc->n_before_row[TRIGGER_EVENT_INSERT] > 0)
	{
938
		HeapTuple	newtuple;
939 940 941 942 943 944 945 946 947 948 949 950 951 952

		newtuple = ExecBRInsertTriggers(resultRelationDesc, tuple);

		if (newtuple == NULL)	/* "do nothing" */
			return;

		if (newtuple != tuple)	/* modified by Trigger(s) */
		{
			Assert(slot->ttc_shouldFree);
			pfree(tuple);
			slot->val = tuple = newtuple;
		}
	}

953
	/******************
954
	 * Check the constraints of a tuple
955
	 ******************
956 957 958 959
	 */

	if (resultRelationDesc->rd_att->constr)
	{
960
		ExecConstraints("ExecAppend", resultRelationDesc, tuple);
961 962
	}

963
	/******************
964
	 *	insert the tuple
965
	 ******************
966 967 968 969 970
	 */
	newId = heap_insert(resultRelationDesc,		/* relation desc */
						tuple); /* heap tuple */
	IncrAppended();

971
	/******************
972 973 974 975 976
	 *	process indices
	 *
	 *	Note: heap_insert adds a new tuple to a relation.  As a side
	 *	effect, the tupleid of the new tuple is placed in the new
	 *	tuple's t_ctid field.
977
	 ******************
978 979 980
	 */
	numIndices = resultRelationInfo->ri_NumIndices;
	if (numIndices > 0)
981
		ExecInsertIndexTuples(slot, &(tuple->t_self), estate, false);
982 983 984 985 986 987 988
	(estate->es_processed)++;
	estate->es_lastoid = newId;

	/* AFTER ROW INSERT Triggers */
	if (resultRelationDesc->trigdesc &&
	 resultRelationDesc->trigdesc->n_after_row[TRIGGER_EVENT_INSERT] > 0)
		ExecARInsertTriggers(resultRelationDesc, tuple);
989 990 991
}

/* ----------------------------------------------------------------
992
 *		ExecDelete
993
 *
994 995
 *		DELETE is like append, we delete the tuple and its
 *		index tuples.
996 997 998
 * ----------------------------------------------------------------
 */
static void
999
ExecDelete(TupleTableSlot *slot,
1000
		   ItemPointer tupleid,
1001
		   EState *estate)
1002
{
V
Vadim B. Mikheev 已提交
1003 1004 1005 1006
	RelationInfo	   *resultRelationInfo;
	Relation			resultRelationDesc;
	ItemPointerData		ctid;
	int					result;
1007

1008
	/******************
1009
	 *	get the result relation information
1010
	 ******************
1011 1012 1013 1014 1015 1016 1017 1018
	 */
	resultRelationInfo = estate->es_result_relation_info;
	resultRelationDesc = resultRelationInfo->ri_RelationDesc;

	/* BEFORE ROW DELETE Triggers */
	if (resultRelationDesc->trigdesc &&
	resultRelationDesc->trigdesc->n_before_row[TRIGGER_EVENT_DELETE] > 0)
	{
1019
		bool		dodelete;
1020

V
Vadim B. Mikheev 已提交
1021
		dodelete = ExecBRDeleteTriggers(estate, tupleid);
1022 1023 1024 1025 1026

		if (!dodelete)			/* "do nothing" */
			return;
	}

V
Vadim B. Mikheev 已提交
1027
	/*
1028 1029
	 *	delete the tuple
	 */
V
Vadim B. Mikheev 已提交
1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049
	result = heap_delete(resultRelationDesc, tupleid, &ctid);
	switch (result)
	{
		case HeapTupleSelfUpdated:
			return;

		case HeapTupleMayBeUpdated:
			break;

		case HeapTupleUpdated:
			if (XactIsoLevel == XACT_SERIALIZED)
				elog(ERROR, "Serialize access failed due to concurrent update");
			else
				elog(ERROR, "Isolation level %u is not supported", XactIsoLevel);
			return;

		default:
			elog(ERROR, "Unknown status %u from heap_delete", result);
			return;
	}
1050 1051 1052 1053

	IncrDeleted();
	(estate->es_processed)++;

1054
	/******************
1055 1056 1057 1058 1059 1060 1061 1062
	 *	Note: Normally one would think that we have to
	 *		  delete index tuples associated with the
	 *		  heap tuple now..
	 *
	 *		  ... but in POSTGRES, we have no need to do this
	 *		  because the vacuum daemon automatically
	 *		  opens an index scan and deletes index tuples
	 *		  when it finds deleted heap tuples. -cim 9/27/89
1063
	 ******************
1064 1065 1066 1067 1068
	 */

	/* AFTER ROW DELETE Triggers */
	if (resultRelationDesc->trigdesc &&
	 resultRelationDesc->trigdesc->n_after_row[TRIGGER_EVENT_DELETE] > 0)
V
Vadim B. Mikheev 已提交
1069
		ExecARDeleteTriggers(estate, tupleid);
1070 1071 1072 1073

}

/* ----------------------------------------------------------------
1074
 *		ExecReplace
1075
 *
1076 1077 1078 1079 1080 1081
 *		note: we can't run replace queries with transactions
 *		off because replaces are actually appends and our
 *		scan will mistakenly loop forever, replacing the tuple
 *		it just appended..	This should be fixed but until it
 *		is, we don't want to get stuck in an infinite loop
 *		which corrupts your database..
1082 1083 1084
 * ----------------------------------------------------------------
 */
static void
1085
ExecReplace(TupleTableSlot *slot,
1086
			ItemPointer tupleid,
1087 1088
			EState *estate,
			Query *parseTree)
1089
{
V
Vadim B. Mikheev 已提交
1090 1091 1092 1093 1094 1095
	HeapTuple			tuple;
	RelationInfo	   *resultRelationInfo;
	Relation			resultRelationDesc;
	ItemPointerData		ctid;
	int					result;
	int					numIndices;
1096

1097
	/******************
1098
	 *	abort the operation if not running transactions
1099
	 ******************
1100 1101 1102 1103 1104 1105 1106
	 */
	if (IsBootstrapProcessingMode())
	{
		elog(DEBUG, "ExecReplace: replace can't run without transactions");
		return;
	}

1107
	/******************
1108
	 *	get the heap tuple out of the tuple table slot
1109
	 ******************
1110 1111 1112
	 */
	tuple = slot->val;

1113
	/******************
1114
	 *	get the result relation information
1115
	 ******************
1116 1117 1118 1119
	 */
	resultRelationInfo = estate->es_result_relation_info;
	resultRelationDesc = resultRelationInfo->ri_RelationDesc;

1120
	/******************
1121 1122 1123 1124
	 *	have to add code to preform unique checking here.
	 *	in the event of unique tuples, this becomes a deletion
	 *	of the original tuple affected by the replace.
	 *	cim -12/1/89
1125
	 ******************
1126 1127 1128 1129 1130 1131
	 */

	/* BEFORE ROW UPDATE Triggers */
	if (resultRelationDesc->trigdesc &&
	resultRelationDesc->trigdesc->n_before_row[TRIGGER_EVENT_UPDATE] > 0)
	{
1132
		HeapTuple	newtuple;
1133

V
Vadim B. Mikheev 已提交
1134
		newtuple = ExecBRUpdateTriggers(estate, tupleid, tuple);
1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146

		if (newtuple == NULL)	/* "do nothing" */
			return;

		if (newtuple != tuple)	/* modified by Trigger(s) */
		{
			Assert(slot->ttc_shouldFree);
			pfree(tuple);
			slot->val = tuple = newtuple;
		}
	}

1147
	/******************
1148
	 * Check the constraints of a tuple
1149
	 ******************
1150 1151 1152 1153
	 */

	if (resultRelationDesc->rd_att->constr)
	{
1154
		ExecConstraints("ExecReplace", resultRelationDesc, tuple);
1155 1156
	}

V
Vadim B. Mikheev 已提交
1157
	/*
1158 1159
	 *	replace the heap tuple
	 */
V
Vadim B. Mikheev 已提交
1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178
	result = heap_replace(resultRelationDesc, tupleid, tuple, &ctid);
	switch (result)
	{
		case HeapTupleSelfUpdated:
			return;

		case HeapTupleMayBeUpdated:
			break;

		case HeapTupleUpdated:
			if (XactIsoLevel == XACT_SERIALIZED)
				elog(ERROR, "Serialize access failed due to concurrent update");
			else
				elog(ERROR, "Isolation level %u is not supported", XactIsoLevel);
			return;

		default:
			elog(ERROR, "Unknown status %u from heap_replace", result);
			return;
1179 1180 1181 1182 1183
	}

	IncrReplaced();
	(estate->es_processed)++;

1184
	/******************
1185 1186 1187 1188 1189 1190 1191
	 *	Note: instead of having to update the old index tuples
	 *		  associated with the heap tuple, all we do is form
	 *		  and insert new index tuples..  This is because
	 *		  replaces are actually deletes and inserts and
	 *		  index tuple deletion is done automagically by
	 *		  the vaccuum deamon.. All we do is insert new
	 *		  index tuples.  -cim 9/27/89
1192
	 ******************
1193 1194
	 */

1195
	/******************
1196 1197 1198 1199 1200 1201 1202
	 *	process indices
	 *
	 *	heap_replace updates a tuple in the base relation by invalidating
	 *	it and then appending a new tuple to the relation.	As a side
	 *	effect, the tupleid of the new tuple is placed in the new
	 *	tuple's t_ctid field.  So we now insert index tuples using
	 *	the new tupleid stored there.
1203
	 ******************
1204 1205 1206 1207
	 */

	numIndices = resultRelationInfo->ri_NumIndices;
	if (numIndices > 0)
1208
		ExecInsertIndexTuples(slot, &(tuple->t_self), estate, true);
1209 1210 1211 1212

	/* AFTER ROW UPDATE Triggers */
	if (resultRelationDesc->trigdesc &&
	 resultRelationDesc->trigdesc->n_after_row[TRIGGER_EVENT_UPDATE] > 0)
V
Vadim B. Mikheev 已提交
1213
		ExecARUpdateTriggers(estate, tupleid, tuple);
1214
}
V
Vadim B. Mikheev 已提交
1215

1216
#if 0
1217
static HeapTuple
1218
ExecAttrDefault(Relation rel, HeapTuple tuple)
V
Vadim B. Mikheev 已提交
1219
{
1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231
	int			ndef = rel->rd_att->constr->num_defval;
	AttrDefault *attrdef = rel->rd_att->constr->defval;
	ExprContext *econtext = makeNode(ExprContext);
	HeapTuple	newtuple;
	Node	   *expr;
	bool		isnull;
	bool		isdone;
	Datum		val;
	Datum	   *replValue = NULL;
	char	   *replNull = NULL;
	char	   *repl = NULL;
	int			i;
1232 1233 1234 1235 1236 1237

	econtext->ecxt_scantuple = NULL;	/* scan tuple slot */
	econtext->ecxt_innertuple = NULL;	/* inner tuple slot */
	econtext->ecxt_outertuple = NULL;	/* outer tuple slot */
	econtext->ecxt_relation = NULL;		/* relation */
	econtext->ecxt_relid = 0;	/* relid */
1238 1239
	econtext->ecxt_param_list_info = NULL;		/* param list info */
	econtext->ecxt_param_exec_vals = NULL;		/* exec param values */
1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258
	econtext->ecxt_range_table = NULL;	/* range table */
	for (i = 0; i < ndef; i++)
	{
		if (!heap_attisnull(tuple, attrdef[i].adnum))
			continue;
		expr = (Node *) stringToNode(attrdef[i].adbin);

		val = ExecEvalExpr(expr, econtext, &isnull, &isdone);

		pfree(expr);

		if (isnull)
			continue;

		if (repl == NULL)
		{
			repl = (char *) palloc(rel->rd_att->natts * sizeof(char));
			replNull = (char *) palloc(rel->rd_att->natts * sizeof(char));
			replValue = (Datum *) palloc(rel->rd_att->natts * sizeof(Datum));
B
Bruce Momjian 已提交
1259
			MemSet(repl, ' ', rel->rd_att->natts * sizeof(char));
1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270
		}

		repl[attrdef[i].adnum - 1] = 'r';
		replNull[attrdef[i].adnum - 1] = ' ';
		replValue[attrdef[i].adnum - 1] = val;

	}

	pfree(econtext);

	if (repl == NULL)
1271
		return tuple;
1272

1273
	newtuple = heap_modifytuple(tuple, rel, replValue, replNull, repl);
1274 1275

	pfree(repl);
1276
	pfree(tuple);
1277 1278 1279
	pfree(replNull);
	pfree(replValue);

1280
	return newtuple;
1281

V
Vadim B. Mikheev 已提交
1282
}
1283

1284
#endif
V
Vadim B. Mikheev 已提交
1285

1286
static char *
1287
ExecRelCheck(Relation rel, HeapTuple tuple)
V
Vadim B. Mikheev 已提交
1288
{
1289 1290 1291
	int			ncheck = rel->rd_att->constr->num_check;
	ConstrCheck *check = rel->rd_att->constr->check;
	ExprContext *econtext = makeNode(ExprContext);
1292
	TupleTableSlot *slot = makeNode(TupleTableSlot);
1293 1294 1295 1296 1297
	RangeTblEntry *rte = makeNode(RangeTblEntry);
	List	   *rtlist;
	List	   *qual;
	bool		res;
	int			i;
1298 1299 1300 1301 1302 1303 1304 1305 1306

	slot->val = tuple;
	slot->ttc_shouldFree = false;
	slot->ttc_descIsNew = true;
	slot->ttc_tupleDescriptor = rel->rd_att;
	slot->ttc_buffer = InvalidBuffer;
	slot->ttc_whichplan = -1;
	rte->relname = nameout(&(rel->rd_rel->relname));
	rte->refname = rte->relname;
1307
	rte->relid = RelationGetRelid(rel);
1308 1309 1310 1311 1312 1313 1314 1315 1316
	rte->inh = false;
	rte->inFromCl = true;
	rtlist = lcons(rte, NIL);
	econtext->ecxt_scantuple = slot;	/* scan tuple slot */
	econtext->ecxt_innertuple = NULL;	/* inner tuple slot */
	econtext->ecxt_outertuple = NULL;	/* outer tuple slot */
	econtext->ecxt_relation = rel;		/* relation */
	econtext->ecxt_relid = 0;	/* relid */
	econtext->ecxt_param_list_info = NULL;		/* param list info */
V
Vadim B. Mikheev 已提交
1317
	econtext->ecxt_param_exec_vals = NULL;		/* exec param values */
1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328
	econtext->ecxt_range_table = rtlist;		/* range table */

	for (i = 0; i < ncheck; i++)
	{
		qual = (List *) stringToNode(check[i].ccbin);

		res = ExecQual(qual, econtext);

		pfree(qual);

		if (!res)
1329
			return check[i].ccname;
1330 1331 1332 1333 1334 1335 1336 1337
	}

	pfree(slot);
	pfree(rte->relname);
	pfree(rte);
	pfree(rtlist);
	pfree(econtext);

1338
	return (char *) NULL;
1339

V
Vadim B. Mikheev 已提交
1340 1341
}

1342
void
1343
ExecConstraints(char *caller, Relation rel, HeapTuple tuple)
V
Vadim B. Mikheev 已提交
1344
{
1345 1346 1347 1348

	Assert(rel->rd_att->constr);

	if (rel->rd_att->constr->has_not_null)
V
Vadim B. Mikheev 已提交
1349
	{
1350
		int			attrChk;
1351 1352 1353 1354

		for (attrChk = 1; attrChk <= rel->rd_att->natts; attrChk++)
		{
			if (rel->rd_att->attrs[attrChk - 1]->attnotnull && heap_attisnull(tuple, attrChk))
1355
				elog(ERROR, "%s: Fail to add null value in not null attribute %s",
1356 1357 1358 1359 1360 1361
				  caller, rel->rd_att->attrs[attrChk - 1]->attname.data);
		}
	}

	if (rel->rd_att->constr->num_check > 0)
	{
1362
		char	   *failed;
1363 1364

		if ((failed = ExecRelCheck(rel, tuple)) != NULL)
1365
			elog(ERROR, "%s: rejected due to CHECK constraint %s", caller, failed);
1366 1367
	}

1368
	return;
V
Vadim B. Mikheev 已提交
1369
}