提交 2ad36c4e 编写于 作者: R Robert Haas

Improve table locking behavior in the face of current DDL.

In the previous coding, callers were faced with an awkward choice:
look up the name, do permissions checks, and then lock the table; or
look up the name, lock the table, and then do permissions checks.
The first choice was wrong because the results of the name lookup
and permissions checks might be out-of-date by the time the table
lock was acquired, while the second allowed a user with no privileges
to interfere with access to a table by users who do have privileges
(e.g. if a malicious backend queues up for an AccessExclusiveLock on
a table on which AccessShareLock is already held, further attempts
to access the table will be blocked until the AccessExclusiveLock
is obtained and the malicious backend's transaction rolls back).

To fix, allow callers of RangeVarGetRelid() to pass a callback which
gets executed after performing the name lookup but before acquiring
the relation lock.  If the name lookup is retried (because
invalidation messages are received), the callback will be re-executed
as well, so we get the best of both worlds.  RangeVarGetRelid() is
renamed to RangeVarGetRelidExtended(); callers not wishing to supply
a callback can continue to invoke it as RangeVarGetRelid(), which is
now a macro.  Since the only one caller that uses nowait = true now
passes a callback anyway, the RangeVarGetRelid() macro defaults nowait
as well.  The callback can also be used for supplemental locking - for
example, REINDEX INDEX needs to acquire the table lock before the index
lock to reduce deadlock possibilities.

There's a lot more work to be done here to fix all the cases where this
can be a problem, but this commit provides the general infrastructure
and fixes the following specific cases: REINDEX INDEX, REINDEX TABLE,
LOCK TABLE, and and DROP TABLE/INDEX/SEQUENCE/VIEW/FOREIGN TABLE.

Per discussion with Noah Misch and Alvaro Herrera.
上级 a87ebace
......@@ -979,7 +979,7 @@ relation_openrv(const RangeVar *relation, LOCKMODE lockmode)
Oid relOid;
/* Look up and lock the appropriate relation using namespace search */
relOid = RangeVarGetRelid(relation, lockmode, false, false);
relOid = RangeVarGetRelid(relation, lockmode, false);
/* Let relation_open do the rest */
return relation_open(relOid, NoLock);
......@@ -1001,7 +1001,7 @@ relation_openrv_extended(const RangeVar *relation, LOCKMODE lockmode,
Oid relOid;
/* Look up and lock the appropriate relation using namespace search */
relOid = RangeVarGetRelid(relation, lockmode, missing_ok, false);
relOid = RangeVarGetRelid(relation, lockmode, missing_ok);
/* Return NULL on not-found */
if (!OidIsValid(relOid))
......
......@@ -606,7 +606,7 @@ objectNamesToOids(GrantObjectType objtype, List *objnames)
RangeVar *relvar = (RangeVar *) lfirst(cell);
Oid relOid;
relOid = RangeVarGetRelid(relvar, NoLock, false, false);
relOid = RangeVarGetRelid(relvar, NoLock, false);
objects = lappend_oid(objects, relOid);
}
break;
......
......@@ -111,7 +111,6 @@ static void validate_index_heapscan(Relation heapRelation,
IndexInfo *indexInfo,
Snapshot snapshot,
v_i_state *state);
static Oid IndexGetRelation(Oid indexId);
static bool ReindexIsCurrentlyProcessingIndex(Oid indexOid);
static void SetReindexProcessing(Oid heapOid, Oid indexOid);
static void ResetReindexProcessing(void);
......@@ -1301,7 +1300,7 @@ index_drop(Oid indexId)
* proceeding until we commit and send out a shared-cache-inval notice
* that will make them update their index lists.
*/
heapId = IndexGetRelation(indexId);
heapId = IndexGetRelation(indexId, false);
userHeapRelation = heap_open(heapId, AccessExclusiveLock);
userIndexRelation = index_open(indexId, AccessExclusiveLock);
......@@ -2763,8 +2762,8 @@ validate_index_heapscan(Relation heapRelation,
* IndexGetRelation: given an index's relation OID, get the OID of the
* relation it is an index on. Uses the system cache.
*/
static Oid
IndexGetRelation(Oid indexId)
Oid
IndexGetRelation(Oid indexId, bool missing_ok)
{
HeapTuple tuple;
Form_pg_index index;
......@@ -2772,7 +2771,11 @@ IndexGetRelation(Oid indexId)
tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexId));
if (!HeapTupleIsValid(tuple))
{
if (missing_ok)
return InvalidOid;
elog(ERROR, "cache lookup failed for index %u", indexId);
}
index = (Form_pg_index) GETSTRUCT(tuple);
Assert(index->indexrelid == indexId);
......@@ -2800,7 +2803,7 @@ reindex_index(Oid indexId, bool skip_constraint_checks)
* Open and lock the parent heap relation. ShareLock is sufficient since
* we only need to be sure no schema or data changes are going on.
*/
heapId = IndexGetRelation(indexId);
heapId = IndexGetRelation(indexId, false);
heapRelation = heap_open(heapId, ShareLock);
/*
......
......@@ -219,10 +219,14 @@ Datum pg_is_other_temp_schema(PG_FUNCTION_ARGS);
* otherwise raise an error.
*
* If nowait = true, throw an error if we'd have to wait for a lock.
*
* Callback allows caller to check permissions or acquire additional locks
* prior to grabbing the relation lock.
*/
Oid
RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, bool missing_ok,
bool nowait)
RangeVarGetRelidExtended(const RangeVar *relation, LOCKMODE lockmode,
bool missing_ok, bool nowait,
RangeVarGetRelidCallback callback, void *callback_arg)
{
uint64 inval_count;
Oid relId;
......@@ -308,6 +312,19 @@ RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, bool missing_ok,
relId = RelnameGetRelid(relation->relname);
}
/*
* Invoke caller-supplied callback, if any.
*
* This callback is a good place to check permissions: we haven't taken
* the table lock yet (and it's really best to check permissions before
* locking anything!), but we've gotten far enough to know what OID we
* think we should lock. Of course, concurrent DDL might things while
* we're waiting for the lock, but in that case the callback will be
* invoked again for the new OID.
*/
if (callback)
callback(relation, relId, oldRelId, callback_arg);
/*
* If no lock requested, we assume the caller knows what they're
* doing. They should have already acquired a heavyweight lock on
......
......@@ -111,7 +111,7 @@ ExecRenameStmt(RenameStmt *stmt)
* in RenameRelation, renameatt, or renametrig.
*/
relid = RangeVarGetRelid(stmt->relation, AccessExclusiveLock,
false, false);
false);
switch (stmt->renameType)
{
......
......@@ -63,7 +63,10 @@ static void ComputeIndexAttrs(IndexInfo *indexInfo,
static Oid GetIndexOpClass(List *opclass, Oid attrType,
char *accessMethodName, Oid accessMethodId);
static char *ChooseIndexNameAddition(List *colnames);
static void RangeVarCallbackForReindexTable(const RangeVar *relation,
Oid relId, Oid oldRelId, void *arg);
static void RangeVarCallbackForReindexIndex(const RangeVar *relation,
Oid relId, Oid oldRelId, void *arg);
/*
* CheckIndexCompatible
......@@ -129,7 +132,7 @@ CheckIndexCompatible(Oid oldId,
Datum d;
/* Caller should already have the relation locked in some way. */
relationId = RangeVarGetRelid(heapRelation, NoLock, false, false);
relationId = RangeVarGetRelid(heapRelation, NoLock, false);
/*
* We can pretend isconstraint = false unconditionally. It only serves to
* decide the text of an error message that should never happen for us.
......@@ -1725,33 +1728,74 @@ void
ReindexIndex(RangeVar *indexRelation)
{
Oid indOid;
HeapTuple tuple;
Oid heapOid = InvalidOid;
/* lock level used here should match index lock reindex_index() */
indOid = RangeVarGetRelidExtended(indexRelation, AccessExclusiveLock,
false, false,
RangeVarCallbackForReindexIndex,
(void *) &heapOid);
reindex_index(indOid, false);
}
/*
* Check permissions on table before acquiring relation lock; also lock
* the heap before the RangeVarGetRelidExtended takes the index lock, to avoid
* deadlocks.
*/
static void
RangeVarCallbackForReindexIndex(const RangeVar *relation,
Oid relId, Oid oldRelId, void *arg)
{
char relkind;
Oid *heapOid = (Oid *) arg;
/*
* XXX: This is not safe in the presence of concurrent DDL. We should
* take AccessExclusiveLock here, but that would violate the rule that
* indexes should only be locked after their parent tables. For now,
* we live with it.
* If we previously locked some other index's heap, and the name we're
* looking up no longer refers to that relation, release the now-useless
* lock.
*/
indOid = RangeVarGetRelid(indexRelation, NoLock, false, false);
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(indOid));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for relation %u", indOid);
if (relId != oldRelId && OidIsValid(oldRelId))
{
/* lock level here should match reindex_index() heap lock */
UnlockRelationOid(*heapOid, ShareLock);
*heapOid = InvalidOid;
}
/* If the relation does not exist, there's nothing more to do. */
if (!OidIsValid(relId))
return;
if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX)
/*
* If the relation does exist, check whether it's an index. But note
* that the relation might have been dropped between the time we did the
* name lookup and now. In that case, there's nothing to do.
*/
relkind = get_rel_relkind(relId);
if (!relkind)
return;
if (relkind != RELKIND_INDEX)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not an index",
indexRelation->relname)));
errmsg("\"%s\" is not an index", relation->relname)));
/* Check permissions */
if (!pg_class_ownercheck(indOid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
indexRelation->relname);
ReleaseSysCache(tuple);
if (!pg_class_ownercheck(relId, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, relation->relname);
reindex_index(indOid, false);
/* Lock heap before index to avoid deadlock. */
if (relId != oldRelId)
{
/*
* Lock level here should match reindex_index() heap lock.
* If the OID isn't valid, it means the index as concurrently dropped,
* which is not a problem for us; just return normally.
*/
*heapOid = IndexGetRelation(relId, true);
if (OidIsValid(*heapOid))
LockRelationOid(*heapOid, ShareLock);
}
}
/*
......@@ -1762,27 +1806,10 @@ void
ReindexTable(RangeVar *relation)
{
Oid heapOid;
HeapTuple tuple;
/* The lock level used here should match reindex_relation(). */
heapOid = RangeVarGetRelid(relation, ShareLock, false, false);
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(heapOid));
if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
elog(ERROR, "cache lookup failed for relation %u", heapOid);
if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_RELATION &&
((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_TOASTVALUE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table",
relation->relname)));
/* Check permissions */
if (!pg_class_ownercheck(heapOid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
relation->relname);
ReleaseSysCache(tuple);
heapOid = RangeVarGetRelidExtended(relation, ShareLock, false, false,
RangeVarCallbackForReindexTable, NULL);
if (!reindex_relation(heapOid, REINDEX_REL_PROCESS_TOAST))
ereport(NOTICE,
......@@ -1790,6 +1817,37 @@ ReindexTable(RangeVar *relation)
relation->relname)));
}
/*
* Check permissions on table before acquiring relation lock.
*/
static void
RangeVarCallbackForReindexTable(const RangeVar *relation,
Oid relId, Oid oldRelId, void *arg)
{
char relkind;
/* Nothing to do if the relation was not found. */
if (!OidIsValid(relId))
return;
/*
* If the relation does exist, check whether it's an index. But note
* that the relation might have been dropped between the time we did the
* name lookup and now. In that case, there's nothing to do.
*/
relkind = get_rel_relkind(relId);
if (!relkind)
return;
if (relkind != RELKIND_RELATION && relkind != RELKIND_TOASTVALUE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table", relation->relname)));
/* Check permissions */
if (!pg_class_ownercheck(relId, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, relation->relname);
}
/*
* ReindexDatabase
* Recreate indexes of a database.
......
......@@ -23,10 +23,12 @@
#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
static void LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait,
bool recurse);
static void LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait);
static AclResult LockTableAclCheck(Oid relid, LOCKMODE lockmode);
static void RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid,
Oid oldrelid, void *arg);
/*
* LOCK TABLE
......@@ -51,98 +53,124 @@ LockTableCommand(LockStmt *lockstmt)
foreach(p, lockstmt->relations)
{
RangeVar *rv = (RangeVar *) lfirst(p);
Relation rel;
bool recurse = interpretInhOption(rv->inhOpt);
Oid reloid;
reloid = RangeVarGetRelid(rv, lockstmt->mode, false, lockstmt->nowait);
rel = relation_open(reloid, NoLock);
reloid = RangeVarGetRelidExtended(rv, lockstmt->mode, false,
lockstmt->nowait,
RangeVarCallbackForLockTable,
(void *) &lockstmt->mode);
LockTableRecurse(rel, lockstmt->mode, lockstmt->nowait, recurse);
if (recurse)
LockTableRecurse(reloid, lockstmt->mode, lockstmt->nowait);
}
}
/*
* Apply LOCK TABLE recursively over an inheritance tree
* Before acquiring a table lock on the named table, check whether we have
* permission to do so.
*/
static void
LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait, bool recurse)
RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid,
void *arg)
{
LOCKMODE lockmode = * (LOCKMODE *) arg;
char relkind;
AclResult aclresult;
Oid reloid = RelationGetRelid(rel);
/* Verify adequate privilege */
if (lockmode == AccessShareLock)
aclresult = pg_class_aclcheck(reloid, GetUserId(),
ACL_SELECT);
else
aclresult = pg_class_aclcheck(reloid, GetUserId(),
ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_CLASS,
RelationGetRelationName(rel));
if (!OidIsValid(relid))
return; /* doesn't exist, so no permissions check */
relkind = get_rel_relkind(relid);
if (!relkind)
return; /* woops, concurrently dropped; no permissions check */
/* Currently, we only allow plain tables to be locked */
if (rel->rd_rel->relkind != RELKIND_RELATION)
if (relkind != RELKIND_RELATION)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table",
RelationGetRelationName(rel))));
rv->relname)));
/*
* If requested, recurse to children. We use find_inheritance_children
* not find_all_inheritors to avoid taking locks far in advance of
* checking privileges. This means we'll visit multiply-inheriting
* children more than once, but that's no problem.
*/
if (recurse)
/* Check permissions. */
aclresult = LockTableAclCheck(relid, lockmode);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_CLASS, rv->relname);
}
/*
* Apply LOCK TABLE recursively over an inheritance tree
*
* We use find_inheritance_children not find_all_inheritors to avoid taking
* locks far in advance of checking privileges. This means we'll visit
* multiply-inheriting children more than once, but that's no problem.
*/
static void
LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait)
{
List *children;
ListCell *lc;
children = find_inheritance_children(reloid, NoLock);
foreach(lc, children)
{
List *children = find_inheritance_children(reloid, NoLock);
ListCell *lc;
Relation childrel;
Oid childreloid = lfirst_oid(lc);
AclResult aclresult;
/* Check permissions before acquiring the lock. */
aclresult = LockTableAclCheck(childreloid, lockmode);
if (aclresult != ACLCHECK_OK)
{
char *relname = get_rel_name(childreloid);
if (!relname)
continue; /* child concurrently dropped, just skip it */
aclcheck_error(aclresult, ACL_KIND_CLASS, relname);
}
foreach(lc, children)
/* We have enough rights to lock the relation; do so. */
if (!nowait)
LockRelationOid(childreloid, lockmode);
else if (!ConditionalLockRelationOid(childreloid, lockmode))
{
Oid childreloid = lfirst_oid(lc);
/*
* Acquire the lock, to protect against concurrent drops. Note
* that a lock against an already-dropped relation's OID won't
* fail.
*/
if (!nowait)
LockRelationOid(childreloid, lockmode);
else if (!ConditionalLockRelationOid(childreloid, lockmode))
{
/* try to throw error by name; relation could be deleted... */
char *relname = get_rel_name(childreloid);
if (relname)
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("could not obtain lock on relation \"%s\"",
relname)));
else
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("could not obtain lock on relation with OID %u",
reloid)));
}
/*
* Now that we have the lock, check to see if the relation really
* exists or not.
*/
childrel = try_relation_open(childreloid, NoLock);
if (!childrel)
{
/* Release useless lock */
UnlockRelationOid(childreloid, lockmode);
}
LockTableRecurse(childrel, lockmode, nowait, recurse);
/* try to throw error by name; relation could be deleted... */
char *relname = get_rel_name(childreloid);
if (!relname)
continue; /* child concurrently dropped, just skip it */
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("could not obtain lock on relation \"%s\"",
relname)));
}
/*
* Even if we got the lock, child might have been concurrently dropped.
* If so, we can skip it.
*/
if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(childreloid)))
{
/* Release useless lock */
UnlockRelationOid(childreloid, lockmode);
continue;
}
LockTableRecurse(childreloid, lockmode, nowait);
}
}
relation_close(rel, NoLock); /* close rel, keep lock */
/*
* Check whether the current user is permitted to lock this relation.
*/
static AclResult
LockTableAclCheck(Oid reloid, LOCKMODE lockmode)
{
AclResult aclresult;
/* Verify adequate privilege */
if (lockmode == AccessShareLock)
aclresult = pg_class_aclcheck(reloid, GetUserId(),
ACL_SELECT);
else
aclresult = pg_class_aclcheck(reloid, GetUserId(),
ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE);
return aclresult;
}
......@@ -425,7 +425,7 @@ AlterSequence(AlterSeqStmt *stmt)
List *owned_by;
/* Open and lock sequence. */
relid = RangeVarGetRelid(stmt->sequence, AccessShareLock, false, false);
relid = RangeVarGetRelid(stmt->sequence, AccessShareLock, false);
init_sequence(relid, &elm, &seqrel);
/* allow ALTER to sequence owner only */
......@@ -513,7 +513,7 @@ nextval(PG_FUNCTION_ARGS)
* whether the performance penalty is material in practice, but for now,
* we do it this way.
*/
relid = RangeVarGetRelid(sequence, NoLock, false, false);
relid = RangeVarGetRelid(sequence, NoLock, false);
PG_RETURN_INT64(nextval_internal(relid));
}
......
......@@ -235,6 +235,12 @@ static const struct dropmsgstrings dropmsgstringarray[] = {
{'\0', 0, NULL, NULL, NULL, NULL}
};
struct DropRelationCallbackState
{
char relkind;
Oid heapOid;
};
/* Alter table target-type flags for ATSimplePermissions */
#define ATT_TABLE 0x0001
#define ATT_VIEW 0x0002
......@@ -375,6 +381,9 @@ static void copy_relation_data(SMgrRelation rel, SMgrRelation dst,
ForkNumber forkNum, char relpersistence);
static const char *storage_name(char c);
static void RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid,
Oid oldRelOid, void *arg);
/* ----------------------------------------------------------------
* DefineRelation
......@@ -751,9 +760,8 @@ RemoveRelations(DropStmt *drop)
{
RangeVar *rel = makeRangeVarFromNameList((List *) lfirst(cell));
Oid relOid;
HeapTuple tuple;
Form_pg_class classform;
ObjectAddress obj;
struct DropRelationCallbackState state;
/*
* These next few steps are a great deal like relation_openrv, but we
......@@ -767,15 +775,13 @@ RemoveRelations(DropStmt *drop)
*/
AcceptInvalidationMessages();
/*
* Look up the appropriate relation using namespace search.
*
* XXX: Doing this without a lock is unsafe in the presence of
* concurrent DDL, but acquiring a lock here might violate the rule
* that a table must be locked before its corresponding index.
* So, for now, we ignore the hazard.
*/
relOid = RangeVarGetRelid(rel, NoLock, true, false);
/* Look up the appropriate relation using namespace search. */
state.relkind = relkind;
state.heapOid = InvalidOid;
relOid = RangeVarGetRelidExtended(rel, AccessExclusiveLock, true,
false,
RangeVarCallbackForDropRelation,
(void *) &state);
/* Not there? */
if (!OidIsValid(relOid))
......@@ -784,57 +790,12 @@ RemoveRelations(DropStmt *drop)
continue;
}
/*
* In DROP INDEX, attempt to acquire lock on the parent table before
* locking the index. index_drop() will need this anyway, and since
* regular queries lock tables before their indexes, we risk deadlock
* if we do it the other way around. No error if we don't find a
* pg_index entry, though --- that most likely means it isn't an
* index, and we'll fail below.
*/
if (relkind == RELKIND_INDEX)
{
tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(relOid));
if (HeapTupleIsValid(tuple))
{
Form_pg_index index = (Form_pg_index) GETSTRUCT(tuple);
LockRelationOid(index->indrelid, AccessExclusiveLock);
ReleaseSysCache(tuple);
}
}
/* Get the lock before trying to fetch the syscache entry */
LockRelationOid(relOid, AccessExclusiveLock);
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for relation %u", relOid);
classform = (Form_pg_class) GETSTRUCT(tuple);
if (classform->relkind != relkind)
DropErrorMsgWrongType(rel->relname, classform->relkind, relkind);
/* Allow DROP to either table owner or schema owner */
if (!pg_class_ownercheck(relOid, GetUserId()) &&
!pg_namespace_ownercheck(classform->relnamespace, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
rel->relname);
if (!allowSystemTableMods && IsSystemClass(classform))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied: \"%s\" is a system catalog",
rel->relname)));
/* OK, we're ready to delete this one */
obj.classId = RelationRelationId;
obj.objectId = relOid;
obj.objectSubId = 0;
add_exact_object_address(&obj, objects);
ReleaseSysCache(tuple);
}
performMultipleDeletions(objects, drop->behavior);
......@@ -842,6 +803,74 @@ RemoveRelations(DropStmt *drop)
free_object_addresses(objects);
}
/*
* Before acquiring a table lock, check whether we have sufficient rights.
* In the case of DROP INDEX, also try to lock the table before the index.
*/
static void
RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid, Oid oldRelOid,
void *arg)
{
HeapTuple tuple;
struct DropRelationCallbackState *state;
char relkind;
Form_pg_class classform;
state = (struct DropRelationCallbackState *) arg;
relkind = state->relkind;
/*
* If we previously locked some other index's heap, and the name we're
* looking up no longer refers to that relation, release the now-useless
* lock.
*/
if (relOid != oldRelOid && OidIsValid(state->heapOid))
{
UnlockRelationOid(state->heapOid, AccessExclusiveLock);
state->heapOid = InvalidOid;
}
/* Didn't find a relation, so need for locking or permission checks. */
if (!OidIsValid(relOid))
return;
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid));
if (!HeapTupleIsValid(tuple))
return; /* concurrently dropped, so nothing to do */
classform = (Form_pg_class) GETSTRUCT(tuple);
if (classform->relkind != relkind)
DropErrorMsgWrongType(rel->relname, classform->relkind, relkind);
/* Allow DROP to either table owner or schema owner */
if (!pg_class_ownercheck(relOid, GetUserId()) &&
!pg_namespace_ownercheck(classform->relnamespace, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
rel->relname);
if (!allowSystemTableMods && IsSystemClass(classform))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied: \"%s\" is a system catalog",
rel->relname)));
ReleaseSysCache(tuple);
/*
* In DROP INDEX, attempt to acquire lock on the parent table before
* locking the index. index_drop() will need this anyway, and since
* regular queries lock tables before their indexes, we risk deadlock
* if we do it the other way around. No error if we don't find a
* pg_index entry, though --- the relation may have been droppd.
*/
if (relkind == RELKIND_INDEX && relOid != oldRelOid)
{
state->heapOid = IndexGetRelation(relOid, true);
if (OidIsValid(state->heapOid))
LockRelationOid(state->heapOid, AccessExclusiveLock);
}
}
/*
* ExecuteTruncate
* Executes a TRUNCATE command.
......
......@@ -201,8 +201,7 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
* we might end up creating a pg_constraint entry referencing a
* nonexistent table.
*/
constrrelid = RangeVarGetRelid(stmt->constrrel, AccessShareLock, false,
false);
constrrelid = RangeVarGetRelid(stmt->constrrel, AccessShareLock, false);
}
/* permission checks */
......
......@@ -330,7 +330,7 @@ get_rel_oids(Oid relid, const RangeVar *vacrel)
* alternative, since we're going to commit this transaction and
* begin a new one between now and then.
*/
relid = RangeVarGetRelid(vacrel, NoLock, false, false);
relid = RangeVarGetRelid(vacrel, NoLock, false);
/* Make a relation list entry for this guy */
oldcontext = MemoryContextSwitchTo(vac_context);
......
......@@ -282,7 +282,7 @@ searchRangeTable(ParseState *pstate, RangeVar *relation)
if (!relation->schemaname)
cte = scanNameSpaceForCTE(pstate, refname, &ctelevelsup);
if (!cte)
relId = RangeVarGetRelid(relation, NoLock, true, false);
relId = RangeVarGetRelid(relation, NoLock, true);
/* Now look for RTEs matching either the relation/CTE or the alias */
for (levelsup = 0;
......
......@@ -115,7 +115,7 @@ LookupTypeName(ParseState *pstate, const TypeName *typeName,
* of concurrent DDL. But taking a lock would carry a performance
* penalty and would also require a permissions check.
*/
relid = RangeVarGetRelid(rel, NoLock, false, false);
relid = RangeVarGetRelid(rel, NoLock, false);
attnum = get_attnum(relid, field);
if (attnum == InvalidAttrNumber)
ereport(ERROR,
......
......@@ -203,8 +203,7 @@ DefineRule(RuleStmt *stmt, const char *queryString)
* Find and lock the relation. Lock level should match
* DefineQueryRewrite.
*/
relId = RangeVarGetRelid(stmt->relation, AccessExclusiveLock, false,
false);
relId = RangeVarGetRelid(stmt->relation, AccessExclusiveLock, false);
/* ... and execute */
DefineQueryRewrite(stmt->rulename,
......
......@@ -84,7 +84,7 @@ CheckRelationOwnership(RangeVar *rel, bool noCatalogs)
* AccessExclusiveLock) before verifying that the user has permissions
* is not appealing either.
*/
relOid = RangeVarGetRelid(rel, NoLock, false, false);
relOid = RangeVarGetRelid(rel, NoLock, false);
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid));
if (!HeapTupleIsValid(tuple)) /* should not happen */
......
......@@ -1940,7 +1940,7 @@ convert_table_name(text *tablename)
relrv = makeRangeVarFromNameList(textToQualifiedNameList(tablename));
/* We might not even have permissions on this relation; don't lock it. */
return RangeVarGetRelid(relrv, NoLock, false, false);
return RangeVarGetRelid(relrv, NoLock, false);
}
/*
......
......@@ -824,8 +824,7 @@ regclassin(PG_FUNCTION_ARGS)
names = stringToQualifiedNameList(class_name_or_oid);
/* We might not even have permissions on this relation; don't lock it. */
result = RangeVarGetRelid(makeRangeVarFromNameList(names), NoLock, false,
false);
result = RangeVarGetRelid(makeRangeVarFromNameList(names), NoLock, false);
PG_RETURN_OID(result);
}
......@@ -1298,7 +1297,7 @@ text_regclass(PG_FUNCTION_ARGS)
rv = makeRangeVarFromNameList(textToQualifiedNameList(relname));
/* We might not even have permissions on this relation; don't lock it. */
result = RangeVarGetRelid(rv, NoLock, false, false);
result = RangeVarGetRelid(rv, NoLock, false);
PG_RETURN_OID(result);
}
......
......@@ -390,7 +390,7 @@ pg_get_viewdef_name(PG_FUNCTION_ARGS)
/* Look up view name. Can't lock it - we might not have privileges. */
viewrel = makeRangeVarFromNameList(textToQualifiedNameList(viewname));
viewoid = RangeVarGetRelid(viewrel, NoLock, false, false);
viewoid = RangeVarGetRelid(viewrel, NoLock, false);
PG_RETURN_TEXT_P(string_to_text(pg_get_viewdef_worker(viewoid, 0)));
}
......@@ -410,7 +410,7 @@ pg_get_viewdef_name_ext(PG_FUNCTION_ARGS)
/* Look up view name. Can't lock it - we might not have privileges. */
viewrel = makeRangeVarFromNameList(textToQualifiedNameList(viewname));
viewoid = RangeVarGetRelid(viewrel, NoLock, false, false);
viewoid = RangeVarGetRelid(viewrel, NoLock, false);
PG_RETURN_TEXT_P(string_to_text(pg_get_viewdef_worker(viewoid, prettyFlags)));
}
......@@ -1576,7 +1576,7 @@ pg_get_serial_sequence(PG_FUNCTION_ARGS)
/* Look up table name. Can't lock it - we might not have privileges. */
tablerv = makeRangeVarFromNameList(textToQualifiedNameList(tablename));
tableOid = RangeVarGetRelid(tablerv, NoLock, false, false);
tableOid = RangeVarGetRelid(tablerv, NoLock, false);
/* Get the number of the column */
column = text_to_cstring(columnname);
......
......@@ -99,5 +99,6 @@ extern bool reindex_relation(Oid relid, int flags);
extern bool ReindexIsProcessingHeap(Oid heapOid);
extern bool ReindexIsProcessingIndex(Oid indexOid);
extern Oid IndexGetRelation(Oid indexId, bool missing_ok);
#endif /* INDEX_H */
......@@ -47,9 +47,16 @@ typedef struct OverrideSearchPath
bool addTemp; /* implicitly prepend temp schema? */
} OverrideSearchPath;
typedef void (*RangeVarGetRelidCallback)(const RangeVar *relation, Oid relId,
Oid oldRelId, void *callback_arg);
extern Oid RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode,
bool missing_ok, bool nowait);
#define RangeVarGetRelid(relation, lockmode, missing_ok) \
RangeVarGetRelidExtended(relation, lockmode, missing_ok, false, NULL, NULL)
extern Oid RangeVarGetRelidExtended(const RangeVar *relation,
LOCKMODE lockmode, bool missing_ok, bool nowait,
RangeVarGetRelidCallback callback,
void *callback_arg);
extern Oid RangeVarGetCreationNamespace(const RangeVar *newRelation);
extern Oid RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation);
extern void RangeVarAdjustRelationPersistence(RangeVar *newRelation, Oid nspid);
......
......@@ -1707,7 +1707,7 @@ plpgsql_parse_cwordtype(List *idents)
strVal(lsecond(idents)),
-1);
/* Can't lock relation - we might not have privileges. */
classOid = RangeVarGetRelid(relvar, NoLock, true, false);
classOid = RangeVarGetRelid(relvar, NoLock, true);
if (!OidIsValid(classOid))
goto done;
fldname = strVal(lthird(idents));
......@@ -1808,7 +1808,7 @@ plpgsql_parse_cwordrowtype(List *idents)
relvar = makeRangeVar(strVal(linitial(idents)),
strVal(lsecond(idents)),
-1);
classOid = RangeVarGetRelid(relvar, NoLock, false, false);
classOid = RangeVarGetRelid(relvar, NoLock, false);
MemoryContextSwitchTo(oldCxt);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册