/*-------------------------------------------------------------------------
 *
 * rewriteHandler.c--
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/rewrite/rewriteHandler.c,v 1.17 1998/07/19 05:49:24 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
#include <string.h>
14 15 16 17 18
#include "postgres.h"
#include "miscadmin.h"
#include "utils/palloc.h"
#include "utils/elog.h"
#include "utils/rel.h"
19
#include "nodes/pg_list.h"
20 21
#include "nodes/primnodes.h"

22
#include "parser/parsetree.h"	/* for parsetree manipulation */
23 24
#include "nodes/parsenodes.h"

25
#include "rewrite/rewriteSupport.h"
26 27 28 29 30 31 32
#include "rewrite/rewriteHandler.h"
#include "rewrite/rewriteManip.h"
#include "rewrite/locks.h"

#include "commands/creatinh.h"
#include "access/heapam.h"

M
Marc G. Fournier 已提交
33 34
#include "utils/syscache.h"
#include "utils/acl.h"
35
#include "catalog/pg_shadow.h"
M
Marc G. Fournier 已提交
36

37 38
static void
ApplyRetrieveRule(Query *parsetree, RewriteRule *rule,
M
Marc G. Fournier 已提交
39 40
				  int rt_index, int relation_level,
				  Relation relation, int *modified);
41 42
static List *
fireRules(Query *parsetree, int rt_index, CmdType event,
43
		  bool *instead_flag, List *locks, List **qual_products);
44
static void QueryRewriteSubLink(Node *node);
45
static List *QueryRewriteOne(Query *parsetree);
46
static List *deepRewriteQuery(Query *parsetree);
M
Marc G. Fournier 已提交
47
static void CheckViewPerms(Relation view, List *rtable);
48 49 50

/*
 * gatherRewriteMeta -
51 52 53
 *	  Gather meta information about parsetree, and rule. Fix rule body
 *	  and qualifier so that they can be mixed with the parsetree and
 *	  maintain semantic validity
54 55
 */
static RewriteInfo *
56 57 58
gatherRewriteMeta(Query *parsetree,
				  Query *rule_action,
				  Node *rule_qual,
59 60
				  int rt_index,
				  CmdType event,
61
				  bool *instead_flag)
62
{
63 64 65
	RewriteInfo *info;
	int			rt_length;
	int			result_reln;
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88

	info = (RewriteInfo *) palloc(sizeof(RewriteInfo));
	info->rt_index = rt_index;
	info->event = event;
	info->instead_flag = *instead_flag;
	info->rule_action = (Query *) copyObject(rule_action);
	info->rule_qual = (Node *) copyObject(rule_qual);
	if (info->rule_action == NULL)
		info->nothing = TRUE;
	else
	{
		info->nothing = FALSE;
		info->action = info->rule_action->commandType;
		info->current_varno = rt_index;
		info->rt = parsetree->rtable;
		rt_length = length(info->rt);
		info->rt = append(info->rt, info->rule_action->rtable);

		info->new_varno = PRS2_NEW_VARNO + rt_length;
		OffsetVarNodes(info->rule_action->qual, rt_length);
		OffsetVarNodes((Node *) info->rule_action->targetList, rt_length);
		OffsetVarNodes(info->rule_qual, rt_length);
		ChangeVarNodes((Node *) info->rule_action->qual,
89
					   PRS2_CURRENT_VARNO + rt_length, rt_index, 0);
90
		ChangeVarNodes((Node *) info->rule_action->targetList,
91
					   PRS2_CURRENT_VARNO + rt_length, rt_index, 0);
92
		ChangeVarNodes(info->rule_qual,
93
					   PRS2_CURRENT_VARNO + rt_length, rt_index, 0);
94 95 96 97 98 99 100 101

		/*
		 * bug here about replace CURRENT  -- sort of replace current is
		 * deprecated now so this code shouldn't really need to be so
		 * clutzy but.....
		 */
		if (info->action != CMD_SELECT)
		{						/* i.e update XXXXX */
102
			int			new_result_reln = 0;
103 104 105 106

			result_reln = info->rule_action->resultRelation;
			switch (result_reln)
			{
107 108 109 110 111 112 113
				case PRS2_CURRENT_VARNO:
					new_result_reln = rt_index;
					break;
				case PRS2_NEW_VARNO:	/* XXX */
				default:
					new_result_reln = result_reln + rt_length;
					break;
114 115 116 117 118
			}
			info->rule_action->resultRelation = new_result_reln;
		}
	}
	return info;
119 120
}

121
/*
 * OptimizeRIRRules -
 *	  Split locks into the entries whose attrno is -1 and all the others,
 *	  and return the first group followed by the second.  Entries are
 *	  appended to their group in the order they are encountered.
 */
static List *
OptimizeRIRRules(List *locks)
{
	List	   *whole_relation = NIL;
	List	   *per_attribute = NIL;
	List	   *cell;

	foreach(cell, locks)
	{
		RewriteRule *lock = lfirst(cell);

		if (lock->attrno != -1)
			per_attribute = lappend(per_attribute, lock);
		else
			whole_relation = lappend(whole_relation, lock);
	}
	return nconc(whole_relation, per_attribute);
}

/*
 * idea is to put instead rules before regular rules so that
 * excess semantically queasy queries aren't processed
 */
144
static List *
145
orderRules(List *locks)
146
{
147 148 149
	List	   *regular = NIL,
			   *i;
	List	   *instead_rules = NIL;
150 151 152

	foreach(i, locks)
	{
153
		RewriteRule *rule_lock = (RewriteRule *) lfirst(i);
154 155 156 157 158 159 160

		if (rule_lock->isInstead)
			instead_rules = lappend(instead_rules, rule_lock);
		else
			regular = lappend(regular, rule_lock);
	}
	return nconc(regular, instead_rules);
161 162 163
}

static int
164
AllRetrieve(List *actions)
165
{
166
	List	   *n;
167 168 169

	foreach(n, actions)
	{
170
		Query	   *pt = lfirst(n);
171 172 173 174 175 176 177 178 179 180

		/*
		 * in the old postgres code, we check whether command_type is a
		 * consp of '('*'.commandType). but we've never supported
		 * transitive closures. Hence removed	 - ay 10/94.
		 */
		if (pt->commandType != CMD_SELECT)
			return false;
	}
	return true;
181 182
}

183
static List *
184
FireRetrieveRulesAtQuery(Query *parsetree,
185 186
						 int rt_index,
						 Relation relation,
187
						 bool *instead_flag,
188
						 int rule_flag)
189
{
190 191 192 193
	List	   *i,
			   *locks;
	RuleLock   *rt_entry_locks = NULL;
	List	   *work = NIL;
194

195
	if ((rt_entry_locks = relation->rd_rules) == NULL)
196
		return NIL;
197

198
	locks = matchLocks(CMD_SELECT, rt_entry_locks, rt_index, parsetree);	
199 200 201 202

	/* find all retrieve instead */
	foreach(i, locks)
	{
203
		RewriteRule *rule_lock = (RewriteRule *) lfirst(i);
204 205 206 207 208 209 210 211 212 213

		if (!rule_lock->isInstead)
			continue;
		work = lappend(work, rule_lock);
	}
	if (work != NIL)
	{
		work = OptimizeRIRRules(locks);
		foreach(i, work)
		{
214 215 216
			RewriteRule *rule_lock = lfirst(i);
			int			relation_level;
			int			modified = FALSE;
217 218 219 220 221 222 223 224 225 226 227 228 229 230

			relation_level = (rule_lock->attrno == -1);
			if (rule_lock->actions == NIL)
			{
				*instead_flag = TRUE;
				return NIL;
			}
			if (!rule_flag &&
				length(rule_lock->actions) >= 2 &&
				AllRetrieve(rule_lock->actions))
			{
				*instead_flag = TRUE;
				return rule_lock->actions;
			}
M
Marc G. Fournier 已提交
231
			ApplyRetrieveRule(parsetree, rule_lock, rt_index, relation_level, relation,
232 233 234 235 236
							  &modified);
			if (modified)
			{
				*instead_flag = TRUE;
				FixResdomTypes(parsetree->targetList);
237

238 239 240
				return lcons(parsetree, NIL);
			}
		}
241
	}
242 243
	return NIL;
}
244 245 246 247 248


/* Idea is like this:
 *
 * retrieve-instead-retrieve rules have different semantics than update nodes
249
 * Separate RIR rules from others.	Pass others to FireRules.
250 251 252 253 254
 * Order RIR rules and process.
 *
 * side effect: parsetree's rtable field might be changed
 */
static void
255 256
ApplyRetrieveRule(Query *parsetree,
				  RewriteRule *rule,
257 258
				  int rt_index,
				  int relation_level,
M
Marc G. Fournier 已提交
259
				  Relation relation,
260
				  int *modified)
261
{
262 263 264 265 266 267 268
	Query	   *rule_action = NULL;
	Node	   *rule_qual;
	List	   *rtable,
			   *rt;
	int			nothing,
				rt_length;
	int			badsql = FALSE;
M
Marc G. Fournier 已提交
269
	int			viewAclOverride = FALSE;
270 271 272 273 274

	rule_qual = rule->qual;
	if (rule->actions)
	{
		if (length(rule->actions) > 1)	/* ??? because we don't handle
275 276 277 278 279 280 281 282 283
										 * rules with more than one
										 * action? -ay */

			/*
			 * WARNING!!! If we sometimes handle rules with more than one
			 * action, the view acl checks might get broken.
			 * viewAclOverride should only become true (below) if this is
			 * a relation_level, instead, select query - Jan
			 */
284 285 286
			return;
		rule_action = copyObject(lfirst(rule->actions));
		nothing = FALSE;
M
Marc G. Fournier 已提交
287 288

		/*
289 290 291 292
		 * If this rule is on the relation level, the rule action is a
		 * select and the rule is instead then it must be a view.
		 * Permissions for views now follow the owner of the view, not the
		 * current user.
M
Marc G. Fournier 已提交
293 294
		 */
		if (relation_level && rule_action->commandType == CMD_SELECT
295
			&& rule->isInstead)
M
Marc G. Fournier 已提交
296
		{
297 298
			CheckViewPerms(relation, rule_action->rtable);
			viewAclOverride = TRUE;
M
Marc G. Fournier 已提交
299
		}
300 301 302 303 304 305 306
	}
	else
		nothing = TRUE;

	rtable = copyObject(parsetree->rtable);
	foreach(rt, rtable)
	{
307
		RangeTblEntry *rte = lfirst(rt);
308 309 310 311 312 313 314 315

		/*
		 * this is to prevent add_missing_vars_to_base_rels() from adding
		 * a bogus entry to the new target list.
		 */
		rte->inFromCl = false;
	}
	rt_length = length(rtable);
M
Marc G. Fournier 已提交
316 317 318

	if (viewAclOverride)
	{
319 320 321
		List	   *rule_rtable,
				   *rule_rt;
		RangeTblEntry *rte;
M
Marc G. Fournier 已提交
322 323 324 325 326 327 328

		rule_rtable = copyObject(rule_action->rtable);
		foreach(rule_rt, rule_rtable)
		{
			rte = lfirst(rule_rt);

			/*
329 330
			 * tell the executor that the ACL check on this range table
			 * entry is already done
M
Marc G. Fournier 已提交
331 332 333 334 335 336 337 338
			 */
			rte->skipAcl = true;
		}

		rtable = nconc(rtable, rule_rtable);
	}
	else
		rtable = nconc(rtable, copyObject(rule_action->rtable));
339 340 341 342 343 344
	parsetree->rtable = rtable;

	rule_action->rtable = rtable;
	OffsetVarNodes(rule_action->qual, rt_length);
	OffsetVarNodes((Node *) rule_action->targetList, rt_length);
	OffsetVarNodes(rule_qual, rt_length);
345 346 347 348
	
	OffsetVarNodes((Node *) rule_action->groupClause, rt_length);
	OffsetVarNodes((Node *) rule_action->havingQual, rt_length);

349
	ChangeVarNodes(rule_action->qual,
350
				   PRS2_CURRENT_VARNO + rt_length, rt_index, 0);
351
	ChangeVarNodes((Node *) rule_action->targetList,
352 353
				   PRS2_CURRENT_VARNO + rt_length, rt_index, 0);
	ChangeVarNodes(rule_qual, PRS2_CURRENT_VARNO + rt_length, rt_index, 0);
354 355 356 357 358 359

	ChangeVarNodes((Node *) rule_action->groupClause,
				   PRS2_CURRENT_VARNO + rt_length, rt_index, 0);
	ChangeVarNodes((Node *) rule_action->havingQual,
				   PRS2_CURRENT_VARNO + rt_length, rt_index, 0);

360 361
	if (relation_level)
	{
362 363
	  HandleViewRule(parsetree, rtable, rule_action->targetList, rt_index,
			 modified);
364 365 366
	}
	else
	{
367 368
	  HandleRIRAttributeRule(parsetree, rtable, rule_action->targetList,
				 rt_index, rule->attrno, modified, &badsql);
369
	}
370 371 372 373 374 375 376 377 378 379
	if (*modified && !badsql) {
	  AddQual(parsetree, rule_action->qual);
	  /* This will only work if the query made to the view defined by the following
	   * groupClause groups by the same attributes or does not use group at all! */
	  if (parsetree->groupClause == NULL)
	    parsetree->groupClause=rule_action->groupClause;
	  AddHavingQual(parsetree, rule_action->havingQual);
	  parsetree->hasAggs = (rule_action->hasAggs || parsetree->hasAggs);
	  parsetree->hasSubLinks = (rule_action->hasSubLinks ||  parsetree->hasSubLinks);
	}	
380 381
}

382
static List *
383 384 385
ProcessRetrieveQuery(Query *parsetree,
					 List *rtable,
					 bool *instead_flag,
386
					 bool rule)
387
{
388 389 390
	List	   *rt;
	List	   *product_queries = NIL;
	int			rt_index = 0;
391

392

393 394
	foreach(rt, rtable)
	{
395 396 397
		RangeTblEntry *rt_entry = lfirst(rt);
		Relation	rt_entry_relation = NULL;
		List	   *result = NIL;
398 399 400 401

		rt_index++;
		rt_entry_relation = heap_openr(rt_entry->relname);

402 403


404 405 406 407 408 409 410 411 412 413 414 415
		if (rt_entry_relation->rd_rules != NULL)
		{
			result =
				FireRetrieveRulesAtQuery(parsetree,
										 rt_index,
										 rt_entry_relation,
										 instead_flag,
										 rule);
		}
		heap_close(rt_entry_relation);
		if (*instead_flag)
			return result;
416
	}
417 418
	if (rule)
		return NIL;
419

420 421
	foreach(rt, rtable)
	{
422 423 424 425 426 427
		RangeTblEntry *rt_entry = lfirst(rt);
		Relation	rt_entry_relation = NULL;
		RuleLock   *rt_entry_locks = NULL;
		List	   *result = NIL;
		List	   *locks = NIL;
		List	   *dummy_products;
428 429 430 431 432 433

		rt_index++;
		rt_entry_relation = heap_openr(rt_entry->relname);
		rt_entry_locks = rt_entry_relation->rd_rules;
		heap_close(rt_entry_relation);

434

435 436 437 438 439 440 441 442 443 444 445 446 447 448
		if (rt_entry_locks)
		{
			locks =
				matchLocks(CMD_SELECT, rt_entry_locks, rt_index, parsetree);
		}
		if (locks != NIL)
		{
			result = fireRules(parsetree, rt_index, CMD_SELECT,
							   instead_flag, locks, &dummy_products);
			if (*instead_flag)
				return lappend(NIL, result);
			if (result != NIL)
				product_queries = nconc(product_queries, result);
		}
449
	}
450
	return product_queries;
451 452
}

453
static Query *
454 455 456
CopyAndAddQual(Query *parsetree,
			   List *actions,
			   Node *rule_qual,
457 458
			   int rt_index,
			   CmdType event)
459
{
460 461 462
	Query	   *new_tree = (Query *) copyObject(parsetree);
	Node	   *new_qual = NULL;
	Query	   *rule_action = NULL;
463 464 465 466 467 468 469

	if (actions)
		rule_action = lfirst(actions);
	if (rule_qual != NULL)
		new_qual = (Node *) copyObject(rule_qual);
	if (rule_action != NULL)
	{
470 471
		List	   *rtable;
		int			rt_length;
472 473 474 475 476 477

		rtable = new_tree->rtable;
		rt_length = length(rtable);
		rtable = append(rtable, listCopy(rule_action->rtable));
		new_tree->rtable = rtable;
		OffsetVarNodes(new_qual, rt_length);
478
		ChangeVarNodes(new_qual, PRS2_CURRENT_VARNO + rt_length, rt_index, 0);
479 480 481 482 483
	}
	/* XXX -- where current doesn't work for instead nothing.... yet */
	AddNotQual(new_tree, new_qual);

	return new_tree;
484 485 486 487
}


/*
488 489 490 491 492
 *	fireRules -
 *	   Iterate through rule locks applying rules.  After an instead rule
 *	   rule has been applied, return just new parsetree and let RewriteQuery
 *	   start the process all over again.  The locks are reordered to maintain
 *	   sensible semantics.	remember: reality is for dead birds -- glass
493 494
 *
 */
495
static List *
496
fireRules(Query *parsetree,
497 498
		  int rt_index,
		  CmdType event,
499 500 501
		  bool *instead_flag,
		  List *locks,
		  List **qual_products)
502
{
503 504 505
	RewriteInfo *info;
	List	   *results = NIL;
	List	   *i;
506 507 508 509 510 511 512

	/* choose rule to fire from list of rules */
	if (locks == NIL)
	{
		ProcessRetrieveQuery(parsetree,
							 parsetree->rtable,
							 instead_flag, TRUE);
513
		if (*instead_flag)
514
			return lappend(NIL, parsetree);
515
		else
516
			return NIL;
517
	}
518 519 520 521

	locks = orderRules(locks);	/* instead rules first */
	foreach(i, locks)
	{
522 523 524 525 526 527
		RewriteRule *rule_lock = (RewriteRule *) lfirst(i);
		Node	   *qual,
				   *event_qual;
		List	   *actions;
		List	   *r;
		bool		orig_instead_flag = *instead_flag;
528 529 530 531 532 533 534 535 536 537 538 539

		/* multiple rule action time */
		*instead_flag = rule_lock->isInstead;
		event_qual = rule_lock->qual;
		actions = rule_lock->actions;
		if (event_qual != NULL && *instead_flag)
			*qual_products =
				lappend(*qual_products,
						CopyAndAddQual(parsetree, actions, event_qual,
									   rt_index, event));
		foreach(r, actions)
		{
540 541
			Query	   *rule_action = lfirst(r);
			Node	   *rule_qual = copyObject(event_qual);
542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607

			/*--------------------------------------------------
			 * Step 1:
			 *	  Rewrite current.attribute or current to tuple variable
			 *	  this appears to be done in parser?
			 *--------------------------------------------------
			 */
			info = gatherRewriteMeta(parsetree, rule_action, rule_qual,
									 rt_index, event, instead_flag);

			/* handle escapable cases, or those handled by other code */
			if (info->nothing)
			{
				if (*instead_flag)
					return NIL;
				else
					continue;
			}

			if (info->action == info->event &&
				info->event == CMD_SELECT)
				continue;

			/*
			 * Event Qualification forces copying of parsetree --- XXX and
			 * splitting into two queries one w/rule_qual, one w/NOT
			 * rule_qual. Also add user query qual onto rule action
			 */
			qual = parsetree->qual;
			AddQual(info->rule_action, qual);

			if (info->rule_qual != NULL)
				AddQual(info->rule_action, info->rule_qual);

			/*--------------------------------------------------
			 * Step 2:
			 *	  Rewrite new.attribute w/ right hand side of target-list
			 *	  entry for appropriate field name in insert/update
			 *--------------------------------------------------
			 */
			if ((info->event == CMD_INSERT) || (info->event == CMD_UPDATE))
				FixNew(info, parsetree);

			/*--------------------------------------------------
			 * Step 3:
			 *	  rewriting due to retrieve rules
			 *--------------------------------------------------
			 */
			info->rule_action->rtable = info->rt;
			ProcessRetrieveQuery(info->rule_action, info->rt,
								 &orig_instead_flag, TRUE);

			/*--------------------------------------------------
			 * Step 4
			 *	  Simplify? hey, no algorithm for simplification... let
			 *	  the planner do it.
			 *--------------------------------------------------
			 */
			results = lappend(results, info->rule_action);

			pfree(info);
		}
		if (*instead_flag)
			break;
	}
	return results;
608 609
}

610
static List *
611
RewriteQuery(Query *parsetree, bool *instead_flag, List **qual_products)
612
{
613 614 615
	CmdType		event;
	List	   *product_queries = NIL;
	int			result_relation = 0;
616

617 618 619 620 621 622
	Assert(parsetree != NULL);

	event = parsetree->commandType;

	if (event == CMD_UTILITY)
		return NIL;
623 624

	/*
625
	 * only for a delete may the targetlist be NULL
626
	 */
627 628 629 630 631 632 633 634 635 636 637
	if (event != CMD_DELETE)
		Assert(parsetree->targetList != NULL);

	result_relation = parsetree->resultRelation;

	if (event != CMD_SELECT)
	{

		/*
		 * the statement is an update, insert or delete
		 */
638 639 640
		RangeTblEntry *rt_entry;
		Relation	rt_entry_relation = NULL;
		RuleLock   *rt_entry_locks = NULL;
641 642 643 644 645 646 647 648

		rt_entry = rt_fetch(result_relation, parsetree->rtable);
		rt_entry_relation = heap_openr(rt_entry->relname);
		rt_entry_locks = rt_entry_relation->rd_rules;
		heap_close(rt_entry_relation);

		if (rt_entry_locks != NULL)
		{
649
			List	   *locks =
650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667
			matchLocks(event, rt_entry_locks, result_relation, parsetree);

			product_queries =
				fireRules(parsetree,
						  result_relation,
						  event,
						  instead_flag,
						  locks,
						  qual_products);
		}
		return product_queries;
	}
	else
	{

		/*
		 * the statement is a select
		 */
668
		Query	   *other;
669

B
Bruce Momjian 已提交
670
		/*
671 672
		 * ApplyRetrieveRule changes the range table XXX Unions are copied
		 * again.
B
Bruce Momjian 已提交
673 674 675
		 */
		other = copyObject(parsetree);

676 677 678
		return
			ProcessRetrieveQuery(other, parsetree->rtable,
								 instead_flag, FALSE);
679 680 681 682 683 684 685 686
	}
}

/*
 * To avoid infinite recursion, we restrict the number of times a query
 * can be rewritten.  Detecting cycles is left for the reader as an exercise.
 *
 * REWRITE_INVOKE_MAX may be overridden at compile time.
 */
#ifndef REWRITE_INVOKE_MAX
#define REWRITE_INVOKE_MAX		10
#endif

/* rewrite rounds counted so far; compared against REWRITE_INVOKE_MAX */
static int	numQueryRewriteInvoked = 0;

/*
 * QueryRewrite -
694 695 696
 *	  rewrite one query via QueryRewrite system, possibly returning 0, or many
 *	  queries
 */
697
List *
698
QueryRewrite(Query *parsetree)
699 700
{
	QueryRewriteSubLink(parsetree->qual);
701 702
	QueryRewriteSubLink(parsetree->havingQual);

703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728
	return QueryRewriteOne(parsetree);
}

/*
 *	QueryRewriteSubLink
 *
 *	This rewrites the SubLink subqueries first, doing the lowest ones first.
 *	We already have code in the main rewrite loops to process correlated
 *	variables from upper queries that exist in subqueries.
 *
 *	Only Expr arguments, List members and SubLink nodes are descended
 *	into; every other node type is left alone.  A SubLink's subselect is
 *	replaced by its rewritten form, or set to NULL if the rewrite produced
 *	no query.
 */
static void
QueryRewriteSubLink(Node *node)
{
	if (node == NULL)
		return;

	switch (nodeTag(node))
	{
		case T_Expr:
			QueryRewriteSubLink((Node *) ((Expr *) node)->args);
			break;

		case T_List:
			{
				List	   *cell;

				foreach(cell, (List *) node)
					QueryRewriteSubLink(lfirst(cell));
			}
			break;

		case T_SubLink:
			{
				SubLink    *sublink = (SubLink *) node;
				Query	   *subquery = (Query *) sublink->subselect;
				List	   *rewritten;

				/*
				 * Nest down first.  We do this so if a rewrite adds a
				 * SubLink we don't process it as part of this loop.
				 */
				QueryRewriteSubLink((Node *) subquery->qual);
				QueryRewriteSubLink((Node *) subquery->havingQual);

				rewritten = QueryRewriteOne(subquery);
				if (rewritten == NIL)
					sublink->subselect = NULL;
				else if (lnext(rewritten) != NIL)
					elog(ERROR, "Don't know how to process subquery that rewrites to multiple queries.");
				else
					sublink->subselect = lfirst(rewritten);
			}
			break;

		case T_TargetEntry:
		case T_Aggreg:
		case T_Var:
		default:
			/* nothing to do for these, nor for any other node type */
			break;
	}
}

/*
 * QueryOneRewrite -
 *	  rewrite one query
 */
776
static List *
777
QueryRewriteOne(Query *parsetree)
778
{
779
	numQueryRewriteInvoked = 0;
780

781 782 783 784
	/*
	 * take a deep breath and apply all the rewrite rules - ay
	 */
	return deepRewriteQuery(parsetree);
785 786 787 788
}

/*
 * deepRewriteQuery -
789
 *	  rewrites the query and apply the rules again on the queries rewritten
790
 */
791
static List *
792
deepRewriteQuery(Query *parsetree)
793
{
794 795 796 797 798
	List	   *n;
	List	   *rewritten = NIL;
	List	   *result = NIL;
	bool		instead;
	List	   *qual_products = NIL;
799

800 801


802 803
	if (++numQueryRewriteInvoked > REWRITE_INVOKE_MAX)
	{
804
		elog(ERROR, "query rewritten %d times, may contain cycles",
805 806 807 808 809 810 811
			 numQueryRewriteInvoked - 1);
	}

	instead = FALSE;
	result = RewriteQuery(parsetree, &instead, &qual_products);
	if (!instead)
		rewritten = lcons(parsetree, NIL);
812

813 814
	foreach(n, result)
	{
815 816
		Query	   *pt = lfirst(n);
		List	   *newstuff = NIL;
817 818 819 820 821 822 823 824 825 826

		newstuff = deepRewriteQuery(pt);
		if (newstuff != NIL)
			rewritten = nconc(rewritten, newstuff);
	}
	if (qual_products != NIL)
		rewritten = nconc(rewritten, qual_products);

	return rewritten;
}
M
Marc G. Fournier 已提交
827 828 829 830 831 832 833


/*
 * CheckViewPerms -
 *	  Check that the owner of view has ACL_RD access to every relation
 *	  named in rtable.  An error is raised if the owner cannot be looked
 *	  up or if any of the checks fails.
 */
static void
CheckViewPerms(Relation view, List *rtable)
{
	HeapTuple	owner_tup;
	NameData	owner_name;
	List	   *cell;

	/*
	 * get the usename of the view's owner
	 */
	owner_tup = SearchSysCacheTuple(USESYSID, view->rd_rel->relowner, 0, 0, 0);
	if (!HeapTupleIsValid(owner_tup))
		elog(ERROR, "cache lookup for userid %d failed",
			 view->rd_rel->relowner);
	StrNCpy(owner_name.data,
			((Form_pg_shadow) GETSTRUCT(owner_tup))->usename.data,
			NAMEDATALEN);

	/*
	 * check, under the owner's name, read access to all the classes in
	 * the range table of the view
	 */
	foreach(cell, rtable)
	{
		RangeTblEntry *rte = (RangeTblEntry *) lfirst(cell);
		int32		acl_result = pg_aclcheck(rte->relname, owner_name.data, ACL_RD);

		if (acl_result != ACLCHECK_OK)
			elog(ERROR, "%s: %s", rte->relname, aclcheck_error_strings[acl_result]);
	}
}