rewriteHandler.c 19.4 KB
Newer Older
1 2 3 4 5 6 7 8
/*-------------------------------------------------------------------------
 *
 * rewriteHandler.c--
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
9
 *	  $Header: /cvsroot/pgsql/src/backend/rewrite/rewriteHandler.c,v 1.16 1998/06/15 19:29:07 momjian Exp $
10 11 12
 *
 *-------------------------------------------------------------------------
 */
M
Marc G. Fournier 已提交
13
#include <string.h>
14 15 16 17 18
#include "postgres.h"
#include "miscadmin.h"
#include "utils/palloc.h"
#include "utils/elog.h"
#include "utils/rel.h"
19
#include "nodes/pg_list.h"
20 21
#include "nodes/primnodes.h"

22
#include "parser/parsetree.h"	/* for parsetree manipulation */
23 24
#include "nodes/parsenodes.h"

25
#include "rewrite/rewriteSupport.h"
26 27 28 29 30 31 32
#include "rewrite/rewriteHandler.h"
#include "rewrite/rewriteManip.h"
#include "rewrite/locks.h"

#include "commands/creatinh.h"
#include "access/heapam.h"

M
Marc G. Fournier 已提交
33 34
#include "utils/syscache.h"
#include "utils/acl.h"
35
#include "catalog/pg_shadow.h"
M
Marc G. Fournier 已提交
36

37 38
static void
ApplyRetrieveRule(Query *parsetree, RewriteRule *rule,
M
Marc G. Fournier 已提交
39 40
				  int rt_index, int relation_level,
				  Relation relation, int *modified);
41 42
static List *
fireRules(Query *parsetree, int rt_index, CmdType event,
43
		  bool *instead_flag, List *locks, List **qual_products);
44
static void QueryRewriteSubLink(Node *node);
45
static List *QueryRewriteOne(Query *parsetree);
46
static List *deepRewriteQuery(Query *parsetree);
M
Marc G. Fournier 已提交
47
static void CheckViewPerms(Relation view, List *rtable);
48 49 50

/*
 * gatherRewriteMeta -
51 52 53
 *	  Gather meta information about parsetree, and rule. Fix rule body
 *	  and qualifier so that they can be mixed with the parsetree and
 *	  maintain semantic validity
54 55
 */
static RewriteInfo *
56 57 58
gatherRewriteMeta(Query *parsetree,
				  Query *rule_action,
				  Node *rule_qual,
59 60
				  int rt_index,
				  CmdType event,
61
				  bool *instead_flag)
62
{
63 64 65
	RewriteInfo *info;
	int			rt_length;
	int			result_reln;
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88

	info = (RewriteInfo *) palloc(sizeof(RewriteInfo));
	info->rt_index = rt_index;
	info->event = event;
	info->instead_flag = *instead_flag;
	info->rule_action = (Query *) copyObject(rule_action);
	info->rule_qual = (Node *) copyObject(rule_qual);
	if (info->rule_action == NULL)
		info->nothing = TRUE;
	else
	{
		info->nothing = FALSE;
		info->action = info->rule_action->commandType;
		info->current_varno = rt_index;
		info->rt = parsetree->rtable;
		rt_length = length(info->rt);
		info->rt = append(info->rt, info->rule_action->rtable);

		info->new_varno = PRS2_NEW_VARNO + rt_length;
		OffsetVarNodes(info->rule_action->qual, rt_length);
		OffsetVarNodes((Node *) info->rule_action->targetList, rt_length);
		OffsetVarNodes(info->rule_qual, rt_length);
		ChangeVarNodes((Node *) info->rule_action->qual,
89
					   PRS2_CURRENT_VARNO + rt_length, rt_index, 0);
90
		ChangeVarNodes((Node *) info->rule_action->targetList,
91
					   PRS2_CURRENT_VARNO + rt_length, rt_index, 0);
92
		ChangeVarNodes(info->rule_qual,
93
					   PRS2_CURRENT_VARNO + rt_length, rt_index, 0);
94 95 96 97 98 99 100 101

		/*
		 * bug here about replace CURRENT  -- sort of replace current is
		 * deprecated now so this code shouldn't really need to be so
		 * clutzy but.....
		 */
		if (info->action != CMD_SELECT)
		{						/* i.e update XXXXX */
102
			int			new_result_reln = 0;
103 104 105 106

			result_reln = info->rule_action->resultRelation;
			switch (result_reln)
			{
107 108 109 110 111 112 113
				case PRS2_CURRENT_VARNO:
					new_result_reln = rt_index;
					break;
				case PRS2_NEW_VARNO:	/* XXX */
				default:
					new_result_reln = result_reln + rt_length;
					break;
114 115 116 117 118
			}
			info->rule_action->resultRelation = new_result_reln;
		}
	}
	return info;
119 120
}

121
static List *
122
OptimizeRIRRules(List *locks)
123
{
124 125 126
	List	   *attr_level = NIL,
			   *i;
	List	   *relation_level = NIL;
127 128 129

	foreach(i, locks)
	{
130
		RewriteRule *rule_lock = lfirst(i);
131 132 133 134 135 136 137

		if (rule_lock->attrno == -1)
			relation_level = lappend(relation_level, rule_lock);
		else
			attr_level = lappend(attr_level, rule_lock);
	}
	return nconc(relation_level, attr_level);
138 139 140 141 142 143
}

/*
 * idea is to put instead rules before regular rules so that
 * excess semantically queasy queries aren't processed
 */
144
static List *
145
orderRules(List *locks)
146
{
147 148 149
	List	   *regular = NIL,
			   *i;
	List	   *instead_rules = NIL;
150 151 152

	foreach(i, locks)
	{
153
		RewriteRule *rule_lock = (RewriteRule *) lfirst(i);
154 155 156 157 158 159 160

		if (rule_lock->isInstead)
			instead_rules = lappend(instead_rules, rule_lock);
		else
			regular = lappend(regular, rule_lock);
	}
	return nconc(regular, instead_rules);
161 162 163
}

static int
164
AllRetrieve(List *actions)
165
{
166
	List	   *n;
167 168 169

	foreach(n, actions)
	{
170
		Query	   *pt = lfirst(n);
171 172 173 174 175 176 177 178 179 180

		/*
		 * in the old postgres code, we check whether command_type is a
		 * consp of '('*'.commandType). but we've never supported
		 * transitive closures. Hence removed	 - ay 10/94.
		 */
		if (pt->commandType != CMD_SELECT)
			return false;
	}
	return true;
181 182
}

183
static List *
184
FireRetrieveRulesAtQuery(Query *parsetree,
185 186
						 int rt_index,
						 Relation relation,
187
						 bool *instead_flag,
188
						 int rule_flag)
189
{
190 191 192 193
	List	   *i,
			   *locks;
	RuleLock   *rt_entry_locks = NULL;
	List	   *work = NIL;
194

195
	if ((rt_entry_locks = relation->rd_rules) == NULL)
196
		return NIL;
197

198
	locks = matchLocks(CMD_SELECT, rt_entry_locks, rt_index, parsetree);	
199 200 201 202

	/* find all retrieve instead */
	foreach(i, locks)
	{
203
		RewriteRule *rule_lock = (RewriteRule *) lfirst(i);
204 205 206 207 208 209 210 211 212 213

		if (!rule_lock->isInstead)
			continue;
		work = lappend(work, rule_lock);
	}
	if (work != NIL)
	{
		work = OptimizeRIRRules(locks);
		foreach(i, work)
		{
214 215 216
			RewriteRule *rule_lock = lfirst(i);
			int			relation_level;
			int			modified = FALSE;
217 218 219 220 221 222 223 224 225 226 227 228 229 230

			relation_level = (rule_lock->attrno == -1);
			if (rule_lock->actions == NIL)
			{
				*instead_flag = TRUE;
				return NIL;
			}
			if (!rule_flag &&
				length(rule_lock->actions) >= 2 &&
				AllRetrieve(rule_lock->actions))
			{
				*instead_flag = TRUE;
				return rule_lock->actions;
			}
M
Marc G. Fournier 已提交
231
			ApplyRetrieveRule(parsetree, rule_lock, rt_index, relation_level, relation,
232 233 234 235 236
							  &modified);
			if (modified)
			{
				*instead_flag = TRUE;
				FixResdomTypes(parsetree->targetList);
237

238 239 240
				return lcons(parsetree, NIL);
			}
		}
241
	}
242 243
	return NIL;
}
244 245 246 247 248


/* Idea is like this:
 *
 * retrieve-instead-retrieve rules have different semantics than update nodes
249
 * Separate RIR rules from others.	Pass others to FireRules.
250 251 252 253 254
 * Order RIR rules and process.
 *
 * side effect: parsetree's rtable field might be changed
 */
static void
255 256
ApplyRetrieveRule(Query *parsetree,
				  RewriteRule *rule,
257 258
				  int rt_index,
				  int relation_level,
M
Marc G. Fournier 已提交
259
				  Relation relation,
260
				  int *modified)
261
{
262 263 264 265 266 267 268
	Query	   *rule_action = NULL;
	Node	   *rule_qual;
	List	   *rtable,
			   *rt;
	int			nothing,
				rt_length;
	int			badsql = FALSE;
M
Marc G. Fournier 已提交
269
	int			viewAclOverride = FALSE;
270 271 272 273 274

	rule_qual = rule->qual;
	if (rule->actions)
	{
		if (length(rule->actions) > 1)	/* ??? because we don't handle
275 276 277 278 279 280 281 282 283
										 * rules with more than one
										 * action? -ay */

			/*
			 * WARNING!!! If we sometimes handle rules with more than one
			 * action, the view acl checks might get broken.
			 * viewAclOverride should only become true (below) if this is
			 * a relation_level, instead, select query - Jan
			 */
284 285 286
			return;
		rule_action = copyObject(lfirst(rule->actions));
		nothing = FALSE;
M
Marc G. Fournier 已提交
287 288

		/*
289 290 291 292
		 * If this rule is on the relation level, the rule action is a
		 * select and the rule is instead then it must be a view.
		 * Permissions for views now follow the owner of the view, not the
		 * current user.
M
Marc G. Fournier 已提交
293 294
		 */
		if (relation_level && rule_action->commandType == CMD_SELECT
295
			&& rule->isInstead)
M
Marc G. Fournier 已提交
296
		{
297 298
			CheckViewPerms(relation, rule_action->rtable);
			viewAclOverride = TRUE;
M
Marc G. Fournier 已提交
299
		}
300 301 302 303 304 305 306
	}
	else
		nothing = TRUE;

	rtable = copyObject(parsetree->rtable);
	foreach(rt, rtable)
	{
307
		RangeTblEntry *rte = lfirst(rt);
308 309 310 311 312 313 314 315

		/*
		 * this is to prevent add_missing_vars_to_base_rels() from adding
		 * a bogus entry to the new target list.
		 */
		rte->inFromCl = false;
	}
	rt_length = length(rtable);
M
Marc G. Fournier 已提交
316 317 318

	if (viewAclOverride)
	{
319 320 321
		List	   *rule_rtable,
				   *rule_rt;
		RangeTblEntry *rte;
M
Marc G. Fournier 已提交
322 323 324 325 326 327 328

		rule_rtable = copyObject(rule_action->rtable);
		foreach(rule_rt, rule_rtable)
		{
			rte = lfirst(rule_rt);

			/*
329 330
			 * tell the executor that the ACL check on this range table
			 * entry is already done
M
Marc G. Fournier 已提交
331 332 333 334 335 336 337 338
			 */
			rte->skipAcl = true;
		}

		rtable = nconc(rtable, rule_rtable);
	}
	else
		rtable = nconc(rtable, copyObject(rule_action->rtable));
339 340 341 342 343 344 345
	parsetree->rtable = rtable;

	rule_action->rtable = rtable;
	OffsetVarNodes(rule_action->qual, rt_length);
	OffsetVarNodes((Node *) rule_action->targetList, rt_length);
	OffsetVarNodes(rule_qual, rt_length);
	ChangeVarNodes(rule_action->qual,
346
				   PRS2_CURRENT_VARNO + rt_length, rt_index, 0);
347
	ChangeVarNodes((Node *) rule_action->targetList,
348 349
				   PRS2_CURRENT_VARNO + rt_length, rt_index, 0);
	ChangeVarNodes(rule_qual, PRS2_CURRENT_VARNO + rt_length, rt_index, 0);
350 351 352 353 354 355 356 357 358 359 360 361
	if (relation_level)
	{
		HandleViewRule(parsetree, rtable, rule_action->targetList, rt_index,
					   modified);
	}
	else
	{
		HandleRIRAttributeRule(parsetree, rtable, rule_action->targetList,
							   rt_index, rule->attrno, modified, &badsql);
	}
	if (*modified && !badsql)
		AddQual(parsetree, rule_action->qual);
362 363
}

364
static List *
365 366 367
ProcessRetrieveQuery(Query *parsetree,
					 List *rtable,
					 bool *instead_flag,
368
					 bool rule)
369
{
370 371 372
	List	   *rt;
	List	   *product_queries = NIL;
	int			rt_index = 0;
373

374

375 376
	foreach(rt, rtable)
	{
377 378 379
		RangeTblEntry *rt_entry = lfirst(rt);
		Relation	rt_entry_relation = NULL;
		List	   *result = NIL;
380 381 382 383

		rt_index++;
		rt_entry_relation = heap_openr(rt_entry->relname);

384 385


386 387 388 389 390 391 392 393 394 395 396 397
		if (rt_entry_relation->rd_rules != NULL)
		{
			result =
				FireRetrieveRulesAtQuery(parsetree,
										 rt_index,
										 rt_entry_relation,
										 instead_flag,
										 rule);
		}
		heap_close(rt_entry_relation);
		if (*instead_flag)
			return result;
398
	}
399 400
	if (rule)
		return NIL;
401

402 403
	foreach(rt, rtable)
	{
404 405 406 407 408 409
		RangeTblEntry *rt_entry = lfirst(rt);
		Relation	rt_entry_relation = NULL;
		RuleLock   *rt_entry_locks = NULL;
		List	   *result = NIL;
		List	   *locks = NIL;
		List	   *dummy_products;
410 411 412 413 414 415

		rt_index++;
		rt_entry_relation = heap_openr(rt_entry->relname);
		rt_entry_locks = rt_entry_relation->rd_rules;
		heap_close(rt_entry_relation);

416

417 418 419 420 421 422 423 424 425 426 427 428 429 430
		if (rt_entry_locks)
		{
			locks =
				matchLocks(CMD_SELECT, rt_entry_locks, rt_index, parsetree);
		}
		if (locks != NIL)
		{
			result = fireRules(parsetree, rt_index, CMD_SELECT,
							   instead_flag, locks, &dummy_products);
			if (*instead_flag)
				return lappend(NIL, result);
			if (result != NIL)
				product_queries = nconc(product_queries, result);
		}
431
	}
432
	return product_queries;
433 434
}

435
static Query *
436 437 438
CopyAndAddQual(Query *parsetree,
			   List *actions,
			   Node *rule_qual,
439 440
			   int rt_index,
			   CmdType event)
441
{
442 443 444
	Query	   *new_tree = (Query *) copyObject(parsetree);
	Node	   *new_qual = NULL;
	Query	   *rule_action = NULL;
445 446 447 448 449 450 451

	if (actions)
		rule_action = lfirst(actions);
	if (rule_qual != NULL)
		new_qual = (Node *) copyObject(rule_qual);
	if (rule_action != NULL)
	{
452 453
		List	   *rtable;
		int			rt_length;
454 455 456 457 458 459

		rtable = new_tree->rtable;
		rt_length = length(rtable);
		rtable = append(rtable, listCopy(rule_action->rtable));
		new_tree->rtable = rtable;
		OffsetVarNodes(new_qual, rt_length);
460
		ChangeVarNodes(new_qual, PRS2_CURRENT_VARNO + rt_length, rt_index, 0);
461 462 463 464 465
	}
	/* XXX -- where current doesn't work for instead nothing.... yet */
	AddNotQual(new_tree, new_qual);

	return new_tree;
466 467 468 469
}


/*
470 471 472 473 474
 *	fireRules -
 *	   Iterate through rule locks applying rules.  After an instead rule
 *	   rule has been applied, return just new parsetree and let RewriteQuery
 *	   start the process all over again.  The locks are reordered to maintain
 *	   sensible semantics.	remember: reality is for dead birds -- glass
475 476
 *
 */
477
static List *
478
fireRules(Query *parsetree,
479 480
		  int rt_index,
		  CmdType event,
481 482 483
		  bool *instead_flag,
		  List *locks,
		  List **qual_products)
484
{
485 486 487
	RewriteInfo *info;
	List	   *results = NIL;
	List	   *i;
488 489 490 491 492 493 494

	/* choose rule to fire from list of rules */
	if (locks == NIL)
	{
		ProcessRetrieveQuery(parsetree,
							 parsetree->rtable,
							 instead_flag, TRUE);
495
		if (*instead_flag)
496
			return lappend(NIL, parsetree);
497
		else
498
			return NIL;
499
	}
500 501 502 503

	locks = orderRules(locks);	/* instead rules first */
	foreach(i, locks)
	{
504 505 506 507 508 509
		RewriteRule *rule_lock = (RewriteRule *) lfirst(i);
		Node	   *qual,
				   *event_qual;
		List	   *actions;
		List	   *r;
		bool		orig_instead_flag = *instead_flag;
510 511 512 513 514 515 516 517 518 519 520 521

		/* multiple rule action time */
		*instead_flag = rule_lock->isInstead;
		event_qual = rule_lock->qual;
		actions = rule_lock->actions;
		if (event_qual != NULL && *instead_flag)
			*qual_products =
				lappend(*qual_products,
						CopyAndAddQual(parsetree, actions, event_qual,
									   rt_index, event));
		foreach(r, actions)
		{
522 523
			Query	   *rule_action = lfirst(r);
			Node	   *rule_qual = copyObject(event_qual);
524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589

			/*--------------------------------------------------
			 * Step 1:
			 *	  Rewrite current.attribute or current to tuple variable
			 *	  this appears to be done in parser?
			 *--------------------------------------------------
			 */
			info = gatherRewriteMeta(parsetree, rule_action, rule_qual,
									 rt_index, event, instead_flag);

			/* handle escapable cases, or those handled by other code */
			if (info->nothing)
			{
				if (*instead_flag)
					return NIL;
				else
					continue;
			}

			if (info->action == info->event &&
				info->event == CMD_SELECT)
				continue;

			/*
			 * Event Qualification forces copying of parsetree --- XXX and
			 * splitting into two queries one w/rule_qual, one w/NOT
			 * rule_qual. Also add user query qual onto rule action
			 */
			qual = parsetree->qual;
			AddQual(info->rule_action, qual);

			if (info->rule_qual != NULL)
				AddQual(info->rule_action, info->rule_qual);

			/*--------------------------------------------------
			 * Step 2:
			 *	  Rewrite new.attribute w/ right hand side of target-list
			 *	  entry for appropriate field name in insert/update
			 *--------------------------------------------------
			 */
			if ((info->event == CMD_INSERT) || (info->event == CMD_UPDATE))
				FixNew(info, parsetree);

			/*--------------------------------------------------
			 * Step 3:
			 *	  rewriting due to retrieve rules
			 *--------------------------------------------------
			 */
			info->rule_action->rtable = info->rt;
			ProcessRetrieveQuery(info->rule_action, info->rt,
								 &orig_instead_flag, TRUE);

			/*--------------------------------------------------
			 * Step 4
			 *	  Simplify? hey, no algorithm for simplification... let
			 *	  the planner do it.
			 *--------------------------------------------------
			 */
			results = lappend(results, info->rule_action);

			pfree(info);
		}
		if (*instead_flag)
			break;
	}
	return results;
590 591
}

592
static List *
593
RewriteQuery(Query *parsetree, bool *instead_flag, List **qual_products)
594
{
595 596 597
	CmdType		event;
	List	   *product_queries = NIL;
	int			result_relation = 0;
598

599 600 601 602 603 604
	Assert(parsetree != NULL);

	event = parsetree->commandType;

	if (event == CMD_UTILITY)
		return NIL;
605 606

	/*
607
	 * only for a delete may the targetlist be NULL
608
	 */
609 610 611 612 613 614 615 616 617 618 619
	if (event != CMD_DELETE)
		Assert(parsetree->targetList != NULL);

	result_relation = parsetree->resultRelation;

	if (event != CMD_SELECT)
	{

		/*
		 * the statement is an update, insert or delete
		 */
620 621 622
		RangeTblEntry *rt_entry;
		Relation	rt_entry_relation = NULL;
		RuleLock   *rt_entry_locks = NULL;
623 624 625 626 627 628 629 630

		rt_entry = rt_fetch(result_relation, parsetree->rtable);
		rt_entry_relation = heap_openr(rt_entry->relname);
		rt_entry_locks = rt_entry_relation->rd_rules;
		heap_close(rt_entry_relation);

		if (rt_entry_locks != NULL)
		{
631
			List	   *locks =
632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649
			matchLocks(event, rt_entry_locks, result_relation, parsetree);

			product_queries =
				fireRules(parsetree,
						  result_relation,
						  event,
						  instead_flag,
						  locks,
						  qual_products);
		}
		return product_queries;
	}
	else
	{

		/*
		 * the statement is a select
		 */
650
		Query	   *other;
651

B
Bruce Momjian 已提交
652
		/*
653 654
		 * ApplyRetrieveRule changes the range table XXX Unions are copied
		 * again.
B
Bruce Momjian 已提交
655 656 657
		 */
		other = copyObject(parsetree);

658 659 660
		return
			ProcessRetrieveQuery(other, parsetree->rtable,
								 instead_flag, FALSE);
661 662 663 664 665 666 667 668
	}
}

/*
 * to avoid infinite recursion, we restrict the number of times a query
 * can be rewritten. Detecting cycles is left for the reader as an excercise.
 */
#ifndef REWRITE_INVOKE_MAX
669
#define REWRITE_INVOKE_MAX		10
670 671
#endif

672
static int	numQueryRewriteInvoked = 0;
673 674 675

/*
 * QueryRewrite -
676 677 678
 *	  rewrite one query via QueryRewrite system, possibly returning 0, or many
 *	  queries
 */
679
List *
680
QueryRewrite(Query *parsetree)
681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708
{
	QueryRewriteSubLink(parsetree->qual);
	return QueryRewriteOne(parsetree);
}

/*
 *	QueryRewriteSubLink
 *
 *	This rewrites the SubLink subqueries first, doing the lowest ones first.
 *	We already have code in the main rewrite loops to process correlated
 *	variables from upper queries that exist in subqueries.
 */
static void
QueryRewriteSubLink(Node *node)
{
	if (node == NULL)
		return;

	switch (nodeTag(node))
	{
		case T_TargetEntry:
			break;
		case T_Aggreg:
			break;
		case T_Expr:
			{
				Expr	   *expr = (Expr *) node;

709
				QueryRewriteSubLink((Node *) expr->args);
710 711 712 713 714 715 716 717 718 719 720 721 722 723
			}
			break;
		case T_Var:
			break;
		case T_List:
			{
				List	   *l;

				foreach(l, (List *) node)
					QueryRewriteSubLink(lfirst(l));
			}
			break;
		case T_SubLink:
			{
724 725 726
				SubLink    *sublink = (SubLink *) node;
				Query	   *query = (Query *) sublink->subselect;
				List	   *ret;
727 728

				/*
729 730
				 * Nest down first.  We do this so if a rewrite adds a
				 * SubLink we don't process it as part of this loop.
731
				 */
732
				QueryRewriteSubLink((Node *) query->qual);
733 734 735 736 737 738 739

				ret = QueryRewriteOne(query);
				if (!ret)
					sublink->subselect = NULL;
				else if (lnext(ret) == NIL)
					sublink->subselect = lfirst(ret);
				else
740
					elog(ERROR, "Don't know how to process subquery that rewrites to multiple queries.");
741 742 743 744 745 746 747 748 749 750 751 752 753
			}
			break;
		default:
			/* ignore the others */
			break;
	}
	return;
}

/*
 * QueryOneRewrite -
 *	  rewrite one query
 */
754
static List *
755
QueryRewriteOne(Query *parsetree)
756
{
757
	numQueryRewriteInvoked = 0;
758

759 760 761 762
	/*
	 * take a deep breath and apply all the rewrite rules - ay
	 */
	return deepRewriteQuery(parsetree);
763 764 765 766
}

/*
 * deepRewriteQuery -
767
 *	  rewrites the query and apply the rules again on the queries rewritten
768
 */
769
static List *
770
deepRewriteQuery(Query *parsetree)
771
{
772 773 774 775 776
	List	   *n;
	List	   *rewritten = NIL;
	List	   *result = NIL;
	bool		instead;
	List	   *qual_products = NIL;
777

778 779


780 781
	if (++numQueryRewriteInvoked > REWRITE_INVOKE_MAX)
	{
782
		elog(ERROR, "query rewritten %d times, may contain cycles",
783 784 785 786 787 788 789
			 numQueryRewriteInvoked - 1);
	}

	instead = FALSE;
	result = RewriteQuery(parsetree, &instead, &qual_products);
	if (!instead)
		rewritten = lcons(parsetree, NIL);
790

791 792
	foreach(n, result)
	{
793 794
		Query	   *pt = lfirst(n);
		List	   *newstuff = NIL;
795 796 797 798 799 800 801 802 803 804

		newstuff = deepRewriteQuery(pt);
		if (newstuff != NIL)
			rewritten = nconc(rewritten, newstuff);
	}
	if (qual_products != NIL)
		rewritten = nconc(rewritten, qual_products);

	return rewritten;
}
M
Marc G. Fournier 已提交
805 806 807 808 809 810 811


static void
CheckViewPerms(Relation view, List *rtable)
{
	HeapTuple	utup;
	NameData	uname;
812 813
	List	   *rt;
	RangeTblEntry *rte;
M
Marc G. Fournier 已提交
814 815 816 817 818 819 820 821 822
	int32		aclcheck_res;

	/*
	 * get the usename of the view's owner
	 */
	utup = SearchSysCacheTuple(USESYSID, view->rd_rel->relowner, 0, 0, 0);
	if (!HeapTupleIsValid(utup))
	{
		elog(ERROR, "cache lookup for userid %d failed",
823
			 view->rd_rel->relowner);
M
Marc G. Fournier 已提交
824 825
	}
	StrNCpy(uname.data,
826
			((Form_pg_shadow) GETSTRUCT(utup))->usename.data,
M
Marc G. Fournier 已提交
827 828 829
			NAMEDATALEN);

	/*
830 831
	 * check that we have read access to all the classes in the range
	 * table of the view
M
Marc G. Fournier 已提交
832 833 834
	 */
	foreach(rt, rtable)
	{
835
		rte = (RangeTblEntry *) lfirst(rt);
M
Marc G. Fournier 已提交
836 837 838 839 840 841

		aclcheck_res = pg_aclcheck(rte->relname, uname.data, ACL_RD);
		if (aclcheck_res != ACLCHECK_OK)
			elog(ERROR, "%s: %s", rte->relname, aclcheck_error_strings[aclcheck_res]);
	}
}