locks.c 5.4 KB
Newer Older
1 2
/*-------------------------------------------------------------------------
 *
3
 * locks.c
4 5 6 7 8
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
9
 *	  $Header: /cvsroot/pgsql/src/backend/rewrite/Attic/locks.c,v 1.23 1999/10/01 04:08:24 tgl Exp $
10 11 12
 *
 *-------------------------------------------------------------------------
 */
B
Bruce Momjian 已提交
13
#include "postgres.h"
14

B
Bruce Momjian 已提交
15
#include "access/heapam.h"
B
Bruce Momjian 已提交
16
#include "catalog/pg_shadow.h"
17
#include "optimizer/clauses.h"
B
Bruce Momjian 已提交
18
#include "rewrite/locks.h"
19 20
#include "utils/acl.h"
#include "utils/builtins.h"
B
Bruce Momjian 已提交
21 22
#include "utils/syscache.h"
#include "utils/syscache.h"
23 24


25
/*
26
 * thisLockWasTriggered
27 28 29 30 31
 *
 * walk the tree, if there we find a varnode,
 * we check the varattno against the attnum
 * if we find at least one such match, we return true
 * otherwise, we return false
32 33
 *
 * XXX this should be unified with attribute_used()
34
 */
35 36 37 38 39 40 41

typedef struct {
	int			varno;
	int			attnum;
	int			sublevels_up;
} thisLockWasTriggered_context;

42
static bool
43 44
thisLockWasTriggered_walker (Node *node,
							 thisLockWasTriggered_context *context)
45
{
46
	if (node == NULL)
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
		return false;
	if (IsA(node, Var))
	{
		Var		   *var = (Var *) node;

		if (var->varlevelsup == context->sublevels_up &&
			var->varno == context->varno &&
			(var->varattno == context->attnum || context->attnum == -1))
			return true;
		return false;
	}
	if (IsA(node, SubLink))
	{
		/*
		 * Standard expression_tree_walker will not recurse into subselect,
		 * but here we must do so.
		 */
		SubLink    *sub = (SubLink *) node;

		if (thisLockWasTriggered_walker((Node *) (sub->lefthand), context))
			return true;
		context->sublevels_up++;
		if (thisLockWasTriggered_walker((Node *) (sub->subselect), context))
		{
			context->sublevels_up--; /* not really necessary */
			return true;
		}
		context->sublevels_up--;
		return false;
	}
	if (IsA(node, Query))
78
	{
79 80 81 82 83 84 85 86 87 88
		/* Reach here after recursing down into subselect above... */
		Query	   *qry = (Query *) node;

		if (thisLockWasTriggered_walker((Node *) (qry->targetList), context))
			return true;
		if (thisLockWasTriggered_walker((Node *) (qry->qual), context))
			return true;
		if (thisLockWasTriggered_walker((Node *) (qry->havingQual), context))
			return true;
		return false;
89
	}
90 91
	return expression_tree_walker(node, thisLockWasTriggered_walker,
								  (void *) context);
92 93
}

94
static bool
95
thisLockWasTriggered(int varno,
96
					 int attnum,
97
					 Query *parsetree)
98
{
99
	thisLockWasTriggered_context context;
100

101 102 103
	context.varno = varno;
	context.attnum = attnum;
	context.sublevels_up = 0;
104

105
	return thisLockWasTriggered_walker((Node *) parsetree, &context);
106 107 108 109
}

/*
 * matchLocks -
110
 *	  match the list of locks and returns the matching rules
111
 */
112
List *
113
matchLocks(CmdType event,
114
		   RuleLock *rulelocks,
115
		   int varno,
116
		   Query *parsetree)
117
{
118 119 120
	List	   *real_locks = NIL;
	int			nlocks;
	int			i;
121 122 123 124 125 126 127

	Assert(rulelocks != NULL);	/* we get called iff there is some lock */
	Assert(parsetree != NULL);

	if (parsetree->commandType != CMD_SELECT)
	{
		if (parsetree->resultRelation != varno)
128
			return NULL;
129 130
	}

131 132 133 134
	nlocks = rulelocks->numLocks;

	for (i = 0; i < nlocks; i++)
	{
135
		RewriteRule *oneLock = rulelocks->rules[i];
136 137 138 139 140 141 142 143 144

		if (oneLock->event == event)
		{
			if (parsetree->commandType != CMD_SELECT ||
				thisLockWasTriggered(varno,
									 oneLock->attrno,
									 parsetree))
				real_locks = lappend(real_locks, oneLock);
		}
145 146
	}

147 148
	checkLockPerms(real_locks, parsetree, varno);

149
	return real_locks;
150
}
151 152


153
void
154 155 156 157
checkLockPerms(List *locks, Query *parsetree, int rt_index)
{
	Relation	ev_rel;
	HeapTuple	usertup;
158 159
	char	   *evowner;
	RangeTblEntry *rte;
160 161
	int32		reqperm;
	int32		aclcheck_res;
162 163
	int			i;
	List	   *l;
164 165 166 167 168 169 170

	if (locks == NIL)
		return;

	/*
	 * Get the usename of the rules event relation owner
	 */
171
	rte = (RangeTblEntry *) nth(rt_index - 1, parsetree->rtable);
172
	ev_rel = heap_openr(rte->relname, AccessShareLock);
173
	usertup = SearchSysCacheTuple(USESYSID,
174 175
							  ObjectIdGetDatum(ev_rel->rd_rel->relowner),
								  0, 0, 0);
176 177 178 179 180
	if (!HeapTupleIsValid(usertup))
	{
		elog(ERROR, "cache lookup for userid %d failed",
			 ev_rel->rd_rel->relowner);
	}
181
	heap_close(ev_rel, AccessShareLock);
182
	evowner = nameout(&(((Form_pg_shadow) GETSTRUCT(usertup))->usename));
183

184 185 186
	/*
	 * Check all the locks, that should get fired on this query
	 */
187 188 189 190
	foreach(l, locks)
	{
		RewriteRule *onelock = (RewriteRule *) lfirst(l);
		List	   *action;
191 192 193 194

		/*
		 * In each lock check every action
		 */
195 196 197
		foreach(action, onelock->actions)
		{
			Query	   *query = (Query *) lfirst(action);
198 199

			/*
200 201 202
			 * In each action check every rangetable entry for read/write
			 * permission of the event relations owner depending on if
			 * it's the result relation (write) or not (read)
203
			 */
204 205
			for (i = 2; i < length(query->rtable); i++)
			{
206
				if (i + 1 == query->resultRelation)
207 208
					switch (query->resultRelation)
					{
209 210 211 212 213 214 215 216 217 218
						case CMD_INSERT:
							reqperm = ACL_AP;
							break;
						default:
							reqperm = ACL_WR;
							break;
					}
				else
					reqperm = ACL_RD;

219 220 221 222 223 224 225 226
				rte = (RangeTblEntry *) nth(i, query->rtable);
				aclcheck_res = pg_aclcheck(rte->relname,
										   evowner, reqperm);
				if (aclcheck_res != ACLCHECK_OK)
				{
					elog(ERROR, "%s: %s",
						 rte->relname,
						 aclcheck_error_strings[aclcheck_res]);
227 228 229
				}

				/*
230 231
				 * So this is allowed due to the permissions of the rules
				 * event relation owner. But let's see if the next one too
232 233 234 235 236 237 238 239 240 241 242
				 */
				rte->skipAcl = TRUE;
			}
		}
	}

	/*
	 * Phew, that was close
	 */
	return;
}