/*-------------------------------------------------------------------------
 *
 * rewriteDefine.c
 *	  routines for defining a rewrite rule
 *
 * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/rewrite/rewriteDefine.c,v 1.92 2004/01/14 23:01:55 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/heapam.h"
#include "catalog/catname.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/pg_rewrite.h"
#include "commands/view.h"
#include "miscadmin.h"
#include "optimizer/clauses.h"
#include "parser/parse_relation.h"
#include "rewrite/rewriteDefine.h"
#include "rewrite/rewriteManip.h"
#include "rewrite/rewriteSupport.h"
#include "storage/smgr.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"


/* Forward declarations of file-local helpers defined later in this file */
static void setRuleCheckAsUser(Query *qry, AclId userid);
static bool setRuleCheckAsUser_walker(Node *node, AclId *context);


/*
 * InsertRule -
42
 *	  takes the arguments and inserts them as a row into the system
43
 *	  relation "pg_rewrite"
44
 */
45
static Oid
46
InsertRule(char *rulname,
47
		   int evtype,
48 49
		   Oid eventrel_oid,
		   AttrNumber evslot_index,
50
		   bool evinstead,
51
		   Node *event_qual,
52 53
		   List *action,
		   bool replace)
54
{
55 56
	char	   *evqual = nodeToString(event_qual);
	char	   *actiontree = nodeToString((Node *) action);
57 58 59
	int			i;
	Datum		values[Natts_pg_rewrite];
	char		nulls[Natts_pg_rewrite];
60
	char		replaces[Natts_pg_rewrite];
61 62
	NameData	rname;
	Relation	pg_rewrite_desc;
63 64
	HeapTuple	tup,
				oldtup;
65
	Oid			rewriteObjectId;
B
Bruce Momjian 已提交
66 67
	ObjectAddress myself,
				referenced;
68
	bool		is_update = false;
69

70 71
	/*
	 * Set up *nulls and *values arrays
72 73 74 75 76
	 */
	MemSet(nulls, ' ', sizeof(nulls));

	i = 0;
	namestrcpy(&rname, rulname);
B
Bruce Momjian 已提交
77 78 79 80 81 82 83
	values[i++] = NameGetDatum(&rname); /* rulename */
	values[i++] = ObjectIdGetDatum(eventrel_oid);		/* ev_class */
	values[i++] = Int16GetDatum(evslot_index);	/* ev_attr */
	values[i++] = CharGetDatum(evtype + '0');	/* ev_type */
	values[i++] = BoolGetDatum(evinstead);		/* is_instead */
	values[i++] = DirectFunctionCall1(textin, CStringGetDatum(evqual)); /* ev_qual */
	values[i++] = DirectFunctionCall1(textin, CStringGetDatum(actiontree));		/* ev_action */
84

85
	/*
86
	 * Ready to store new pg_rewrite tuple
87 88 89
	 */
	pg_rewrite_desc = heap_openr(RewriteRelationName, RowExclusiveLock);

90 91 92 93 94 95 96 97 98 99 100
	/*
	 * Check to see if we are replacing an existing tuple
	 */
	oldtup = SearchSysCache(RULERELNAME,
							ObjectIdGetDatum(eventrel_oid),
							PointerGetDatum(rulname),
							0, 0);

	if (HeapTupleIsValid(oldtup))
	{
		if (!replace)
101 102
			ereport(ERROR,
					(errcode(ERRCODE_DUPLICATE_OBJECT),
B
Bruce Momjian 已提交
103 104
				 errmsg("rule \"%s\" for relation \"%s\" already exists",
						rulname, get_rel_name(eventrel_oid))));
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119

		/*
		 * When replacing, we don't need to replace every attribute
		 */
		MemSet(replaces, ' ', sizeof(replaces));
		replaces[Anum_pg_rewrite_ev_attr - 1] = 'r';
		replaces[Anum_pg_rewrite_ev_type - 1] = 'r';
		replaces[Anum_pg_rewrite_is_instead - 1] = 'r';
		replaces[Anum_pg_rewrite_ev_qual - 1] = 'r';
		replaces[Anum_pg_rewrite_ev_action - 1] = 'r';

		tup = heap_modifytuple(oldtup, pg_rewrite_desc,
							   values, nulls, replaces);

		simple_heap_update(pg_rewrite_desc, &tup->t_self, tup);
120

121 122 123 124 125 126 127 128
		ReleaseSysCache(oldtup);

		rewriteObjectId = HeapTupleGetOid(tup);
		is_update = true;
	}
	else
	{
		tup = heap_formtuple(pg_rewrite_desc->rd_att, values, nulls);
129

130 131
		rewriteObjectId = simple_heap_insert(pg_rewrite_desc, tup);
	}
132

133
	/* Need to update indexes in either case */
134
	CatalogUpdateIndexes(pg_rewrite_desc, tup);
135

136
	heap_freetuple(tup);
137

138 139 140 141 142
	/* If replacing, get rid of old dependencies and make new ones */
	if (is_update)
		deleteDependencyRecordsFor(RelationGetRelid(pg_rewrite_desc),
								   rewriteObjectId);

143
	/*
B
Bruce Momjian 已提交
144 145
	 * Install dependency on rule's relation to ensure it will go away on
	 * relation deletion.  If the rule is ON SELECT, make the dependency
146 147 148 149 150 151 152 153 154 155 156 157
	 * implicit --- this prevents deleting a view's SELECT rule.  Other
	 * kinds of rules can be AUTO.
	 */
	myself.classId = RelationGetRelid(pg_rewrite_desc);
	myself.objectId = rewriteObjectId;
	myself.objectSubId = 0;

	referenced.classId = RelOid_pg_class;
	referenced.objectId = eventrel_oid;
	referenced.objectSubId = 0;

	recordDependencyOn(&myself, &referenced,
B
Bruce Momjian 已提交
158
		 (evtype == CMD_SELECT) ? DEPENDENCY_INTERNAL : DEPENDENCY_AUTO);
159

160 161 162 163 164
	/*
	 * Also install dependencies on objects referenced in action and qual.
	 */
	recordDependencyOnExpr(&myself, (Node *) action, NIL,
						   DEPENDENCY_NORMAL);
165

166 167 168
	if (event_qual != NULL)
	{
		/* Find query containing OLD/NEW rtable entries */
B
Bruce Momjian 已提交
169
		Query	   *qry = (Query *) lfirst(action);
170 171 172 173 174 175

		qry = getInsertSelectQuery(qry, NULL);
		recordDependencyOnExpr(&myself, event_qual, qry->rtable,
							   DEPENDENCY_NORMAL);
	}

176
	heap_close(pg_rewrite_desc, RowExclusiveLock);
177

178
	return rewriteObjectId;
179 180 181
}

void
182
DefineQueryRewrite(RuleStmt *stmt)
183
{
184
	RangeVar   *event_obj = stmt->relation;
185
	Node	   *event_qual = stmt->whereClause;
186
	CmdType		event_type = stmt->event;
187
	bool		is_instead = stmt->instead;
188
	bool		replace = stmt->replace;
189
	List	   *action = stmt->actions;
190 191
	Relation	event_relation;
	Oid			ev_relid;
192
	Oid			ruleId;
193 194
	int			event_attno;
	Oid			event_attype;
195 196
	List	   *l;
	Query	   *query;
197
	AclResult	aclresult;
198
	bool		RelisBecomingView = false;
199

200 201 202
	/*
	 * If we are installing an ON SELECT rule, we had better grab
	 * AccessExclusiveLock to ensure no SELECTs are currently running on
B
Bruce Momjian 已提交
203 204 205 206
	 * the event relation.	For other types of rules, it might be
	 * sufficient to grab ShareLock to lock out insert/update/delete
	 * actions.  But for now, let's just grab AccessExclusiveLock all the
	 * time.
207
	 */
208
	event_relation = heap_openrv(event_obj, AccessExclusiveLock);
209 210
	ev_relid = RelationGetRelid(event_relation);

211 212 213
	/*
	 * Check user has permission to apply rules to this relation.
	 */
214 215
	aclresult = pg_class_aclcheck(ev_relid, GetUserId(), ACL_RULE);
	if (aclresult != ACLCHECK_OK)
216 217
		aclcheck_error(aclresult, ACL_KIND_CLASS,
					   RelationGetRelationName(event_relation));
218

219 220 221
	/*
	 * No rule actions that modify OLD or NEW
	 */
222
	foreach(l, action)
223 224
	{
		query = (Query *) lfirst(l);
225 226 227 228 229
		if (query->resultRelation == 0)
			continue;
		/* Don't be fooled by INSERT/SELECT */
		if (query != getInsertSelectQuery(query, NULL))
			continue;
230
		if (query->resultRelation == PRS2_OLD_VARNO)
231 232 233 234
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("rule actions on OLD are not implemented"),
					 errhint("Use views or triggers instead.")));
235
		if (query->resultRelation == PRS2_NEW_VARNO)
236 237 238 239
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("rule actions on NEW are not implemented"),
					 errhint("Use triggers instead.")));
240
	}
241

242 243 244
	/*
	 * Rules ON SELECT are restricted to view definitions
	 */
245 246
	if (event_type == CMD_SELECT)
	{
247
		List	   *tllist;
248 249 250 251 252
		int			i;

		/*
		 * So there cannot be INSTEAD NOTHING, ...
		 */
253
		if (length(action) == 0)
254 255
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
256
			errmsg("INSTEAD NOTHING rules on SELECT are not implemented"),
257
					 errhint("Use views instead.")));
258 259 260 261

		/*
		 * ... there cannot be multiple actions, ...
		 */
262
		if (length(action) > 1)
263 264
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
265
					 errmsg("multiple actions for rules on SELECT are not implemented")));
B
Bruce Momjian 已提交
266

267 268 269
		/*
		 * ... the one action must be a SELECT, ...
		 */
270 271
		query = (Query *) lfirst(action);
		if (!is_instead || query->commandType != CMD_SELECT)
272 273
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
274
					 errmsg("rules on SELECT must have action INSTEAD SELECT")));
275 276 277 278

		/*
		 * ... there can be no rule qual, ...
		 */
279
		if (event_qual != NULL)
280 281
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
282
					 errmsg("event qualifications are not implemented for rules on SELECT")));
283 284

		/*
B
Bruce Momjian 已提交
285 286
		 * ... the targetlist of the SELECT action must exactly match the
		 * event relation, ...
287
		 */
288
		i = 0;
B
Bruce Momjian 已提交
289
		foreach(tllist, query->targetList)
B
Bruce Momjian 已提交
290
		{
291 292 293 294 295 296 297 298 299
			TargetEntry *tle = (TargetEntry *) lfirst(tllist);
			Resdom	   *resdom = tle->resdom;
			Form_pg_attribute attr;
			char	   *attname;

			if (resdom->resjunk)
				continue;
			i++;
			if (i > event_relation->rd_att->natts)
300 301
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
302
						 errmsg("SELECT rule's target list has too many entries")));
303

304
			attr = event_relation->rd_att->attrs[i - 1];
305
			attname = NameStr(attr->attname);
306

307
			/*
B
Bruce Momjian 已提交
308 309 310 311 312 313
			 * Disallow dropped columns in the relation.  This won't
			 * happen in the cases we actually care about (namely creating
			 * a view via CREATE TABLE then CREATE RULE).  Trying to cope
			 * with it is much more trouble than it's worth, because we'd
			 * have to modify the rule to insert dummy NULLs at the right
			 * positions.
314 315
			 */
			if (attr->attisdropped)
316 317 318
				ereport(ERROR,
						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						 errmsg("cannot convert relation containing dropped columns to view")));
319

320
			if (strcmp(resdom->resname, attname) != 0)
321 322
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
323
						 errmsg("SELECT rule's target entry %d has different column name from \"%s\"", i, attname)));
324 325

			if (attr->atttypid != resdom->restype)
326 327
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
328
						 errmsg("SELECT rule's target entry %d has different type from column \"%s\"", i, attname)));
329

330 331
			/*
			 * Allow typmods to be different only if one of them is -1,
B
Bruce Momjian 已提交
332 333 334 335
			 * ie, "unspecified".  This is necessary for cases like
			 * "numeric", where the table will have a filled-in default
			 * length but the select rule's expression will probably have
			 * typmod = -1.
336 337 338
			 */
			if (attr->atttypmod != resdom->restypmod &&
				attr->atttypmod != -1 && resdom->restypmod != -1)
339 340
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
341
						 errmsg("SELECT rule's target entry %d has different size from column \"%s\"", i, attname)));
342 343
		}

344
		if (i != event_relation->rd_att->natts)
345 346
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
347
			   errmsg("SELECT rule's target list has too few entries")));
348

349
		/*
B
Bruce Momjian 已提交
350
		 * ... there must not be another ON SELECT rule already ...
351
		 */
352
		if (!replace && event_relation->rd_rules != NULL)
B
Bruce Momjian 已提交
353 354 355 356
		{
			for (i = 0; i < event_relation->rd_rules->numLocks; i++)
			{
				RewriteRule *rule;
357 358 359

				rule = event_relation->rd_rules->rules[i];
				if (rule->event == CMD_SELECT)
360
					ereport(ERROR,
B
Bruce Momjian 已提交
361 362 363
					  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					   errmsg("\"%s\" is already a view",
							  RelationGetRelationName(event_relation))));
364 365 366
			}
		}

367
		/*
368
		 * ... and finally the rule must be named _RETURN.
369
		 */
370
		if (strcmp(stmt->rulename, ViewSelectRuleName) != 0)
B
Bruce Momjian 已提交
371
		{
372 373 374
			/*
			 * In versions before 7.3, the expected name was _RETviewname.
			 * For backwards compatibility with old pg_dump output, accept
B
Bruce Momjian 已提交
375
			 * that and silently change it to _RETURN.	Since this is just
376 377
			 * a quick backwards-compatibility hack, limit the number of
			 * characters checked to a few less than NAMEDATALEN; this
B
Bruce Momjian 已提交
378 379
			 * saves having to worry about where a multibyte character
			 * might have gotten truncated.
380 381 382 383
			 */
			if (strncmp(stmt->rulename, "_RET", 4) != 0 ||
				strncmp(stmt->rulename + 4, event_obj->relname,
						NAMEDATALEN - 4 - 4) != 0)
384 385
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
B
Bruce Momjian 已提交
386 387
					  errmsg("view rule for \"%s\" must be named \"%s\"",
							 event_obj->relname, ViewSelectRuleName)));
388
			stmt->rulename = pstrdup(ViewSelectRuleName);
389
		}
390 391 392 393

		/*
		 * Are we converting a relation to a view?
		 *
B
Bruce Momjian 已提交
394
		 * If so, check that the relation is empty because the storage for
395 396
		 * the relation is going to be deleted.  Also insist that the rel
		 * not have any triggers, indexes, or child tables.
397 398 399
		 */
		if (event_relation->rd_rel->relkind != RELKIND_VIEW)
		{
B
Bruce Momjian 已提交
400
			HeapScanDesc scanDesc;
401

402 403
			scanDesc = heap_beginscan(event_relation, SnapshotNow, 0, NULL);
			if (heap_getnext(scanDesc, ForwardScanDirection) != NULL)
404
				ereport(ERROR,
B
Bruce Momjian 已提交
405
					  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
406
				errmsg("could not convert table \"%s\" to a view because it is not empty",
B
Bruce Momjian 已提交
407
					   event_obj->relname)));
408 409
			heap_endscan(scanDesc);

410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428
			if (event_relation->rd_rel->reltriggers != 0)
				ereport(ERROR,
						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						 errmsg("could not convert table \"%s\" to a view because it has triggers",
								event_obj->relname),
						 errhint("In particular, the table may not be involved in any foreign key relationships.")));

			if (event_relation->rd_rel->relhasindex)
				ereport(ERROR,
						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						 errmsg("could not convert table \"%s\" to a view because it has indexes",
								event_obj->relname)));

			if (event_relation->rd_rel->relhassubclass)
				ereport(ERROR,
						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						 errmsg("could not convert table \"%s\" to a view because it has child tables",
								event_obj->relname)));

429 430
			RelisBecomingView = true;
		}
431
	}
432

433
	/*
434
	 * This rule is allowed - prepare to install it.
435
	 */
436 437
	event_attno = -1;
	event_attype = InvalidOid;
438

439
	/*
B
Bruce Momjian 已提交
440 441 442
	 * We want the rule's table references to be checked as though by the
	 * rule owner, not the user referencing the rule.  Therefore, scan
	 * through the rule's rtables and set the checkAsUser field on all
443
	 * rtable entries.
444 445 446 447 448 449 450
	 */
	foreach(l, action)
	{
		query = (Query *) lfirst(l);
		setRuleCheckAsUser(query, GetUserId());
	}

451
	/* discard rule if it's null action and not INSTEAD; it's a no-op */
452
	if (action != NIL || is_instead)
453 454 455
	{
		ruleId = InsertRule(stmt->rulename,
							event_type,
456 457
							ev_relid,
							event_attno,
458
							is_instead,
459
							event_qual,
460 461
							action,
							replace);
462

463
		/*
B
Bruce Momjian 已提交
464 465 466
		 * Set pg_class 'relhasrules' field TRUE for event relation. If
		 * appropriate, also modify the 'relkind' field to show that the
		 * relation is now a view.
467 468
		 *
		 * Important side effect: an SI notice is broadcast to force all
B
Bruce Momjian 已提交
469 470
		 * backends (including me!) to update relcache entries with the
		 * new rule.
471
		 */
472
		SetRelationRuleStatus(ev_relid, true, RelisBecomingView);
473
	}
474

475
	/*
B
Bruce Momjian 已提交
476 477 478
	 * IF the relation is becoming a view, delete the storage files
	 * associated with it.	NB: we had better have AccessExclusiveLock to
	 * do this ...
479 480
	 *
	 * XXX what about getting rid of its TOAST table?  For now, we don't.
481 482 483 484
	 */
	if (RelisBecomingView)
		smgrunlink(DEFAULT_SMGR, event_relation);

485 486
	/* Close rel, but keep lock till commit... */
	heap_close(event_relation, NoLock);
487
}
488 489 490 491

/*
 * setRuleCheckAsUser
 *		Recursively scan a query and set the checkAsUser field to the
492 493 494 495 496
 *		given userid in all rtable entries.
 *
 * Note: for a view (ON SELECT rule), the checkAsUser field of the *OLD*
 * RTE entry will be overridden when the view rule is expanded, and the
 * checkAsUser field of the *NEW* entry is irrelevant because that entry's
497 498
 * requiredPerms bits will always be zero.  However, for other types of rules
 * it's important to set these fields to match the rule owner.  So we just set
499
 * them always.
500 501
 */
static void
502
setRuleCheckAsUser(Query *qry, AclId userid)
503 504 505
{
	List	   *l;

506
	/* Set all the RTEs in this query node */
507 508 509 510
	foreach(l, qry->rtable)
	{
		RangeTblEntry *rte = (RangeTblEntry *) lfirst(l);

511
		if (rte->rtekind == RTE_SUBQUERY)
512
		{
513
			/* Recurse into subquery in FROM */
514 515 516 517 518 519 520
			setRuleCheckAsUser(rte->subquery, userid);
		}
		else
			rte->checkAsUser = userid;
	}

	/* If there are sublinks, search for them and process their RTEs */
521
	/* ignore subqueries in rtable because we already processed them */
522
	if (qry->hasSubLinks)
523
		query_tree_walker(qry, setRuleCheckAsUser_walker, (void *) &userid,
524
						  QTW_IGNORE_RT_SUBQUERIES);
525 526 527 528 529 530
}

/*
 * Expression-tree walker to find sublink queries
 */
static bool
531
setRuleCheckAsUser_walker(Node *node, AclId *context)
532 533 534 535 536 537 538 539 540 541 542 543 544
{
	if (node == NULL)
		return false;
	if (IsA(node, Query))
	{
		Query	   *qry = (Query *) node;

		setRuleCheckAsUser(qry, *context);
		return false;
	}
	return expression_tree_walker(node, setRuleCheckAsUser_walker,
								  (void *) context);
}
545 546 547 548 549


/*
 * Rename an existing rewrite rule.
 *
550
 * This is unused code at the moment.
551 552
 */
void
553 554
RenameRewriteRule(Oid owningRel, const char *oldName,
				  const char *newName)
555 556 557 558 559 560
{
	Relation	pg_rewrite_desc;
	HeapTuple	ruletup;

	pg_rewrite_desc = heap_openr(RewriteRelationName, RowExclusiveLock);

561 562 563 564
	ruletup = SearchSysCacheCopy(RULERELNAME,
								 ObjectIdGetDatum(owningRel),
								 PointerGetDatum(oldName),
								 0, 0);
565
	if (!HeapTupleIsValid(ruletup))
566 567 568 569
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("rule \"%s\" for relation \"%s\" does not exist",
						oldName, get_rel_name(owningRel))));
570 571

	/* should not already exist */
572
	if (IsDefinedRewriteRule(owningRel, newName))
573 574 575 576
		ereport(ERROR,
				(errcode(ERRCODE_DUPLICATE_OBJECT),
				 errmsg("rule \"%s\" for relation \"%s\" already exists",
						newName, get_rel_name(owningRel))));
577

578
	namestrcpy(&(((Form_pg_rewrite) GETSTRUCT(ruletup))->rulename), newName);
579 580 581

	simple_heap_update(pg_rewrite_desc, &ruletup->t_self, ruletup);

582 583
	/* keep system catalog indexes current */
	CatalogUpdateIndexes(pg_rewrite_desc, ruletup);
584 585 586 587

	heap_freetuple(ruletup);
	heap_close(pg_rewrite_desc, RowExclusiveLock);
}