/*-------------------------------------------------------------------------
 *
 * lock.c
 *	  POSTGRES primary lock mechanism
 *
 * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/lmgr/lock.c
 *
 * NOTES
 *	  A lock table is a shared memory hash table.  When
 *	  a process tries to acquire a lock of a type that conflicts
 *	  with existing locks, it is put to sleep using the routines
 *	  in storage/lmgr/proc.c.
 *
 *	  For the most part, this code should be invoked via lmgr.c
 *	  or another lock-management module, not directly.
 *
 *	Interface:
 *
 *	InitLocks(), GetLocksMethodTable(),
 *	LockAcquire(), LockRelease(), LockReleaseAll(),
 *	LockCheckConflicts(), GrantLock()
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <signal.h>
#include <unistd.h>

#include "access/transam.h"
#include "access/twophase.h"
#include "access/twophase_rmgr.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "pgstat.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"
#include "storage/standby.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"


/* This configuration variable is used to set the lock table size */
int			max_locks_per_xact; /* set by guc.c */

#define NLOCKENTS() \
	mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))


/*
 * Data structures defining the semantics of the standard lock methods.
 *
 * The conflict table defines the semantics of the various lock modes.
 */
static const LOCKMASK LockConflicts[] = {
	0,

	/* AccessShareLock */
	(1 << AccessExclusiveLock),

	/* RowShareLock */
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* RowExclusiveLock */
	(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* ShareUpdateExclusiveLock */
	(1 << ShareUpdateExclusiveLock) |
	(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* ShareLock */
	(1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
	(1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* ShareRowExclusiveLock */
	(1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
	(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* ExclusiveLock */
	(1 << RowShareLock) |
	(1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
	(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* AccessExclusiveLock */
	(1 << AccessShareLock) | (1 << RowShareLock) |
	(1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
	(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock)

};

/* Names of lock modes, for debug printouts */
static const char *const lock_mode_names[] =
{
	"INVALID",
	"AccessShareLock",
	"RowShareLock",
	"RowExclusiveLock",
	"ShareUpdateExclusiveLock",
	"ShareLock",
	"ShareRowExclusiveLock",
	"ExclusiveLock",
	"AccessExclusiveLock"
};

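#ifdef NOT_USED
/*
 * Illustrative sketch only; this helper is not part of the original file.
 * It shows how the conflict table above is meant to be read: a requested
 * mode conflicts with the set of already-granted modes (for example a
 * LOCK's grantMask) when its conflict mask shares a bit with that set.
 * This is essentially the test LockCheckConflicts() performs below.
 */
static bool
example_modes_conflict(LOCKMODE requested, LOCKMASK grantMask)
{
	return (LockConflicts[requested] & grantMask) != 0;
}
#endif
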
/*
 * Count of the number of fast path lock slots we believe to be used.  This
 * might be higher than the real number if another backend has transferred
 * our locks to the primary lock table, but it can never be lower than the
 * real value, since only we can acquire locks on our own behalf.
 */
static int			FastPathLocalUseCount = 0;

/* Macros for manipulating proc->fpLockBits */
#define FAST_PATH_BITS_PER_SLOT			3
#define FAST_PATH_LOCKNUMBER_OFFSET		1
#define FAST_PATH_MASK					((1 << FAST_PATH_BITS_PER_SLOT) - 1)
#define FAST_PATH_GET_BITS(proc, n) \
	(((proc)->fpLockBits >> (FAST_PATH_BITS_PER_SLOT * n)) & FAST_PATH_MASK)
#define FAST_PATH_BIT_POSITION(n, l) \
	(AssertMacro((l) >= FAST_PATH_LOCKNUMBER_OFFSET), \
	 AssertMacro((l) < FAST_PATH_BITS_PER_SLOT+FAST_PATH_LOCKNUMBER_OFFSET), \
	 AssertMacro((n) < FP_LOCK_SLOTS_PER_BACKEND), \
	 ((l) - FAST_PATH_LOCKNUMBER_OFFSET + FAST_PATH_BITS_PER_SLOT * (n)))
#define FAST_PATH_SET_LOCKMODE(proc, n, l) \
	 (proc)->fpLockBits |= UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l)
#define FAST_PATH_CLEAR_LOCKMODE(proc, n, l) \
	 (proc)->fpLockBits &= ~(UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l))
#define FAST_PATH_CHECK_LOCKMODE(proc, n, l) \
	 ((proc)->fpLockBits & (UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l)))

/*
 * The fast-path lock mechanism is concerned only with relation locks on
 * unshared relations by backends bound to a database.  The fast-path
 * mechanism exists mostly to accelerate acquisition and release of locks
 * that rarely conflict.  Because ShareUpdateExclusiveLock is
 * self-conflicting, it can't use the fast-path mechanism; but it also does
 * not conflict with any of the locks that do, so we can ignore it completely.
 */
#define FastPathTag(locktag) \
	((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \
	(locktag)->locktag_type == LOCKTAG_RELATION && \
	(locktag)->locktag_field1 == MyDatabaseId && \
	MyDatabaseId != InvalidOid)
#define FastPathWeakMode(mode)		((mode) < ShareUpdateExclusiveLock)
#define FastPathStrongMode(mode)	((mode) > ShareUpdateExclusiveLock)
#define FastPathRelevantMode(mode)	((mode) != ShareUpdateExclusiveLock)
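#ifdef NOT_USED
/*
 * Illustrative sketch only; not part of the original file.  It demonstrates
 * the macros above: each fast-path slot packs the "weak" relation lock modes
 * into a 3-bit group of the backend's own fpLockBits.  The real code always
 * manipulates these bits while holding MyProc->backendLock.
 */
static void
example_fast_path_bits(int slot, LOCKMODE lockmode)
{
	/* record a weak lock mode in the given slot ... */
	FAST_PATH_SET_LOCKMODE(MyProc, slot, lockmode);
	Assert(FAST_PATH_CHECK_LOCKMODE(MyProc, slot, lockmode));
	/* ... and clear it again */
	FAST_PATH_CLEAR_LOCKMODE(MyProc, slot, lockmode);
}
#endif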

static bool FastPathGrantRelationLock(Oid relid, LOCKMODE lockmode);
static bool FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode);
static bool FastPathTransferRelationLocks(LockMethod lockMethodTable,
					  const LOCKTAG *locktag, uint32 hashcode);
static PROCLOCK *FastPathGetRelationLockEntry(LOCALLOCK *locallock);
static void VirtualXactLockTableCleanup(void);

/*
 * To make the fast-path lock mechanism work, we must have some way of
 * preventing the use of the fast-path when a conflicting lock might be
 * present.  We partition* the locktag space into FAST_PATH_HASH_BUCKETS
 * partitions, and maintain an integer count of the number of "strong" lockers
 * in each partition.  When any "strong" lockers are present (which is
 * hopefully not very often), the fast-path mechanism can't be used, and we
 * must fall back to the slower method of pushing matching locks directly
 * into the main lock tables.
 *
 * The deadlock detector does not know anything about the fast path mechanism,
 * so any locks that might be involved in a deadlock must be transferred from
 * the fast-path queues to the main lock table.
 */

#define FAST_PATH_STRONG_LOCK_HASH_BITS			10
#define FAST_PATH_STRONG_LOCK_HASH_PARTITIONS \
	(1 << FAST_PATH_STRONG_LOCK_HASH_BITS)
#define FastPathStrongLockHashPartition(hashcode) \
	((hashcode) % FAST_PATH_STRONG_LOCK_HASH_PARTITIONS)

typedef struct
{
	slock_t mutex;
	uint32 count[FAST_PATH_STRONG_LOCK_HASH_PARTITIONS];
} FastPathStrongRelationLockData;

FastPathStrongRelationLockData *FastPathStrongRelationLocks;

#ifndef LOCK_DEBUG
static bool Dummy_trace = false;
#endif
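
#ifdef NOT_USED
/*
 * Illustrative sketch only; not part of the original file.  A "strong"
 * locker advertises itself by bumping the counter for its partition of
 * FastPathStrongRelationLocks under the spinlock, which is what
 * LockAcquireExtended() does before transferring any conflicting fast-path
 * locks into the main lock table.
 */
static void
example_mark_strong_lock(uint32 hashcode)
{
	uint32		fasthashcode = FastPathStrongLockHashPartition(hashcode);

	SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
	FastPathStrongRelationLocks->count[fasthashcode]++;
	SpinLockRelease(&FastPathStrongRelationLocks->mutex);
}
#endif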

static const LockMethodData default_lockmethod = {
	AccessExclusiveLock,		/* highest valid lock mode number */
	true,
	LockConflicts,
	lock_mode_names,
#ifdef LOCK_DEBUG
	&Trace_locks
#else
	&Dummy_trace
#endif
};

static const LockMethodData user_lockmethod = {
	AccessExclusiveLock,		/* highest valid lock mode number */
	true,
	LockConflicts,
	lock_mode_names,
#ifdef LOCK_DEBUG
	&Trace_userlocks
#else
	&Dummy_trace
#endif
};

/*
 * map from lock method id to the lock table data structures
 */
static const LockMethod LockMethods[] = {
	NULL,
	&default_lockmethod,
	&user_lockmethod
};


/* Record that's written to 2PC state file when a lock is persisted */
typedef struct TwoPhaseLockRecord
{
	LOCKTAG		locktag;
	LOCKMODE	lockmode;
} TwoPhaseLockRecord;


/*
 * Pointers to hash tables containing lock state
 *
 * The LockMethodLockHash and LockMethodProcLockHash hash tables are in
 * shared memory; LockMethodLocalHash is local to each backend.
 */
static HTAB *LockMethodLockHash;
static HTAB *LockMethodProcLockHash;
static HTAB *LockMethodLocalHash;


/* private state for GrantAwaitedLock */
static LOCALLOCK *awaitedLock;
static ResourceOwner awaitedOwner;


#ifdef LOCK_DEBUG

/*------
 * The following configuration options are available for lock debugging:
 *
 *	   TRACE_LOCKS		-- give a bunch of output what's going on in this file
 *	   TRACE_USERLOCKS	-- same but for user locks
 *	   TRACE_LOCK_OIDMIN-- do not trace locks for tables below this oid
 *						   (use to avoid output on system tables)
 *	   TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
 *	   DEBUG_DEADLOCKS	-- currently dumps locks at untimely occasions ;)
 *
 * Furthermore, but in storage/lmgr/lwlock.c:
 *	   TRACE_LWLOCKS	-- trace lightweight locks (pretty useless)
 *
 * Define LOCK_DEBUG at compile time to get all these enabled.
 * --------
 */

int			Trace_lock_oidmin = FirstNormalObjectId;
bool		Trace_locks = false;
bool		Trace_userlocks = false;
int			Trace_lock_table = 0;
bool		Debug_deadlocks = false;


inline static bool
LOCK_DEBUG_ENABLED(const LOCKTAG *tag)
{
	return
		(*(LockMethods[tag->locktag_lockmethodid]->trace_flag) &&
		 ((Oid) tag->locktag_field2 >= (Oid) Trace_lock_oidmin))
		|| (Trace_lock_table &&
			(tag->locktag_field2 == Trace_lock_table));
}


inline static void
LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
{
	if (LOCK_DEBUG_ENABLED(&lock->tag))
		elog(LOG,
			 "%s: lock(%p) id(%u,%u,%u,%u,%u,%u) grantMask(%x) "
			 "req(%d,%d,%d,%d,%d,%d,%d)=%d "
			 "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
			 where, lock,
			 lock->tag.locktag_field1, lock->tag.locktag_field2,
			 lock->tag.locktag_field3, lock->tag.locktag_field4,
			 lock->tag.locktag_type, lock->tag.locktag_lockmethodid,
			 lock->grantMask,
			 lock->requested[1], lock->requested[2], lock->requested[3],
			 lock->requested[4], lock->requested[5], lock->requested[6],
			 lock->requested[7], lock->nRequested,
			 lock->granted[1], lock->granted[2], lock->granted[3],
			 lock->granted[4], lock->granted[5], lock->granted[6],
			 lock->granted[7], lock->nGranted,
			 lock->waitProcs.size,
			 LockMethods[LOCK_LOCKMETHOD(*lock)]->lockModeNames[type]);
}


inline static void
PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
{
	if (LOCK_DEBUG_ENABLED(&proclockP->tag.myLock->tag))
		elog(LOG,
			 "%s: proclock(%p) lock(%p) method(%u) proc(%p) hold(%x)",
			 where, proclockP, proclockP->tag.myLock,
			 PROCLOCK_LOCKMETHOD(*(proclockP)),
			 proclockP->tag.myProc, (int) proclockP->holdMask);
}
#else							/* not LOCK_DEBUG */

#define LOCK_PRINT(where, lock, type)
#define PROCLOCK_PRINT(where, proclockP)
#endif   /* not LOCK_DEBUG */


static uint32 proclock_hash(const void *key, Size keysize);
static void RemoveLocalLock(LOCALLOCK *locallock);
static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
			     const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode);
static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
static void ReleaseLockForOwner(LOCALLOCK *locallock, ResourceOwner owner);
static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
			PROCLOCK *proclock, LockMethod lockMethodTable);
static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
			LockMethod lockMethodTable, uint32 hashcode,
			bool wakeupNeeded);
static void LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
					 LOCKTAG *locktag, LOCKMODE lockmode,
					 bool decrement_strong_lock_count);


/*
 * InitLocks -- Initialize the lock manager's data structures.
 *
 * This is called from CreateSharedMemoryAndSemaphores(), which see for
 * more comments.  In the normal postmaster case, the shared hash tables
 * are created here, as well as a locallock hash table that will remain
 * unused and empty in the postmaster itself.  Backends inherit the pointers
 * to the shared tables via fork(), and also inherit an image of the locallock
 * hash table, which they proceed to use.  In the EXEC_BACKEND case, each
 * backend re-executes this code to obtain pointers to the already existing
 * shared hash tables and to create its locallock hash table.
 */
void
InitLocks(void)
{
	HASHCTL		info;
	int			hash_flags;
	long		init_table_size,
				max_table_size;
	bool		found;

	/*
	 * Compute init/max size to request for lock hashtables.  Note these
	 * calculations must agree with LockShmemSize!
	 */
	max_table_size = NLOCKENTS();
	init_table_size = max_table_size / 2;

	/*
	 * Allocate hash table for LOCK structs.  This stores per-locked-object
	 * information.
	 */
	MemSet(&info, 0, sizeof(info));
	info.keysize = sizeof(LOCKTAG);
	info.entrysize = sizeof(LOCK);
	info.hash = tag_hash;
	info.num_partitions = NUM_LOCK_PARTITIONS;
	hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);

	LockMethodLockHash = ShmemInitHash("LOCK hash",
									   init_table_size,
									   max_table_size,
									   &info,
									   hash_flags);

	/* Assume an average of 2 holders per lock */
	max_table_size *= 2;
	init_table_size *= 2;

	/*
	 * Allocate hash table for PROCLOCK structs.  This stores
	 * per-lock-per-holder information.
	 */
	info.keysize = sizeof(PROCLOCKTAG);
	info.entrysize = sizeof(PROCLOCK);
	info.hash = proclock_hash;
	info.num_partitions = NUM_LOCK_PARTITIONS;
	hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);

	LockMethodProcLockHash = ShmemInitHash("PROCLOCK hash",
										   init_table_size,
										   max_table_size,
										   &info,
										   hash_flags);

	/*
	 * Allocate fast-path structures.
	 */
	FastPathStrongRelationLocks =
		ShmemInitStruct("Fast Path Strong Relation Lock Data",
		sizeof(FastPathStrongRelationLockData), &found);
	if (!found)
		SpinLockInit(&FastPathStrongRelationLocks->mutex);

	/*
	 * Allocate non-shared hash table for LOCALLOCK structs.  This stores lock
	 * counts and resource owner information.
	 *
	 * The non-shared table could already exist in this process (this occurs
	 * when the postmaster is recreating shared memory after a backend crash).
	 * If so, delete and recreate it.  (We could simply leave it, since it
	 * ought to be empty in the postmaster, but for safety let's zap it.)
	 */
	if (LockMethodLocalHash)
		hash_destroy(LockMethodLocalHash);

	info.keysize = sizeof(LOCALLOCKTAG);
	info.entrysize = sizeof(LOCALLOCK);
	info.hash = tag_hash;
	hash_flags = (HASH_ELEM | HASH_FUNCTION);

	LockMethodLocalHash = hash_create("LOCALLOCK hash",
									  16,
									  &info,
									  hash_flags);
}


/*
 * Fetch the lock method table associated with a given lock
 */
LockMethod
GetLocksMethodTable(const LOCK *lock)
{
	LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock);

	Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
	return LockMethods[lockmethodid];
}


/*
 * Compute the hash code associated with a LOCKTAG.
 *
 * To avoid unnecessary recomputations of the hash code, we try to do this
 * just once per function, and then pass it around as needed.  Aside from
 * passing the hashcode to hash_search_with_hash_value(), we can extract
 * the lock partition number from the hashcode.
 */
uint32
LockTagHashCode(const LOCKTAG *locktag)
{
	return get_hash_value(LockMethodLockHash, (const void *) locktag);
}

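#ifdef NOT_USED
/*
 * Illustrative sketch only; not part of the original file.  Callers compute
 * the hashcode once with LockTagHashCode() and reuse it, for example to
 * locate the partition LWLock covering the lock, just as
 * LockAcquireExtended() and LockRelease() do below.
 */
static LWLockId
example_partition_lock(const LOCKTAG *locktag)
{
	uint32		hashcode = LockTagHashCode(locktag);

	return LockHashPartitionLock(hashcode);
}
#endif
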
/*
 * Compute the hash code associated with a PROCLOCKTAG.
 *
 * Because we want to use just one set of partition locks for both the
 * LOCK and PROCLOCK hash tables, we have to make sure that PROCLOCKs
 * fall into the same partition number as their associated LOCKs.
 * dynahash.c expects the partition number to be the low-order bits of
 * the hash code, and therefore a PROCLOCKTAG's hash code must have the
 * same low-order bits as the associated LOCKTAG's hash code.  We achieve
 * this with this specialized hash function.
 */
static uint32
proclock_hash(const void *key, Size keysize)
{
	const PROCLOCKTAG *proclocktag = (const PROCLOCKTAG *) key;
	uint32		lockhash;
	Datum		procptr;

	Assert(keysize == sizeof(PROCLOCKTAG));

	/* Look into the associated LOCK object, and compute its hash code */
	lockhash = LockTagHashCode(&proclocktag->myLock->tag);

	/*
	 * To make the hash code also depend on the PGPROC, we xor the proc
	 * struct's address into the hash code, left-shifted so that the
	 * partition-number bits don't change.  Since this is only a hash, we
	 * don't care if we lose high-order bits of the address; use an
	 * intermediate variable to suppress cast-pointer-to-int warnings.
	 */
	procptr = PointerGetDatum(proclocktag->myProc);
	lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS;

	return lockhash;
}

/*
 * Compute the hash code associated with a PROCLOCKTAG, given the hashcode
 * for its underlying LOCK.
 *
 * We use this just to avoid redundant calls of LockTagHashCode().
 */
static inline uint32
ProcLockHashCode(const PROCLOCKTAG *proclocktag, uint32 hashcode)
{
	uint32		lockhash = hashcode;
	Datum		procptr;

	/*
	 * This must match proclock_hash()!
	 */
	procptr = PointerGetDatum(proclocktag->myProc);
	lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS;

	return lockhash;
}


/*
 * LockAcquire -- Check for lock conflicts, sleep if conflict found,
 *		set lock if/when no conflicts.
 *
 * Inputs:
 *	locktag: unique identifier for the lockable object
 *	lockmode: lock mode to acquire
 *	sessionLock: if true, acquire lock for session not current transaction
 *	dontWait: if true, don't wait to acquire lock
 *
 * Returns one of:
 *		LOCKACQUIRE_NOT_AVAIL		lock not available, and dontWait=true
 *		LOCKACQUIRE_OK				lock successfully acquired
 *		LOCKACQUIRE_ALREADY_HELD	incremented count for lock already held
 *
 * In the normal case where dontWait=false and the caller doesn't need to
 * distinguish a freshly acquired lock from one already taken earlier in
 * this same transaction, there is no need to examine the return value.
 *
 * Side Effects: The lock is acquired and recorded in lock tables.
 *
 * NOTE: if we wait for the lock, there is no way to abort the wait
 * short of aborting the transaction.
 */
LockAcquireResult
LockAcquire(const LOCKTAG *locktag,
			LOCKMODE lockmode,
			bool sessionLock,
			bool dontWait)
{
	return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, true);
}
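
#ifdef NOT_USED
/*
 * Illustrative sketch only; not part of the original file.  Ordinary callers
 * go through lmgr.c, but this is roughly what a conditional relation lock
 * request looks like at this level.
 */
static void
example_conditional_relation_lock(Oid dbOid, Oid relOid)
{
	LOCKTAG		tag;

	SET_LOCKTAG_RELATION(tag, dbOid, relOid);
	if (LockAcquire(&tag, AccessShareLock, false, true) == LOCKACQUIRE_NOT_AVAIL)
		elog(LOG, "relation %u of database %u is locked by someone else",
			 relOid, dbOid);
	else
		(void) LockRelease(&tag, AccessShareLock, false);
}
#endif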

/*
 * LockAcquireExtended - allows us to specify additional options
 *
 * reportMemoryError specifies whether a lock request that fills the
 * lock table should generate an ERROR or not. This allows a priority
 * caller to note that the lock table is full and then begin taking
 * extreme action to reduce the number of other lock holders before
 * retrying the action.
 */
LockAcquireResult
LockAcquireExtended(const LOCKTAG *locktag,
					LOCKMODE lockmode,
					bool sessionLock,
					bool dontWait,
					bool reportMemoryError)
{
	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
	LockMethod	lockMethodTable;
	LOCALLOCKTAG localtag;
	LOCALLOCK  *locallock;
	LOCK	   *lock;
	PROCLOCK   *proclock;
	bool		found;
	ResourceOwner owner;
	uint32		hashcode;
	LWLockId	partitionLock;
	int			status;
	bool		log_lock = false;

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];
	if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
		elog(ERROR, "unrecognized lock mode: %d", lockmode);

	if (RecoveryInProgress() && !InRecovery &&
		(locktag->locktag_type == LOCKTAG_OBJECT ||
		 locktag->locktag_type == LOCKTAG_RELATION) &&
		lockmode > RowExclusiveLock)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("cannot acquire lock mode %s on database objects while recovery is in progress",
						lockMethodTable->lockModeNames[lockmode]),
				 errhint("Only RowExclusiveLock or less can be acquired on database objects during recovery.")));

#ifdef LOCK_DEBUG
	if (LOCK_DEBUG_ENABLED(locktag))
		elog(LOG, "LockAcquire: lock [%u,%u] %s",
			 locktag->locktag_field1, locktag->locktag_field2,
			 lockMethodTable->lockModeNames[lockmode]);
#endif

	/* Session locks are never transactional, else check table */
	if (!sessionLock && lockMethodTable->transactional)
		owner = CurrentResourceOwner;
	else
		owner = NULL;

	/*
	 * Find or create a LOCALLOCK entry for this lock and lockmode
	 */
	MemSet(&localtag, 0, sizeof(localtag));		/* must clear padding */
	localtag.lock = *locktag;
	localtag.mode = lockmode;

	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
										  (void *) &localtag,
										  HASH_ENTER, &found);

	/*
	 * if it's a new locallock object, initialize it
	 */
	if (!found)
	{
		locallock->lock = NULL;
		locallock->proclock = NULL;
		locallock->hashcode = LockTagHashCode(&(localtag.lock));
		locallock->nLocks = 0;
		locallock->numLockOwners = 0;
		locallock->maxLockOwners = 8;
		locallock->holdsStrongLockCount = FALSE;
		locallock->lockOwners = NULL;
		locallock->lockOwners = (LOCALLOCKOWNER *)
			MemoryContextAlloc(TopMemoryContext,
						  locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
	}
	else
	{
		/* Make sure there will be room to remember the lock */
		if (locallock->numLockOwners >= locallock->maxLockOwners)
		{
			int			newsize = locallock->maxLockOwners * 2;

			locallock->lockOwners = (LOCALLOCKOWNER *)
				repalloc(locallock->lockOwners,
						 newsize * sizeof(LOCALLOCKOWNER));
			locallock->maxLockOwners = newsize;
		}
	}
	hashcode = locallock->hashcode;

	/*
	 * If we already hold the lock, we can just increase the count locally.
	 */
	if (locallock->nLocks > 0)
	{
		GrantLockLocal(locallock, owner);
		return LOCKACQUIRE_ALREADY_HELD;
	}

	/*
	 * Emit a WAL record if acquisition of this lock needs to be replayed in a
	 * standby server. Only AccessExclusiveLocks can conflict with lock types
	 * that read-only transactions can acquire in a standby server.
	 *
	 * Make sure this definition matches the one in
	 * GetRunningTransactionLocks().
	 *
	 * First we prepare to log, then after lock acquired we issue log record.
	 */
	if (lockmode >= AccessExclusiveLock &&
		locktag->locktag_type == LOCKTAG_RELATION &&
		!RecoveryInProgress() &&
		XLogStandbyInfoActive())
	{
		LogAccessExclusiveLockPrepare();
		log_lock = true;
	}

	/* Locks that participate in the fast path require special handling. */
	if (FastPathTag(locktag) && FastPathRelevantMode(lockmode))
	{
		uint32	fasthashcode;

		fasthashcode = FastPathStrongLockHashPartition(hashcode);

		/*
		 * If we remember having filled up the fast path array, we don't
		 * attempt to make any further use of it until we release some locks.
		 * It's possible that some other backend has transferred some of those
		 * locks to the shared hash table, leaving space free, but it's not
		 * worth acquiring the LWLock just to check.  It's also possible that
		 * we're acquiring a second or third lock type on a relation we have
		 * already locked using the fast-path, but for now we don't worry about
		 * that case either.
		 */
		if (FastPathWeakMode(lockmode)
			&& FastPathLocalUseCount < FP_LOCK_SLOTS_PER_BACKEND)
		{
			bool	acquired;

			/*
			 * LWLockAcquire acts as a memory sequencing point, so it's safe
			 * to assume that any strong locker whose increment to
			 * FastPathStrongLocks->counts becomes visible after we test it has
			 * yet to begin to transfer fast-path locks.
			 */
			LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE);
			if (FastPathStrongRelationLocks->count[fasthashcode] != 0)
				acquired = false;
			else
				acquired = FastPathGrantRelationLock(locktag->locktag_field2,
													 lockmode);
			LWLockRelease(MyProc->backendLock);
			if (acquired)
			{
				GrantLockLocal(locallock, owner);
				return LOCKACQUIRE_OK;
			}
		}
		else if (FastPathStrongMode(lockmode))
		{
			/*
			 * Adding to a memory location is not atomic, so we take a
			 * spinlock to ensure we don't collide with someone else trying
			 * to bump the count at the same time.
			 *
			 * XXX: It might be worth considering using an atomic fetch-and-add
			 * instruction here, on architectures where that is supported.
			 */
			Assert(locallock->holdsStrongLockCount == FALSE);
			SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
			FastPathStrongRelationLocks->count[fasthashcode]++;
			locallock->holdsStrongLockCount = TRUE;
			SpinLockRelease(&FastPathStrongRelationLocks->mutex);
			if (!FastPathTransferRelationLocks(lockMethodTable, locktag,
											   hashcode))
			{
				if (reportMemoryError)
					ereport(ERROR,
							(errcode(ERRCODE_OUT_OF_MEMORY),
							 errmsg("out of shared memory"),
							 errhint("You might need to increase max_locks_per_transaction.")));
				else
					return LOCKACQUIRE_NOT_AVAIL;
			}
		}
	}

	/*
	 * Otherwise we've got to mess with the shared lock table.
	 */
	partitionLock = LockHashPartitionLock(hashcode);

	LWLockAcquire(partitionLock, LW_EXCLUSIVE);

	/*
	 * Find or create a proclock entry with this tag
	 */
	proclock = SetupLockInTable(lockMethodTable, MyProc, locktag,
								hashcode, lockmode);
	if (!proclock)
	{
		LWLockRelease(partitionLock);
		if (reportMemoryError)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of shared memory"),
					 errhint("You might need to increase max_locks_per_transaction.")));
		else
			return LOCKACQUIRE_NOT_AVAIL;
	}
	locallock->proclock = proclock;
	lock = proclock->tag.myLock;
	locallock->lock = lock;

	/*
	 * If lock requested conflicts with locks requested by waiters, must join
	 * wait queue.	Otherwise, check for conflict with already-held locks.
	 * (That's last because most complex check.)
	 */
	if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
		status = STATUS_FOUND;
	else
		status = LockCheckConflicts(lockMethodTable, lockmode,
									lock, proclock, MyProc);

	if (status == STATUS_OK)
	{
		/* No conflict with held or previously requested locks */
		GrantLock(lock, proclock, lockmode);
		GrantLockLocal(locallock, owner);
	}
	else
	{
		Assert(status == STATUS_FOUND);

		/*
		 * We can't acquire the lock immediately.  If caller specified no
		 * blocking, remove useless table entries and return NOT_AVAIL without
		 * waiting.
		 */
		if (dontWait)
		{
			if (proclock->holdMask == 0)
			{
				uint32		proclock_hashcode;

				proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
				SHMQueueDelete(&proclock->lockLink);
				SHMQueueDelete(&proclock->procLink);
				if (!hash_search_with_hash_value(LockMethodProcLockHash,
												 (void *) &(proclock->tag),
												 proclock_hashcode,
												 HASH_REMOVE,
												 NULL))
					elog(PANIC, "proclock table corrupted");
			}
			else
				PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
			lock->nRequested--;
			lock->requested[lockmode]--;
			LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
			Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
			Assert(lock->nGranted <= lock->nRequested);
			LWLockRelease(partitionLock);
			if (locallock->nLocks == 0)
				RemoveLocalLock(locallock);
			return LOCKACQUIRE_NOT_AVAIL;
		}

		/*
		 * Set bitmask of locks this process already holds on this object.
		 */
		MyProc->heldLocks = proclock->holdMask;

		/*
		 * Sleep till someone wakes me up.
		 */

		TRACE_POSTGRESQL_LOCK_WAIT_START(locktag->locktag_field1,
										 locktag->locktag_field2,
										 locktag->locktag_field3,
										 locktag->locktag_field4,
										 locktag->locktag_type,
										 lockmode);

		WaitOnLock(locallock, owner);

		TRACE_POSTGRESQL_LOCK_WAIT_DONE(locktag->locktag_field1,
										locktag->locktag_field2,
										locktag->locktag_field3,
										locktag->locktag_field4,
										locktag->locktag_type,
										lockmode);

		/*
		 * NOTE: do not do any material change of state between here and
		 * return.	All required changes in locktable state must have been
		 * done when the lock was granted to us --- see notes in WaitOnLock.
		 */

		/*
		 * Check the proclock entry status, in case something in the ipc
		 * communication doesn't work correctly.
		 */
		if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
		{
			PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
			LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
			/* Should we retry ? */
			LWLockRelease(partitionLock);
			elog(ERROR, "LockAcquire failed");
		}
		PROCLOCK_PRINT("LockAcquire: granted", proclock);
		LOCK_PRINT("LockAcquire: granted", lock, lockmode);
	}

	LWLockRelease(partitionLock);

	/*
	 * Emit a WAL record if acquisition of this lock need to be replayed in a
	 * standby server.
	 */
	if (log_lock)
	{
		/*
		 * Decode the locktag back to the original values, to avoid sending
		 * lots of empty bytes with every message.	See lock.h to check how a
		 * locktag is defined for LOCKTAG_RELATION
		 */
		LogAccessExclusiveLock(locktag->locktag_field1,
							   locktag->locktag_field2);
	}

	return LOCKACQUIRE_OK;
}

/*
 * Find or create LOCK and PROCLOCK objects as needed for a new lock
 * request.
 */
static PROCLOCK *
SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
				 const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode)
{
	LOCK	   *lock;
	PROCLOCK   *proclock;
	PROCLOCKTAG proclocktag;
	uint32		proclock_hashcode;
	bool		found;

	/*
	 * Find or create a lock with this tag.
	 *
	 * Note: if the locallock object already existed, it might have a pointer
	 * to the lock already ... but we probably should not assume that that
	 * pointer is valid, since a lock object with no locks can go away
	 * anytime.
	 */
	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
												(const void *) locktag,
												hashcode,
												HASH_ENTER_NULL,
												&found);
	if (!lock)
		return NULL;

	/*
	 * if it's a new lock object, initialize it
	 */
	if (!found)
	{
		lock->grantMask = 0;
		lock->waitMask = 0;
		SHMQueueInit(&(lock->procLocks));
		ProcQueueInit(&(lock->waitProcs));
		lock->nRequested = 0;
		lock->nGranted = 0;
		MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
		MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
		LOCK_PRINT("LockAcquire: new", lock, lockmode);
	}
	else
	{
		LOCK_PRINT("LockAcquire: found", lock, lockmode);
		Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
		Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
		Assert(lock->nGranted <= lock->nRequested);
	}

	/*
	 * Create the hash key for the proclock table.
	 */
	proclocktag.myLock = lock;
	proclocktag.myProc = proc;

	proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);

	/*
	 * Find or create a proclock entry with this tag
	 */
	proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
														(void *) &proclocktag,
														proclock_hashcode,
														HASH_ENTER_NULL,
														&found);
	if (!proclock)
	{
		/* Ooops, not enough shmem for the proclock */
		if (lock->nRequested == 0)
		{
			/*
			 * There are no other requestors of this lock, so garbage-collect
			 * the lock object.  We *must* do this to avoid a permanent leak
			 * of shared memory, because there won't be anything to cause
			 * anyone to release the lock object later.
			 */
			Assert(SHMQueueEmpty(&(lock->procLocks)));
			if (!hash_search_with_hash_value(LockMethodLockHash,
											 (void *) &(lock->tag),
											 hashcode,
											 HASH_REMOVE,
											 NULL))
				elog(PANIC, "lock table corrupted");
		}
		return NULL;
	}

	/*
	 * If new, initialize the new entry
	 */
	if (!found)
	{
		uint32		partition = LockHashPartition(hashcode);

		proclock->holdMask = 0;
		proclock->releaseMask = 0;
		/* Add proclock to appropriate lists */
		SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
		SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
							 &proclock->procLink);
		PROCLOCK_PRINT("LockAcquire: new", proclock);
	}
	else
	{
		PROCLOCK_PRINT("LockAcquire: found", proclock);
		Assert((proclock->holdMask & ~lock->grantMask) == 0);

#ifdef CHECK_DEADLOCK_RISK

		/*
		 * Issue warning if we already hold a lower-level lock on this object
		 * and do not hold a lock of the requested level or higher. This
		 * indicates a deadlock-prone coding practice (eg, we'd have a
		 * deadlock if another backend were following the same code path at
		 * about the same time).
		 *
		 * This is not enabled by default, because it may generate log entries
		 * about user-level coding practices that are in fact safe in context.
		 * It can be enabled to help find system-level problems.
		 *
		 * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
		 * better to use a table.  For now, though, this works.
		 */
		{
			int			i;

			for (i = lockMethodTable->numLockModes; i > 0; i--)
			{
				if (proclock->holdMask & LOCKBIT_ON(i))
				{
					if (i >= (int) lockmode)
						break;	/* safe: we have a lock >= req level */
					elog(LOG, "deadlock risk: raising lock level"
						 " from %s to %s on object %u/%u/%u",
						 lockMethodTable->lockModeNames[i],
						 lockMethodTable->lockModeNames[lockmode],
						 lock->tag.locktag_field1, lock->tag.locktag_field2,
						 lock->tag.locktag_field3);
					break;
				}
			}
		}
#endif   /* CHECK_DEADLOCK_RISK */
	}

	/*
	 * lock->nRequested and lock->requested[] count the total number of
	 * requests, whether granted or waiting, so increment those immediately.
	 * The other counts don't increment till we get the lock.
	 */
	lock->nRequested++;
	lock->requested[lockmode]++;
	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));

	/*
	 * We shouldn't already hold the desired lock; else locallock table is
	 * broken.
	 */
	if (proclock->holdMask & LOCKBIT_ON(lockmode))
		elog(ERROR, "lock %s on object %u/%u/%u is already held",
			 lockMethodTable->lockModeNames[lockmode],
			 lock->tag.locktag_field1, lock->tag.locktag_field2,
			 lock->tag.locktag_field3);

	return proclock;
}

/*
 * Subroutine to free a locallock entry
 */
static void
RemoveLocalLock(LOCALLOCK *locallock)
{
	pfree(locallock->lockOwners);
	locallock->lockOwners = NULL;
	if (locallock->holdsStrongLockCount)
	{
		uint32	fasthashcode;
		fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode);

		SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
		Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0);
		FastPathStrongRelationLocks->count[fasthashcode]--;
		locallock->holdsStrongLockCount = FALSE;
		SpinLockRelease(&FastPathStrongRelationLocks->mutex);
	}
	if (!hash_search(LockMethodLocalHash,
					 (void *) &(locallock->tag),
					 HASH_REMOVE, NULL))
		elog(WARNING, "locallock table corrupted");
}

/*
 * LockCheckConflicts -- test whether requested lock conflicts
 *		with those already granted
 *
 * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
 *
 * NOTES:
 *		Here's what makes this complicated: one process's locks don't
 * conflict with one another, no matter what purpose they are held for
 * (eg, session and transaction locks do not conflict).
 * So, we must subtract off our own locks when determining whether the
 * requested new lock conflicts with those already held.
 */
int
LockCheckConflicts(LockMethod lockMethodTable,
				   LOCKMODE lockmode,
				   LOCK *lock,
				   PROCLOCK *proclock,
				   PGPROC *proc)
{
	int			numLockModes = lockMethodTable->numLockModes;
	LOCKMASK	myLocks;
	LOCKMASK	otherLocks;
	int			i;

	/*
	 * first check for global conflicts: If no locks conflict with my request,
	 * then I get the lock.
	 *
	 * Checking for conflict: lock->grantMask represents the types of
	 * currently held locks.  conflictTable[lockmode] has a bit set for each
	 * type of lock that conflicts with request.   Bitwise compare tells if
	 * there is a conflict.
	 */
	if (!(lockMethodTable->conflictTab[lockmode] & lock->grantMask))
	{
		PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock);
		return STATUS_OK;
	}

	/*
	 * Rats.  Something conflicts.	But it could still be my own lock. We have
	 * to construct a conflict mask that does not reflect our own locks, but
	 * only lock types held by other processes.
	 */
	myLocks = proclock->holdMask;
	otherLocks = 0;
	for (i = 1; i <= numLockModes; i++)
	{
		int			myHolding = (myLocks & LOCKBIT_ON(i)) ? 1 : 0;

		if (lock->granted[i] > myHolding)
			otherLocks |= LOCKBIT_ON(i);
	}

	/*
	 * now check again for conflicts.  'otherLocks' describes the types of
	 * locks held by other processes.  If one of these conflicts with the kind
	 * of lock that I want, there is a conflict and I have to sleep.
	 */
	if (!(lockMethodTable->conflictTab[lockmode] & otherLocks))
	{
		/* no conflict. OK to get the lock */
		PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock);
		return STATUS_OK;
	}

	PROCLOCK_PRINT("LockCheckConflicts: conflicting", proclock);
	return STATUS_FOUND;
}

/*
 * GrantLock -- update the lock and proclock data structures to show
 *		the lock request has been granted.
 *
 * NOTE: if proc was blocked, it also needs to be removed from the wait list
 * and have its waitLock/waitProcLock fields cleared.  That's not done here.
 *
 * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK
 * table entry; but since we may be awaking some other process, we can't do
 * that here; it's done by GrantLockLocal, instead.
 */
void
GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
{
	lock->nGranted++;
	lock->granted[lockmode]++;
	lock->grantMask |= LOCKBIT_ON(lockmode);
	if (lock->granted[lockmode] == lock->requested[lockmode])
		lock->waitMask &= LOCKBIT_OFF(lockmode);
	proclock->holdMask |= LOCKBIT_ON(lockmode);
	LOCK_PRINT("GrantLock", lock, lockmode);
	Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
	Assert(lock->nGranted <= lock->nRequested);
}

/*
 * UnGrantLock -- opposite of GrantLock.
 *
 * Updates the lock and proclock data structures to show that the lock
 * is no longer held nor requested by the current holder.
 *
 * Returns true if there were any waiters waiting on the lock that
 * should now be woken up with ProcLockWakeup.
 */
static bool
UnGrantLock(LOCK *lock, LOCKMODE lockmode,
			PROCLOCK *proclock, LockMethod lockMethodTable)
{
	bool		wakeupNeeded = false;

	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
	Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
	Assert(lock->nGranted <= lock->nRequested);

	/*
	 * fix the general lock stats
	 */
	lock->nRequested--;
	lock->requested[lockmode]--;
	lock->nGranted--;
	lock->granted[lockmode]--;

	if (lock->granted[lockmode] == 0)
	{
		/* change the conflict mask.  No more of this lock type. */
		lock->grantMask &= LOCKBIT_OFF(lockmode);
	}

	LOCK_PRINT("UnGrantLock: updated", lock, lockmode);

	/*
	 * We need only run ProcLockWakeup if the released lock conflicts with at
	 * least one of the lock types requested by waiter(s).	Otherwise whatever
	 * conflict made them wait must still exist.  NOTE: before MVCC, we could
	 * skip wakeup if lock->granted[lockmode] was still positive. But that's
	 * not true anymore, because the remaining granted locks might belong to
	 * some waiter, who could now be awakened because he doesn't conflict with
	 * his own locks.
	 */
	if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
		wakeupNeeded = true;

	/*
	 * Now fix the per-proclock state.
	 */
	proclock->holdMask &= LOCKBIT_OFF(lockmode);
	PROCLOCK_PRINT("UnGrantLock: updated", proclock);

	return wakeupNeeded;
}

/*
 * CleanUpLock -- clean up after releasing a lock.	We garbage-collect the
 * proclock and lock objects if possible, and call ProcLockWakeup if there
 * are remaining requests and the caller says it's OK.  (Normally, this
 * should be called after UnGrantLock, and wakeupNeeded is the result from
 * UnGrantLock.)
 *
 * The appropriate partition lock must be held at entry, and will be
 * held at exit.
 */
static void
CleanUpLock(LOCK *lock, PROCLOCK *proclock,
			LockMethod lockMethodTable, uint32 hashcode,
			bool wakeupNeeded)
{
	/*
	 * If this was my last hold on this lock, delete my entry in the proclock
	 * table.
	 */
	if (proclock->holdMask == 0)
	{
		uint32		proclock_hashcode;

		PROCLOCK_PRINT("CleanUpLock: deleting", proclock);
		SHMQueueDelete(&proclock->lockLink);
		SHMQueueDelete(&proclock->procLink);
		proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
		if (!hash_search_with_hash_value(LockMethodProcLockHash,
										 (void *) &(proclock->tag),
										 proclock_hashcode,
										 HASH_REMOVE,
										 NULL))
			elog(PANIC, "proclock table corrupted");
	}

	if (lock->nRequested == 0)
	{
		/*
		 * The caller just released the last lock, so garbage-collect the lock
		 * object.
		 */
		LOCK_PRINT("CleanUpLock: deleting", lock, 0);
		Assert(SHMQueueEmpty(&(lock->procLocks)));
		if (!hash_search_with_hash_value(LockMethodLockHash,
										 (void *) &(lock->tag),
										 hashcode,
										 HASH_REMOVE,
										 NULL))
			elog(PANIC, "lock table corrupted");
	}
	else if (wakeupNeeded)
	{
		/* There are waiters on this lock, so wake them up. */
		ProcLockWakeup(lockMethodTable, lock);
	}
}

/*
 * GrantLockLocal -- update the locallock data structures to show
 *		the lock request has been granted.
 *
 * We expect that LockAcquire made sure there is room to add a new
 * ResourceOwner entry.
 */
static void
GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
{
	LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
	int			i;

	Assert(locallock->numLockOwners < locallock->maxLockOwners);
	/* Count the total */
	locallock->nLocks++;
	/* Count the per-owner lock */
	for (i = 0; i < locallock->numLockOwners; i++)
	{
		if (lockOwners[i].owner == owner)
		{
			lockOwners[i].nLocks++;
			return;
		}
	}
	lockOwners[i].owner = owner;
	lockOwners[i].nLocks = 1;
	locallock->numLockOwners++;
}

/*
 * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
 *		WaitOnLock on.
 *
 * proc.c needs this for the case where we are booted off the lock by
 * timeout, but discover that someone granted us the lock anyway.
 *
 * We could just export GrantLockLocal, but that would require including
 * resowner.h in lock.h, which creates circularity.
 */
void
GrantAwaitedLock(void)
{
	GrantLockLocal(awaitedLock, awaitedOwner);
 
}

/*
 * WaitOnLock -- wait to acquire a lock
 *
1371
 * Caller must have set MyProc->heldLocks to reflect locks already held
1372
 * on the lockable object by this process.
1373
 *
1374
 * The appropriate partition lock must be held at entry.
1375
 */
1376
static void
1377
WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
1378
{
1379
	LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
1380
	LockMethod	lockMethodTable = LockMethods[lockmethodid];
1381
	char	   *volatile new_status = NULL;
1382

	LOCK_PRINT("WaitOnLock: sleeping on lock",
			   locallock->lock, locallock->tag.mode);
1385

1386
	/* Report change to waiting status */
1387 1388
	if (update_process_title)
	{
		const char *old_status;
		int			len;

1392 1393 1394 1395 1396
		old_status = get_ps_display(&len);
		new_status = (char *) palloc(len + 8 + 1);
		memcpy(new_status, old_status, len);
		strcpy(new_status + len, " waiting");
		set_ps_display(new_status, false);
B
Bruce Momjian 已提交
1397
		new_status[len] = '\0'; /* truncate off " waiting" */
1398
	}
1399 1400
	pgstat_report_waiting(true);

	awaitedLock = locallock;
	awaitedOwner = owner;

B
Bruce Momjian 已提交
1404
	/*
1405
	 * NOTE: Think not to put any shared-state cleanup after the call to
B
Bruce Momjian 已提交
1406 1407 1408 1409 1410 1411
	 * ProcSleep, in either the normal or failure path.  The lock state must
	 * be fully set by the lock grantor, or by CheckDeadLock if we give up
	 * waiting for the lock.  This is necessary because of the possibility
	 * that a cancel/die interrupt will interrupt ProcSleep after someone else
	 * grants us the lock, but before we've noticed it. Hence, after granting,
	 * the locktable state must fully reflect the fact that we own the lock;
1412 1413
	 * we can't do additional work on return.
	 *
	 * We can and do use a PG_TRY block to try to clean up after failure, but
	 * this still has a major limitation: elog(FATAL) can occur while waiting
	 * (eg, a "die" interrupt), and then control won't come back here. So all
	 * cleanup of essential state should happen in LockWaitCancel, not here.
	 * We can use PG_TRY to clear the "waiting" status flags, since doing that
	 * is unimportant if the process exits.
1420
	 */
	PG_TRY();
	{
		if (ProcSleep(locallock, lockMethodTable) != STATUS_OK)
		{
			/*
1426 1427
			 * We failed as a result of a deadlock, see CheckDeadLock(). Quit
			 * now.
1428 1429 1430 1431 1432
			 */
			awaitedLock = NULL;
			LOCK_PRINT("WaitOnLock: aborting on lock",
					   locallock->lock, locallock->tag.mode);
			LWLockRelease(LockHashPartitionLock(locallock->hashcode));
1433

			/*
			 * Now that we aren't holding the partition lock, we can give an
			 * error report including details about the detected deadlock.
			 */
			DeadLockReport();
			/* not reached */
		}
	}
	PG_CATCH();
1443
	{
1444
		/* In this path, awaitedLock remains set until LockWaitCancel */
B
Bruce Momjian 已提交
1445

		/* Report change to non-waiting status */
		pgstat_report_waiting(false);
		if (update_process_title)
		{
			set_ps_display(new_status, false);
			pfree(new_status);
		}

		/* and propagate the error */
		PG_RE_THROW();
1456
	}
1457
	PG_END_TRY();
1458

1459 1460
	awaitedLock = NULL;

1461
	/* Report change to non-waiting status */
1462
	pgstat_report_waiting(false);
1463 1464 1465 1466 1467
	if (update_process_title)
	{
		set_ps_display(new_status, false);
		pfree(new_status);
	}
1468

1469 1470
	LOCK_PRINT("WaitOnLock: wakeup on lock",
			   locallock->lock, locallock->tag.mode);
1471 1472
}

1473
/*
1474 1475 1476
 * Remove a proc from the wait-queue it is on (caller must know it is on one).
 * This is only used when the proc has failed to get the lock, so we set its
 * waitStatus to STATUS_ERROR.
1477
 *
1478 1479
 * Appropriate partition lock must be held by caller.  Also, caller is
 * responsible for signaling the proc if needed.
1480
 *
1481
 * NB: this does not clean up any locallock object that may exist for the lock.
1482 1483
 */
void
1484
RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode)
1485
{
B
Bruce Momjian 已提交
1486
	LOCK	   *waitLock = proc->waitLock;
1487
	PROCLOCK   *proclock = proc->waitProcLock;
B
Bruce Momjian 已提交
1488
	LOCKMODE	lockmode = proc->waitLockMode;
1489
	LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*waitLock);
1490 1491

	/* Make sure proc is waiting */
1492
	Assert(proc->waitStatus == STATUS_WAITING);
1493
	Assert(proc->links.next != NULL);
1494 1495
	Assert(waitLock);
	Assert(waitLock->waitProcs.size > 0);
1496
	Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));

	/* Remove proc from lock's wait queue */
	SHMQueueDelete(&(proc->links));
	waitLock->waitProcs.size--;

	/* Undo increments of request counts by waiting process */
	Assert(waitLock->nRequested > 0);
	Assert(waitLock->nRequested > proc->waitLock->nGranted);
	waitLock->nRequested--;
	Assert(waitLock->requested[lockmode] > 0);
	waitLock->requested[lockmode]--;
	/* don't forget to clear waitMask bit if appropriate */
	if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
1510
		waitLock->waitMask &= LOCKBIT_OFF(lockmode);
1511

1512
	/* Clean up the proc's own state, and pass it the ok/fail signal */
1513
	proc->waitLock = NULL;
1514
	proc->waitProcLock = NULL;
1515
	proc->waitStatus = STATUS_ERROR;
1516

1517 1518
	/*
	 * Delete the proclock immediately if it represents no already-held locks.
1519 1520
	 * (This must happen now because if the owner of the lock decides to
	 * release it, and the requested/granted counts then go to zero,
B
Bruce Momjian 已提交
1521 1522
	 * LockRelease expects there to be no remaining proclocks.) Then see if
	 * any other waiters for the lock can be woken up now.
1523
	 */
1524
	CleanUpLock(waitLock, proclock,
1525
				LockMethods[lockmethodid], hashcode,
1526
				true);
1527 1528
}

1529
/*
1530 1531 1532
 * LockRelease -- look up 'locktag' and release one 'lockmode' lock on it.
 *		Release a session lock if 'sessionLock' is true, else release a
 *		regular transaction lock.
1533
 *
1534 1535 1536 1537
 * Side Effects: find any waiting processes that are now wakable,
 *		grant them their requested locks and awaken them.
 *		(We have to grant the lock here to avoid a race between
 *		the waking process and any new process to
1538
 *		come along and request the lock.)
1539 1540
 */
bool
1541
LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
1542
{
1543 1544
	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
	LockMethod	lockMethodTable;
1545 1546
	LOCALLOCKTAG localtag;
	LOCALLOCK  *locallock;
1547
	LOCK	   *lock;
1548
	PROCLOCK   *proclock;
1549
	LWLockId	partitionLock;
1550
	bool		wakeupNeeded;
1551

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];
	if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
		elog(ERROR, "unrecognized lock mode: %d", lockmode);

1558
#ifdef LOCK_DEBUG
1559 1560
	if (LOCK_DEBUG_ENABLED(locktag))
		elog(LOG, "LockRelease: lock [%u,%u] %s",
1561
			 locktag->locktag_field1, locktag->locktag_field2,
1562
			 lockMethodTable->lockModeNames[lockmode]);
1563 1564
#endif

1565
	/*
1566
	 * Find the LOCALLOCK entry for this lock and lockmode
1567
	 */
B
Bruce Momjian 已提交
1568
	MemSet(&localtag, 0, sizeof(localtag));		/* must clear padding */
1569 1570 1571
	localtag.lock = *locktag;
	localtag.mode = lockmode;

1572
	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
1573 1574
										  (void *) &localtag,
										  HASH_FIND, NULL);
1575

1576
	/*
B
Bruce Momjian 已提交
1577
	 * let the caller print its own error message, too. Do not ereport(ERROR).
1578
	 */
1579
	if (!locallock || locallock->nLocks <= 0)
1580
	{
1581
		elog(WARNING, "you don't own a lock of type %s",
1582
			 lockMethodTable->lockModeNames[lockmode]);
1583
		return FALSE;
1584
	}
1585

M
 
Marc G. Fournier 已提交
1586
	/*
1587
	 * Decrease the count for the resource owner.
M
 
Marc G. Fournier 已提交
1588
	 */
1589
	{
1590 1591
		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
		ResourceOwner owner;
B
Bruce Momjian 已提交
1592
		int			i;
1593

1594 1595
		/* Session locks are never transactional, else check table */
		if (!sessionLock && lockMethodTable->transactional)
1596
			owner = CurrentResourceOwner;
1597
		else
1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618
			owner = NULL;

		for (i = locallock->numLockOwners - 1; i >= 0; i--)
		{
			if (lockOwners[i].owner == owner)
			{
				Assert(lockOwners[i].nLocks > 0);
				if (--lockOwners[i].nLocks == 0)
				{
					/* compact out unused slot */
					locallock->numLockOwners--;
					if (i < locallock->numLockOwners)
						lockOwners[i] = lockOwners[locallock->numLockOwners];
				}
				break;
			}
		}
		if (i < 0)
		{
			/* don't release a lock belonging to another owner */
			elog(WARNING, "you don't own a lock of type %s",
				 lockMethodTable->lockModeNames[lockmode]);
			return FALSE;
		}
	}

	/*
	 * Decrease the total local count.  If we're still holding the lock, we're
	 * done.
	 */
	locallock->nLocks--;

	if (locallock->nLocks > 0)
		return TRUE;

	/* Locks that participate in the fast path require special handling. */
	if (FastPathTag(locktag) && FastPathWeakMode(lockmode)
		&& FastPathLocalUseCount > 0)
	{
		bool	released;

		/*
		 * We might not find the lock here, even if we originally entered
		 * it here.  Another backend may have moved it to the main table.
		 */
		LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE);
		released = FastPathUnGrantRelationLock(locktag->locktag_field2,
											   lockmode);
		LWLockRelease(MyProc->backendLock);
		if (released)
		{
			RemoveLocalLock(locallock);
			return TRUE;
		}
	}

	/*
	 * Otherwise we've got to mess with the shared lock table.
	 */
	partitionLock = LockHashPartitionLock(locallock->hashcode);

	LWLockAcquire(partitionLock, LW_EXCLUSIVE);

	/*
	 * Normally, we don't need to re-find the lock or proclock, since we kept
	 * their addresses in the locallock table, and they couldn't have been
	 * removed while we were holding a lock on them.  But it's possible that
	 * the locks have been moved to the main hash table by another backend, in
	 * which case we might need to go look them up after all.
	 */
	lock = locallock->lock;
	if (!lock)
	{
		PROCLOCKTAG proclocktag;
		bool		found;

		Assert(FastPathTag(locktag) && FastPathWeakMode(lockmode));
		lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
													(const void *) locktag,
													locallock->hashcode,
													HASH_FIND,
													&found);
		Assert(found && lock != NULL);
		locallock->lock = lock;

		proclocktag.myLock = lock;
		proclocktag.myProc = MyProc;
		locallock->proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash,
													   (void *) &proclocktag,
													   HASH_FIND, &found);
		Assert(found);
	}
	LOCK_PRINT("LockRelease: found", lock, lockmode);
	proclock = locallock->proclock;
	PROCLOCK_PRINT("LockRelease: found", proclock);

	/*
	 * Double-check that we are actually holding a lock of the type we want to
	 * release.
	 */
	if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
	{
		PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
		LWLockRelease(partitionLock);
		elog(WARNING, "you don't own a lock of type %s",
			 lockMethodTable->lockModeNames[lockmode]);
		RemoveLocalLock(locallock);
		return FALSE;
	}

	/*
	 * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
	 */
	wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);

	CleanUpLock(lock, proclock,
				lockMethodTable, locallock->hashcode,
				wakeupNeeded);

	LWLockRelease(partitionLock);

	RemoveLocalLock(locallock);
	return TRUE;
}
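
/*
 * Illustrative sketch of a caller (hypothetical; most code reaches this
 * function through higher-level lmgr wrappers rather than calling it
 * directly):
 *
 *		LOCKTAG		tag;
 *
 *		SET_LOCKTAG_RELATION(tag, MyDatabaseId, relid);
 *		if (!LockRelease(&tag, AccessShareLock, false))
 *			elog(WARNING, "lock was not held");
 *
 * Here 'relid' stands for some relation OID the caller previously locked.
 */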

/*
 * LockReleaseSession -- Release all session locks of the specified lock method
 *		that are held by the current process.
 */
void
LockReleaseSession(LOCKMETHODID lockmethodid)
{
	HASH_SEQ_STATUS status;
	LOCALLOCK  *locallock;

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);

	hash_seq_init(&status, LockMethodLocalHash);

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		/* Ignore items that are not of the specified lock method */
		if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
			continue;

		ReleaseLockForOwner(locallock, NULL);
	}
}

/*
 * LockReleaseAll -- Release all locks of the specified lock method that
 *		are held by the current process.
 *
 * Well, not necessarily *all* locks.  The available behaviors are:
 *		allLocks == true: release all locks including session locks.
 *		allLocks == false: release all non-session locks.
 */
void
LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
{
	HASH_SEQ_STATUS status;
	LockMethod	lockMethodTable;
	int			i,
				numLockModes;
	LOCALLOCK  *locallock;
	LOCK	   *lock;
	PROCLOCK   *proclock;
	int			partition;
	bool		have_fast_path_lwlock = false;

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];

#ifdef LOCK_DEBUG
	if (*(lockMethodTable->trace_flag))
		elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
#endif

	/*
	 * Get rid of our fast-path VXID lock, if appropriate.  Note that this
	 * is the only way that the lock we hold on our own VXID can ever get
	 * released: it is always and only released when a toplevel transaction
	 * ends.
	 */
	if (lockmethodid == DEFAULT_LOCKMETHOD)
		VirtualXactLockTableCleanup();

	numLockModes = lockMethodTable->numLockModes;

	/*
	 * First we run through the locallock table and get rid of unwanted
	 * entries, then we scan the process's proclocks and get rid of those. We
	 * do this separately because we may have multiple locallock entries
	 * pointing to the same proclock, and we daren't end up with any dangling
	 * pointers.
	 */
	hash_seq_init(&status, LockMethodLocalHash);

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		if (locallock->proclock == NULL || locallock->lock == NULL)
		{
			LOCKMODE	lockmode = locallock->tag.mode;
			Oid			relid;

			/*
			 * If the LOCALLOCK entry is unused, we must've run out of shared
			 * memory while trying to set up this lock.  Just forget the local
			 * entry.
			 */
			if (locallock->nLocks == 0)
			{
				RemoveLocalLock(locallock);
				continue;
			}

			/*
			 * Otherwise, we should be dealing with a lock acquired via the
			 * fast-path.  If not, we've got trouble.
			 */
			if (!FastPathTag(&locallock->tag.lock)
				|| !FastPathWeakMode(lockmode))
				elog(PANIC, "locallock table corrupted");

			/*
			 * If we don't currently hold the LWLock that protects our
			 * fast-path data structures, we must acquire it before
			 * attempting to release the lock via the fast-path.
			 */
			if (!have_fast_path_lwlock)
			{
				LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE);
				have_fast_path_lwlock = true;
			}

			/* Attempt fast-path release. */
			relid = locallock->tag.lock.locktag_field2;
			if (FastPathUnGrantRelationLock(relid, lockmode))
			{
				RemoveLocalLock(locallock);
				continue;
			}

			/*
			 * Our lock, originally taken via the fast path, has been
			 * transferred to the main lock table.  That's going to require
			 * some extra work, so release our fast-path lock before starting.
			 */
			LWLockRelease(MyProc->backendLock);
			have_fast_path_lwlock = false;

			/*
			 * Now dump the lock.  We haven't got a pointer to the LOCK or
			 * PROCLOCK in this case, so we have to handle this a bit
			 * differently than a normal lock release.  Unfortunately, this
			 * requires an extra LWLock acquire-and-release cycle on the
			 * partitionLock, but hopefully it shouldn't happen often.
			 */
			LockRefindAndRelease(lockMethodTable, MyProc,
								 &locallock->tag.lock, lockmode, false);
			RemoveLocalLock(locallock);
			continue;
		}

		/* Ignore items that are not of the lockmethod to be removed */
		if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
			continue;

		/*
		 * If we are asked to release all locks, we can just zap the entry.
		 * Otherwise, must scan to see if there are session locks. We assume
		 * there is at most one lockOwners entry for session locks.
		 */
		if (!allLocks)
		{
			LOCALLOCKOWNER *lockOwners = locallock->lockOwners;

			/* If it's above array position 0, move it down to 0 */
			for (i = locallock->numLockOwners - 1; i > 0; i--)
			{
				if (lockOwners[i].owner == NULL)
				{
					lockOwners[0] = lockOwners[i];
					break;
				}
			}

			if (locallock->numLockOwners > 0 &&
				lockOwners[0].owner == NULL &&
				lockOwners[0].nLocks > 0)
			{
				/* Fix the locallock to show just the session locks */
				locallock->nLocks = lockOwners[0].nLocks;
				locallock->numLockOwners = 1;
				/* We aren't deleting this locallock, so done */
				continue;
			}
		}

		/* Mark the proclock to show we need to release this lockmode */
		if (locallock->nLocks > 0)
			locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);

		/* And remove the locallock hashtable entry */
		RemoveLocalLock(locallock);
	}

	if (have_fast_path_lwlock)
		LWLockRelease(MyProc->backendLock);

	/*
	 * Now, scan each lock partition separately.
	 */
	for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
	{
		LWLockId	partitionLock = FirstLockMgrLock + partition;
		SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);

		proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
											 offsetof(PROCLOCK, procLink));

		if (!proclock)
			continue;			/* needn't examine this partition */

		LWLockAcquire(partitionLock, LW_EXCLUSIVE);

		while (proclock)
		{
			bool		wakeupNeeded = false;
			PROCLOCK   *nextplock;

			/* Get link first, since we may unlink/delete this proclock */
			nextplock = (PROCLOCK *)
				SHMQueueNext(procLocks, &proclock->procLink,
							 offsetof(PROCLOCK, procLink));

			Assert(proclock->tag.myProc == MyProc);

			lock = proclock->tag.myLock;

			/* Ignore items that are not of the lockmethod to be removed */
			if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
				goto next_item;

			/*
			 * In allLocks mode, force release of all locks even if locallock
			 * table had problems
			 */
			if (allLocks)
				proclock->releaseMask = proclock->holdMask;
			else
				Assert((proclock->releaseMask & ~proclock->holdMask) == 0);

			/*
			 * Ignore items that have nothing to be released, unless they have
			 * holdMask == 0 and are therefore recyclable
			 */
			if (proclock->releaseMask == 0 && proclock->holdMask != 0)
				goto next_item;

			PROCLOCK_PRINT("LockReleaseAll", proclock);
			LOCK_PRINT("LockReleaseAll", lock, 0);
			Assert(lock->nRequested >= 0);
			Assert(lock->nGranted >= 0);
			Assert(lock->nGranted <= lock->nRequested);
			Assert((proclock->holdMask & ~lock->grantMask) == 0);

			/*
			 * Release the previously-marked lock modes
			 */
			for (i = 1; i <= numLockModes; i++)
			{
				if (proclock->releaseMask & LOCKBIT_ON(i))
					wakeupNeeded |= UnGrantLock(lock, i, proclock,
												lockMethodTable);
			}
			Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
			Assert(lock->nGranted <= lock->nRequested);
			LOCK_PRINT("LockReleaseAll: updated", lock, 0);

			proclock->releaseMask = 0;

			/* CleanUpLock will wake up waiters if needed. */
			CleanUpLock(lock, proclock,
						lockMethodTable,
						LockTagHashCode(&lock->tag),
						wakeupNeeded);

	next_item:
			proclock = nextplock;
		}						/* loop over PROCLOCKs within this partition */

		LWLockRelease(partitionLock);
	}							/* loop over partitions */

#ifdef LOCK_DEBUG
	if (*(lockMethodTable->trace_flag))
		elog(LOG, "LockReleaseAll done");
#endif
}

/*
 * LockReleaseCurrentOwner
 *		Release all locks belonging to CurrentResourceOwner
 */
void
LockReleaseCurrentOwner(void)
{
	HASH_SEQ_STATUS status;
	LOCALLOCK  *locallock;

	hash_seq_init(&status, LockMethodLocalHash);

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		/* Ignore items that must be nontransactional */
		if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
			continue;

		ReleaseLockForOwner(locallock, CurrentResourceOwner);
	}
}

/*
 * Subroutine to release a lock belonging to the 'owner' if found.
 * 'owner' can be NULL to release a session lock.
 */
static void
ReleaseLockForOwner(LOCALLOCK *locallock, ResourceOwner owner)
{
	int			i;
	LOCALLOCKOWNER *lockOwners;

	/* Scan to see if there are any locks belonging to the owner */
	lockOwners = locallock->lockOwners;
	for (i = locallock->numLockOwners - 1; i >= 0; i--)
	{
		if (lockOwners[i].owner == owner)
		{
			Assert(lockOwners[i].nLocks > 0);
			if (lockOwners[i].nLocks < locallock->nLocks)
			{
				/*
				 * We will still hold this lock after forgetting this
				 * ResourceOwner.
				 */
				locallock->nLocks -= lockOwners[i].nLocks;
				/* compact out unused slot */
				locallock->numLockOwners--;
				if (i < locallock->numLockOwners)
					lockOwners[i] = lockOwners[locallock->numLockOwners];
			}
			else
			{
				Assert(lockOwners[i].nLocks == locallock->nLocks);
				/* We want to call LockRelease just once */
				lockOwners[i].nLocks = 1;
				locallock->nLocks = 1;
				if (!LockRelease(&locallock->tag.lock,
								 locallock->tag.mode,
								 owner == NULL))
					elog(WARNING, "ReleaseLockForOwner: failed??");
			}
			break;
		}
	}
}

/*
 * LockReassignCurrentOwner
 *		Reassign all locks belonging to CurrentResourceOwner to belong
 *		to its parent resource owner
 */
void
LockReassignCurrentOwner(void)
{
	ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
	HASH_SEQ_STATUS status;
	LOCALLOCK  *locallock;
	LOCALLOCKOWNER *lockOwners;

	Assert(parent != NULL);

	hash_seq_init(&status, LockMethodLocalHash);

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
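		/*
		 * ic/ip will receive the lockOwners[] slot indexes of the entries for
		 * CurrentResourceOwner and its parent, or remain -1 if not found.
		 */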
		int			i;
		int			ic = -1;
		int			ip = -1;

		/* Ignore items that must be nontransactional */
		if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
			continue;

		/*
		 * Scan to see if there are any locks belonging to current owner or
		 * its parent
		 */
		lockOwners = locallock->lockOwners;
		for (i = locallock->numLockOwners - 1; i >= 0; i--)
		{
			if (lockOwners[i].owner == CurrentResourceOwner)
				ic = i;
			else if (lockOwners[i].owner == parent)
				ip = i;
		}

		if (ic < 0)
			continue;			/* no current locks */

		if (ip < 0)
		{
			/* Parent has no slot, so just give it child's slot */
			lockOwners[ic].owner = parent;
		}
		else
		{
			/* Merge child's count with parent's */
			lockOwners[ip].nLocks += lockOwners[ic].nLocks;
			/* compact out unused slot */
			locallock->numLockOwners--;
			if (ic < locallock->numLockOwners)
				lockOwners[ic] = lockOwners[locallock->numLockOwners];
		}
	}
}

/*
 * FastPathGrantRelationLock
 *		Grant lock using per-backend fast-path array, if there is space.
 */
static bool
FastPathGrantRelationLock(Oid relid, LOCKMODE lockmode)
{
	uint32		f;
	uint32		unused_slot = FP_LOCK_SLOTS_PER_BACKEND;
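	/* unused_slot == FP_LOCK_SLOTS_PER_BACKEND means "no empty slot seen yet" */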

	/* Scan for existing entry for this relid, remembering empty slot. */
	for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
	{
		if (FAST_PATH_GET_BITS(MyProc, f) == 0)
			unused_slot = f;
		else if (MyProc->fpRelId[f] == relid)
		{
			Assert(!FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode));
			FAST_PATH_SET_LOCKMODE(MyProc, f, lockmode);
			return true;
		}
	}

	/* If no existing entry, use any empty slot. */
	if (unused_slot < FP_LOCK_SLOTS_PER_BACKEND)
	{
		MyProc->fpRelId[unused_slot] = relid;
		FAST_PATH_SET_LOCKMODE(MyProc, unused_slot, lockmode);
		++FastPathLocalUseCount;
		return true;
	}

	/* No existing entry, and no empty slot. */
	return false;
}

/*
 * FastPathUnGrantRelationLock
 *		Release fast-path lock, if present.  Update backend-private local
 *		use count, while we're at it.
 */
static bool
FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode)
{
	uint32		f;
	bool		result = false;

	FastPathLocalUseCount = 0;
	for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
	{
		if (MyProc->fpRelId[f] == relid
			&& FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode))
		{
			Assert(!result);
			FAST_PATH_CLEAR_LOCKMODE(MyProc, f, lockmode);
			result = true;
		}
		if (FAST_PATH_GET_BITS(MyProc, f) != 0)
			++FastPathLocalUseCount;
	}
	return result;
}

/*
 * FastPathTransferRelationLocks
 *		Transfer locks matching the given lock tag from per-backend fast-path
 *		arrays to the shared hash table.
 */
static bool
FastPathTransferRelationLocks(LockMethod lockMethodTable, const LOCKTAG *locktag,
					  uint32 hashcode)
{
	LWLockId		partitionLock = LockHashPartitionLock(hashcode);
	Oid				relid = locktag->locktag_field2;
	uint32			i;

	/*
	 * Every PGPROC that can potentially hold a fast-path lock is present
	 * in ProcGlobal->allProcs.  Prepared transactions are not, but
	 * any outstanding fast-path locks held by prepared transactions are
	 * transferred to the main lock table.
	 */
	for (i = 0; i < ProcGlobal->allProcCount; i++)
	{
		PGPROC	   *proc = &ProcGlobal->allProcs[i];
		uint32		f;

		LWLockAcquire(proc->backendLock, LW_EXCLUSIVE);

		/*
		 * If the target backend isn't referencing the same database as we are,
		 * then we needn't examine the individual relation IDs at all; none of
		 * them can be relevant.
		 *
		 * proc->databaseId is set at backend startup time and never changes
		 * thereafter, so it might be safe to perform this test before
		 * acquiring proc->backendLock.  In particular, it's certainly safe to
		 * assume that if the target backend holds any fast-path locks, it must
		 * have performed a memory-fencing operation (in particular, an LWLock
		 * acquisition) since setting proc->databaseId.  However, it's less
		 * clear that our backend is certain to have performed a memory fencing
		 * operation since the other backend set proc->databaseId.  So for now,
		 * we test it after acquiring the LWLock just to be safe.
		 */
		if (proc->databaseId != MyDatabaseId)
		{
			LWLockRelease(proc->backendLock);
			continue;
		}

		for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
		{
			uint32		lockmode;

			/* Look for an allocated slot matching the given relid. */
			if (relid != proc->fpRelId[f] || FAST_PATH_GET_BITS(proc, f) == 0)
				continue;

			/* Find or create lock object. */
			LWLockAcquire(partitionLock, LW_EXCLUSIVE);
			for (lockmode = FAST_PATH_LOCKNUMBER_OFFSET;
				 lockmode < FAST_PATH_LOCKNUMBER_OFFSET+FAST_PATH_BITS_PER_SLOT;
				 ++lockmode)
			{
				PROCLOCK   *proclock;

				if (!FAST_PATH_CHECK_LOCKMODE(proc, f, lockmode))
					continue;
				proclock = SetupLockInTable(lockMethodTable, proc, locktag,
											hashcode, lockmode);
				if (!proclock)
				{
					LWLockRelease(partitionLock);
					return false;
				}
				GrantLock(proclock->tag.myLock, proclock, lockmode);
				FAST_PATH_CLEAR_LOCKMODE(proc, f, lockmode);
			}
			LWLockRelease(partitionLock);
		}
		LWLockRelease(proc->backendLock);
	}
	return true;
}

/*
 * FastPathGetRelationLockEntry
 *		Return the PROCLOCK for a lock originally taken via the fast-path,
 *      transferring it to the primary lock table if necessary.
 */
static PROCLOCK *
FastPathGetRelationLockEntry(LOCALLOCK *locallock)
{
	LockMethod		lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD];
	LOCKTAG		   *locktag = &locallock->tag.lock;
	PROCLOCK	   *proclock = NULL;
	LWLockId		partitionLock = LockHashPartitionLock(locallock->hashcode);
	Oid				relid = locktag->locktag_field2;
	uint32			f;

	LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE);

	for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
	{
		uint32		lockmode;

		/* Look for an allocated slot matching the given relid. */
		if (relid != MyProc->fpRelId[f] || FAST_PATH_GET_BITS(MyProc, f) == 0)
			continue;

		/* If we don't have a lock of the given mode, forget it! */
		lockmode = locallock->tag.mode;
		if (!FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode))
			break;

		/* Find or create lock object. */
		LWLockAcquire(partitionLock, LW_EXCLUSIVE);

		proclock = SetupLockInTable(lockMethodTable, MyProc, locktag,
									locallock->hashcode, lockmode);
		if (!proclock)
		{
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of shared memory"),
		  errhint("You might need to increase max_locks_per_transaction.")));
		}
		GrantLock(proclock->tag.myLock, proclock, lockmode);
		FAST_PATH_CLEAR_LOCKMODE(MyProc, f, lockmode);

		LWLockRelease(partitionLock);
	}

	LWLockRelease(MyProc->backendLock);

	/* Lock may have already been transferred by some other backend. */
	if (proclock == NULL)
	{
		LOCK	   *lock;
		PROCLOCKTAG	proclocktag;
		uint32		proclock_hashcode;

		LWLockAcquire(partitionLock, LW_SHARED);

		lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
													(void *) locktag,
													locallock->hashcode,
													HASH_FIND,
													NULL);
		if (!lock)
			elog(ERROR, "failed to re-find shared lock object");

		proclocktag.myLock = lock;
		proclocktag.myProc = MyProc;

		proclock_hashcode = ProcLockHashCode(&proclocktag, locallock->hashcode);
		proclock = (PROCLOCK *)
			hash_search_with_hash_value(LockMethodProcLockHash,
										(void *) &proclocktag,
										proclock_hashcode,
										HASH_FIND,
										NULL);
		if (!proclock)
			elog(ERROR, "failed to re-find shared proclock object");
		LWLockRelease(partitionLock);
	}

	return proclock;
}

/*
 * GetLockConflicts
 *		Get an array of VirtualTransactionIds of xacts currently holding locks
 *		that would conflict with the specified lock/lockmode.
 *		xacts merely awaiting such a lock are NOT reported.
 *
 * The result array is palloc'd and is terminated with an invalid VXID.
 *
 * Of course, the result could be out of date by the time it's returned,
 * so use of this function has to be thought about carefully.
 *
 * Note we never include the current xact's vxid in the result array,
 * since an xact never blocks itself.  Also, prepared transactions are
 * ignored, which is a bit more debatable but is appropriate for current
 * uses of the result.
 */
VirtualTransactionId *
GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
{
	static VirtualTransactionId *vxids;
	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
	LockMethod	lockMethodTable;
	LOCK	   *lock;
	LOCKMASK	conflictMask;
	SHM_QUEUE  *procLocks;
	PROCLOCK   *proclock;
	uint32		hashcode;
	LWLockId	partitionLock;
	int			count = 0;
	int			fast_count = 0;

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];
	if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
		elog(ERROR, "unrecognized lock mode: %d", lockmode);

	/*
	 * Allocate memory to store results, and fill with InvalidVXID.  We only
	 * need enough space for MaxBackends + a terminator, since prepared xacts
	 * don't count.  When InHotStandby, allocate the array just once, in
	 * TopMemoryContext.
	 */
	if (InHotStandby)
	{
		if (vxids == NULL)
			vxids = (VirtualTransactionId *)
				MemoryContextAlloc(TopMemoryContext,
						   sizeof(VirtualTransactionId) * (MaxBackends + 1));
	}
	else
		vxids = (VirtualTransactionId *)
			palloc0(sizeof(VirtualTransactionId) * (MaxBackends + 1));

	/* Compute hash code and partition lock, and look up conflicting modes. */
	hashcode = LockTagHashCode(locktag);
	partitionLock = LockHashPartitionLock(hashcode);
	conflictMask = lockMethodTable->conflictTab[lockmode];

	/*
	 * Fast path locks might not have been entered in the primary lock table.
	 * But only strong locks can conflict with anything that might have been
	 * taken via the fast-path mechanism.
	 */
	if (FastPathTag(locktag) && FastPathStrongMode(lockmode))
	{
		int			i;
		Oid			relid = locktag->locktag_field2;
		VirtualTransactionId	vxid;

		/*
		 * Iterate over relevant PGPROCs.  Anything held by a prepared
		 * transaction will have been transferred to the primary lock table,
		 * so we need not worry about those.  This is all a bit fuzzy,
		 * because new locks could be taken after we've visited a particular
		 * partition, but the callers had better be prepared to deal with
		 * that anyway, since the locks could equally well be taken between the
		 * time we return the value and the time the caller does something
		 * with it.
		 */
		for (i = 0; i < ProcGlobal->allProcCount; i++)
		{
			PGPROC	   *proc = &ProcGlobal->allProcs[i];
			uint32		f;

			/* A backend never blocks itself */
			if (proc == MyProc)
				continue;

			LWLockAcquire(proc->backendLock, LW_SHARED);

			/*
			 * If the target backend isn't referencing the same database as we
			 * are, then we needn't examine the individual relation IDs at all;
			 * none of them can be relevant.
			 *
			 * See FastPathTransferLocks() for discussion of why we do this
			 * test after acquiring the lock.
			 */
			if (proc->databaseId != MyDatabaseId)
			{
				LWLockRelease(proc->backendLock);
				continue;
			}

			for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
			{
				uint32		lockmask;

				/* Look for an allocated slot matching the given relid. */
				if (relid != proc->fpRelId[f])
					continue;
				lockmask = FAST_PATH_GET_BITS(proc, f);
				if (!lockmask)
					continue;
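				/*
				 * Shift the per-slot bits into ordinary LOCKMASK positions so
				 * they can be compared against conflictMask.
				 */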
				lockmask <<= FAST_PATH_LOCKNUMBER_OFFSET;

				/*
				 * There can only be one entry per relation, so if we found
				 * it and it doesn't conflict, we can skip the rest of the
				 * slots.
				 */
				if ((lockmask & conflictMask) == 0)
					break;

				/* Conflict! */
				GET_VXID_FROM_PGPROC(vxid, *proc);

				/*
				 * If we see an invalid VXID, then either the xact has already
				 * committed (or aborted), or it's a prepared xact.  In either
				 * case we may ignore it.
				 */
				if (VirtualTransactionIdIsValid(vxid))
					vxids[count++] = vxid;
				break;
			}

			LWLockRelease(proc->backendLock);
		}
	}

	/* Remember how many fast-path conflicts we found. */
	fast_count = count;

	/*
	 * Look up the lock object matching the tag.
	 */
	LWLockAcquire(partitionLock, LW_SHARED);

	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
												(const void *) locktag,
												hashcode,
												HASH_FIND,
												NULL);
	if (!lock)
	{
		/*
		 * If the lock object doesn't exist, there is nothing holding a lock
		 * on this lockable object.
		 */
		LWLockRelease(partitionLock);
		return vxids;
	}

	/*
	 * Examine each existing holder (or awaiter) of the lock.
	 */

	procLocks = &(lock->procLocks);

	proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
										 offsetof(PROCLOCK, lockLink));

	while (proclock)
	{
		if (conflictMask & proclock->holdMask)
		{
			PGPROC	   *proc = proclock->tag.myProc;

			/* A backend never blocks itself */
			if (proc != MyProc)
			{
				VirtualTransactionId vxid;

				GET_VXID_FROM_PGPROC(vxid, *proc);

				/*
				 * If we see an invalid VXID, then either the xact has already
				 * committed (or aborted), or it's a prepared xact.  In either
				 * case we may ignore it.
				 */
				if (VirtualTransactionIdIsValid(vxid))
				{
					int		i;

					/* Avoid duplicate entries. */
					for (i = 0; i < fast_count; ++i)
						if (VirtualTransactionIdEquals(vxids[i], vxid))
							break;
					if (i >= fast_count)
						vxids[count++] = vxid;
				}
			}
		}

		proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
											 offsetof(PROCLOCK, lockLink));
	}

	LWLockRelease(partitionLock);

2569 2570 2571 2572
	if (count > MaxBackends)	/* should never happen */
		elog(PANIC, "too many conflicting locks found");

	return vxids;
}

/*
 * Find a lock in the shared lock table and release it.  It is the caller's
 * responsibility to verify that this is a sane thing to do.  (For example, it
 * would be bad to release a lock here if there might still be a LOCALLOCK
 * object with pointers to it.)
 * 
 * We currently use this in two situations: first, to release locks held by
 * prepared transactions on commit (see lock_twophase_postcommit); and second,
 * to release locks taken via the fast-path, transferred to the main hash
 * table, and then released (see LockReleaseAll).
 */
static void
LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
					 LOCKTAG *locktag, LOCKMODE lockmode,
					 bool decrement_strong_lock_count)
{
	LOCK	   *lock;
	PROCLOCK   *proclock;
	PROCLOCKTAG proclocktag;
	uint32		hashcode;
	uint32		proclock_hashcode;
	LWLockId	partitionLock;
	bool		wakeupNeeded;

	hashcode = LockTagHashCode(locktag);
	partitionLock = LockHashPartitionLock(hashcode);

	LWLockAcquire(partitionLock, LW_EXCLUSIVE);

	/*
	 * Re-find the lock object (it had better be there).
	 */
	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
												(void *) locktag,
												hashcode,
												HASH_FIND,
												NULL);
	if (!lock)
		elog(PANIC, "failed to re-find shared lock object");

	/*
	 * Re-find the proclock object (ditto).
	 */
	proclocktag.myLock = lock;
	proclocktag.myProc = proc;

	proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);

	proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
														(void *) &proclocktag,
														proclock_hashcode,
														HASH_FIND,
														NULL);
	if (!proclock)
		elog(PANIC, "failed to re-find shared proclock object");

	/*
	 * Double-check that we are actually holding a lock of the type we want to
	 * release.
	 */
	if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
	{
		PROCLOCK_PRINT("lock_twophase_postcommit: WRONGTYPE", proclock);
		LWLockRelease(partitionLock);
		elog(WARNING, "you don't own a lock of type %s",
			 lockMethodTable->lockModeNames[lockmode]);
		return;
	}

	/*
	 * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
	 */
	wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);

	CleanUpLock(lock, proclock,
				lockMethodTable, hashcode,
				wakeupNeeded);

	LWLockRelease(partitionLock);

	/* 
	 * Decrement strong lock count.  This logic is needed only for 2PC.
	 */
	if (decrement_strong_lock_count
		&& FastPathTag(&lock->tag) && FastPathStrongMode(lockmode))
	{
		uint32	fasthashcode = FastPathStrongLockHashPartition(hashcode);
		SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
		FastPathStrongRelationLocks->count[fasthashcode]--;
		SpinLockRelease(&FastPathStrongRelationLocks->mutex);
	}
}

/*
 * AtPrepare_Locks
 *		Do the preparatory work for a PREPARE: make 2PC state file records
 *		for all locks currently held.
 *
 * Non-transactional locks are ignored, as are VXID locks.
 *
 * There are some special cases that we error out on: we can't be holding
 * any session locks (should be OK since only VACUUM uses those) and we
 * can't be holding any locks on temporary objects (since that would mess
 * up the current backend if it tries to exit before the prepared xact is
 * committed).
 */
void
AtPrepare_Locks(void)
{
	HASH_SEQ_STATUS status;
	LOCALLOCK  *locallock;

	/*
	 * For the most part, we don't need to touch shared memory for this ---
	 * all the necessary state information is in the locallock table.
	 * Fast-path locks are an exception, however: we move any such locks
	 * to the main table before allowing PREPARE TRANSACTION to succeed.
	 */
	hash_seq_init(&status, LockMethodLocalHash);

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		TwoPhaseLockRecord record;
		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
		int			i;

		/* Ignore nontransactional locks */
		if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
			continue;

		/*
		 * Ignore VXID locks.  We don't want those to be held by prepared
		 * transactions, since they aren't meaningful after a restart.
		 */
		if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
			continue;

		/* Ignore it if we don't actually hold the lock */
		if (locallock->nLocks <= 0)
			continue;

		/* Scan to verify there are no session locks */
		for (i = locallock->numLockOwners - 1; i >= 0; i--)
		{
			/* elog not ereport since this should not happen */
			if (lockOwners[i].owner == NULL)
				elog(ERROR, "cannot PREPARE when session locks exist");
		}

		/*
		 * If the local lock was taken via the fast-path, we need to move it
		 * to the primary lock table, or just get a pointer to the existing
		 * primary lock table entry if by chance it's already been transferred.
		 */
		if (locallock->proclock == NULL)
		{
			locallock->proclock = FastPathGetRelationLockEntry(locallock);
			locallock->lock = locallock->proclock->tag.myLock;
		}

		/*
		 * Arrange not to release any strong lock count held by this lock
		 * entry.  We must retain the count until the prepared transaction
		 * is committed or rolled back.
		 */
		locallock->holdsStrongLockCount = FALSE;

		/*
		 * Create a 2PC record.
		 */
		memcpy(&(record.locktag), &(locallock->tag.lock), sizeof(LOCKTAG));
		record.lockmode = locallock->tag.mode;

		RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, 0,
							   &record, sizeof(TwoPhaseLockRecord));
	}
}

/*
 * PostPrepare_Locks
 *		Clean up after successful PREPARE
 *
 * Here, we want to transfer ownership of our locks to a dummy PGPROC
 * that's now associated with the prepared transaction, and we want to
 * clean out the corresponding entries in the LOCALLOCK table.
 *
 * Note: by removing the LOCALLOCK entries, we are leaving dangling
 * pointers in the transaction's resource owner.  This is OK at the
 * moment since resowner.c doesn't try to free locks retail at a toplevel
 * transaction commit or abort.  We could alternatively zero out nLocks
 * and leave the LOCALLOCK entries to be garbage-collected by LockReleaseAll,
 * but that probably costs more cycles.
 */
void
PostPrepare_Locks(TransactionId xid)
{
	PGPROC	   *newproc = TwoPhaseGetDummyProc(xid);
	HASH_SEQ_STATUS status;
	LOCALLOCK  *locallock;
	LOCK	   *lock;
	PROCLOCK   *proclock;
	PROCLOCKTAG proclocktag;
	bool		found;
	int			partition;

	/* This is a critical section: any error means big trouble */
	START_CRIT_SECTION();

	/*
	 * First we run through the locallock table and get rid of unwanted
	 * entries, then we scan the process's proclocks and transfer them to the
	 * target proc.
	 *
	 * We do this separately because we may have multiple locallock entries
	 * pointing to the same proclock, and we daren't end up with any dangling
	 * pointers.
	 */
	hash_seq_init(&status, LockMethodLocalHash);

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		if (locallock->proclock == NULL || locallock->lock == NULL)
		{
			/*
			 * We must've run out of shared memory while trying to set up this
			 * lock.  Just forget the local entry.
			 */
			Assert(locallock->nLocks == 0);
			RemoveLocalLock(locallock);
			continue;
		}

		/* Ignore nontransactional locks */
		if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
			continue;

		/* Ignore VXID locks */
		if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
			continue;

		/* We already checked there are no session locks */

		/* Mark the proclock to show we need to release this lockmode */
		if (locallock->nLocks > 0)
			locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);

		/* And remove the locallock hashtable entry */
		RemoveLocalLock(locallock);
	}

	/*
	 * Now, scan each lock partition separately.
	 */
	for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
	{
		LWLockId	partitionLock = FirstLockMgrLock + partition;
		SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);

		proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
											 offsetof(PROCLOCK, procLink));

		if (!proclock)
			continue;			/* needn't examine this partition */

		LWLockAcquire(partitionLock, LW_EXCLUSIVE);

		while (proclock)
		{
			PROCLOCK   *nextplock;
			LOCKMASK	holdMask;
			PROCLOCK   *newproclock;

			/* Get link first, since we may unlink/delete this proclock */
			nextplock = (PROCLOCK *)
				SHMQueueNext(procLocks, &proclock->procLink,
							 offsetof(PROCLOCK, procLink));

			Assert(proclock->tag.myProc == MyProc);

			lock = proclock->tag.myLock;

			/* Ignore nontransactional locks */
			if (!LockMethods[LOCK_LOCKMETHOD(*lock)]->transactional)
				goto next_item;

			/* Ignore VXID locks */
			if (lock->tag.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
				goto next_item;

			PROCLOCK_PRINT("PostPrepare_Locks", proclock);
			LOCK_PRINT("PostPrepare_Locks", lock, 0);
			Assert(lock->nRequested >= 0);
			Assert(lock->nGranted >= 0);
			Assert(lock->nGranted <= lock->nRequested);
			Assert((proclock->holdMask & ~lock->grantMask) == 0);

			/*
			 * Since there were no session locks, we should be releasing all
			 * locks
			 */
			if (proclock->releaseMask != proclock->holdMask)
				elog(PANIC, "we seem to have dropped a bit somewhere");

			holdMask = proclock->holdMask;

			/*
			 * We cannot simply modify proclock->tag.myProc to reassign
			 * ownership of the lock, because that's part of the hash key and
			 * the proclock would then be in the wrong hash chain.  So, unlink
			 * and delete the old proclock; create a new one with the right
			 * contents; and link it into place.  We do it in this order to be
			 * certain we won't run out of shared memory (the way dynahash.c
			 * works, the deleted object is certain to be available for
			 * reallocation).
			 */
			SHMQueueDelete(&proclock->lockLink);
			SHMQueueDelete(&proclock->procLink);
			if (!hash_search(LockMethodProcLockHash,
							 (void *) &(proclock->tag),
							 HASH_REMOVE, NULL))
				elog(PANIC, "proclock table corrupted");

			/*
			 * Create the hash key for the new proclock table.
			 */
			proclocktag.myLock = lock;
			proclocktag.myProc = newproc;

			newproclock = (PROCLOCK *) hash_search(LockMethodProcLockHash,
												   (void *) &proclocktag,
												   HASH_ENTER_NULL, &found);
			if (!newproclock)
				ereport(PANIC,	/* should not happen */
						(errcode(ERRCODE_OUT_OF_MEMORY),
						 errmsg("out of shared memory"),
						 errdetail("Not enough memory for reassigning the prepared transaction's locks.")));

			/*
			 * If new, initialize the new entry
			 */
			if (!found)
			{
				newproclock->holdMask = 0;
				newproclock->releaseMask = 0;
				/* Add new proclock to appropriate lists */
				SHMQueueInsertBefore(&lock->procLocks, &newproclock->lockLink);
				SHMQueueInsertBefore(&(newproc->myProcLocks[partition]),
									 &newproclock->procLink);
				PROCLOCK_PRINT("PostPrepare_Locks: new", newproclock);
			}
			else
			{
				PROCLOCK_PRINT("PostPrepare_Locks: found", newproclock);
				Assert((newproclock->holdMask & ~lock->grantMask) == 0);
			}

			/*
			 * Pass over the identified lock ownership.
			 */
			Assert((newproclock->holdMask & holdMask) == 0);
			newproclock->holdMask |= holdMask;

	next_item:
			proclock = nextplock;
		}						/* loop over PROCLOCKs within this partition */

		LWLockRelease(partitionLock);
	}							/* loop over partitions */

	END_CRIT_SECTION();
}

/*
 * Estimate shared-memory space used for lock tables
 */
Size
LockShmemSize(void)
{
	Size		size = 0;
	long		max_table_size;

	/* lock hash table */
	max_table_size = NLOCKENTS();
	size = add_size(size, hash_estimate_size(max_table_size, sizeof(LOCK)));

	/* proclock hash table */
	max_table_size *= 2;
	size = add_size(size, hash_estimate_size(max_table_size, sizeof(PROCLOCK)));

	/*
	 * Since NLOCKENTS is only an estimate, add 10% safety margin.
	 */
	size = add_size(size, size / 10);

	return size;
}

/*
 * GetLockStatusData - Return a summary of the lock manager's internal
 * status, for use in a user-level reporting function.
 *
 * The return data consists of an array of PROCLOCK objects, with the
 * associated PGPROC and LOCK objects for each.  Note that multiple
 * copies of the same PGPROC and/or LOCK objects are likely to appear.
 * It is the caller's responsibility to match up duplicates if wanted.
 *
 * The design goal is to hold the LWLocks for as short a time as possible;
 * thus, this function simply makes a copy of the necessary data and releases
 * the locks, allowing the caller to contemplate and format the data for as
 * long as it pleases.
 */
LockData *
GetLockStatusData(void)
{
	LockData   *data;
	PROCLOCK   *proclock;
	HASH_SEQ_STATUS seqstat;
	int			els;
	int			el;
	int			i;

	data = (LockData *) palloc(sizeof(LockData));

	/* Guess how much space we'll need. */
	els = MaxBackends;
	el = 0;
	data->locks = (LockInstanceData *) palloc(sizeof(LockInstanceData) * els);

	/*
	 * First, we iterate through the per-backend fast-path arrays, locking
	 * them one at a time.  This might produce an inconsistent picture of the
	 * system state, but taking all of those LWLocks at the same time seems
	 * impractical (in particular, note MAX_SIMUL_LWLOCKS).  It shouldn't
	 * matter too much, because none of these locks can be involved in lock
	 * conflicts anyway - anything that might must be present in the main
	 * lock table.
	 */
	for (i = 0; i < ProcGlobal->allProcCount; ++i)
	{
		PGPROC	   *proc = &ProcGlobal->allProcs[i];
		uint32		f;

		LWLockAcquire(proc->backendLock, LW_SHARED);

		for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; ++f)
		{
			LockInstanceData   *instance;
			uint32		lockbits = FAST_PATH_GET_BITS(proc, f);

			/* Skip unallocated slots. */
			if (!lockbits)
				continue;

			if (el >= els)
			{
				els += MaxBackends;
				data->locks = (LockInstanceData *)
					repalloc(data->locks, sizeof(LockInstanceData) * els);
			}

			instance = &data->locks[el];
			SET_LOCKTAG_RELATION(instance->locktag, proc->databaseId,
								 proc->fpRelId[f]);
			instance->holdMask = lockbits << FAST_PATH_LOCKNUMBER_OFFSET;
			instance->waitLockMode = NoLock;
			instance->backend = proc->backendId;
			instance->lxid = proc->lxid;
			instance->pid = proc->pid;
			instance->fastpath = true;

			el++;
		}
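
		/* Also report this backend's fast-path VXID lock, if it holds one */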

		if (proc->fpVXIDLock)
		{
			VirtualTransactionId	vxid;
			LockInstanceData   *instance;

			if (el >= els)
			{
				els += MaxBackends;
				data->locks = (LockInstanceData *)
					repalloc(data->locks, sizeof(LockInstanceData) * els);
			}

			vxid.backendId = proc->backendId;
			vxid.localTransactionId = proc->fpLocalTransactionId;

			instance = &data->locks[el];
			SET_LOCKTAG_VIRTUALTRANSACTION(instance->locktag, vxid);
			instance->holdMask = LOCKBIT_ON(ExclusiveLock);
			instance->waitLockMode = NoLock;
			instance->backend = proc->backendId;
			instance->lxid = proc->lxid;
			instance->pid = proc->pid;
			instance->fastpath = true;

			el++;
		}

		LWLockRelease(proc->backendLock);
	}

	/*
	 * Next, acquire lock on the entire shared lock data structure.  We do
	 * this so that, at least for locks in the primary lock table, the state
	 * will be self-consistent.
	 *
	 * Since this is a read-only operation, we take shared instead of
	 * exclusive lock.	There's not a whole lot of point to this, because all
	 * the normal operations require exclusive lock, but it doesn't hurt
	 * anything either. It will at least allow two backends to do
	 * GetLockStatusData in parallel.
	 *
	 * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
	 */
	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
		LWLockAcquire(FirstLockMgrLock + i, LW_SHARED);

	/* Now we can safely count the number of proclocks */
	data->nelements = el + hash_get_num_entries(LockMethodProcLockHash);
	if (data->nelements > els)
	{
		els = data->nelements;
		data->locks = (LockInstanceData *)
			repalloc(data->locks, sizeof(LockInstanceData) * els);
	}

	/* Now scan the tables to copy the data */
	hash_seq_init(&seqstat, LockMethodProcLockHash);

	while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
	{
		PGPROC	   *proc = proclock->tag.myProc;
		LOCK	   *lock = proclock->tag.myLock;
		LockInstanceData   *instance = &data->locks[el];

		memcpy(&instance->locktag, &lock->tag, sizeof(LOCKTAG));
		instance->holdMask = proclock->holdMask;
		if (proc->waitLock == proclock->tag.myLock)
			instance->waitLockMode = proc->waitLockMode;
		else
			instance->waitLockMode = NoLock;
		instance->backend = proc->backendId;
		instance->lxid = proc->lxid;
		instance->pid = proc->pid;
		instance->fastpath = false;

		el++;
	}

	/*
	 * And release locks.  We do this in reverse order for two reasons: (1)
	 * Anyone else who needs more than one of the locks will be trying to lock
	 * them in increasing order; we don't want to release the other process
	 * until it can get all the locks it needs. (2) This avoids O(N^2)
	 * behavior inside LWLockRelease.
	 */
	for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
		LWLockRelease(FirstLockMgrLock + i);

	Assert(el == data->nelements);

	return data;
}

/*
 * Returns a list of currently held AccessExclusiveLocks, for use
 * by GetRunningTransactionData().
 */
xl_standby_lock *
GetRunningTransactionLocks(int *nlocks)
{
	PROCLOCK   *proclock;
	HASH_SEQ_STATUS seqstat;
	int			i;
	int			index;
	int			els;
	xl_standby_lock *accessExclusiveLocks;

	/*
	 * Acquire lock on the entire shared lock data structure.
	 *
	 * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
	 */
	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
		LWLockAcquire(FirstLockMgrLock + i, LW_SHARED);

	/* Now scan the tables to copy the data */
	hash_seq_init(&seqstat, LockMethodProcLockHash);

	/* Now we can safely count the number of proclocks */
	els = hash_get_num_entries(LockMethodProcLockHash);

	/*
	 * Allocating enough space for all locks in the lock table is overkill,
	 * but it's more convenient and faster than having to enlarge the array.
	 */
	accessExclusiveLocks = palloc(els * sizeof(xl_standby_lock));

	/*
	 * If lock is a currently granted AccessExclusiveLock then it will have
	 * just one proclock holder, so locks are never accessed twice in this
	 * particular case. Don't copy this code for use elsewhere because in the
	 * general case this will give you duplicate locks when looking at
	 * non-exclusive lock types.
	 */
	index = 0;
	while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
	{
		/* make sure this definition matches the one used in LockAcquire */
		if ((proclock->holdMask & LOCKBIT_ON(AccessExclusiveLock)) &&
			proclock->tag.myLock->tag.locktag_type == LOCKTAG_RELATION)
		{
			PGPROC	   *proc = proclock->tag.myProc;
			PGXACT	   *pgxact = &ProcGlobal->allPgXact[proc->pgprocno];
			LOCK	   *lock = proclock->tag.myLock;

			accessExclusiveLocks[index].xid = pgxact->xid;
			accessExclusiveLocks[index].dbOid = lock->tag.locktag_field1;
			accessExclusiveLocks[index].relOid = lock->tag.locktag_field2;

			index++;
		}
	}

	/*
	 * And release locks.  We do this in reverse order for two reasons: (1)
	 * Anyone else who needs more than one of the locks will be trying to lock
	 * them in increasing order; we don't want to release the other process
	 * until it can get all the locks it needs. (2) This avoids O(N^2)
	 * behavior inside LWLockRelease.
	 */
	for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
		LWLockRelease(FirstLockMgrLock + i);

	*nlocks = index;
	return accessExclusiveLocks;
}
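
/*
 * Illustrative usage sketch (an assumption, not code from this file): a
 * caller such as the standby-snapshot logging code is expected to consume
 * the palloc'd array roughly like this:
 *
 *		int				nlocks;
 *		xl_standby_lock *locks = GetRunningTransactionLocks(&nlocks);
 *		int				i;
 *
 *		for (i = 0; i < nlocks; i++)
 *			record locks[i].xid, locks[i].dbOid and locks[i].relOid in the
 *			standby-locks WAL record;
 *		pfree(locks);
 *
 * The array is allocated in the caller's current memory context, so the
 * pfree() is optional if that context is short-lived.
 */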

/* Provide the textual name of any lock mode */
const char *
GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode)
{
	Assert(lockmethodid > 0 && lockmethodid < lengthof(LockMethods));
	Assert(mode > 0 && mode <= LockMethods[lockmethodid]->numLockModes);
	return LockMethods[lockmethodid]->lockModeNames[mode];
}
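
/*
 * Illustrative example (not part of the original file): debug output could
 * render a mode of the default lock method like this, assuming the standard
 * lock modes from lock.h:
 *
 *		elog(LOG, "strongest mode held is %s",
 *			 GetLockmodeName(DEFAULT_LOCKMETHOD, AccessExclusiveLock));
 *
 * which, given the built-in lock_mode_names table, prints
 * "strongest mode held is AccessExclusiveLock".
 */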

#ifdef LOCK_DEBUG
/*
 * Dump all locks in the given proc's myProcLocks lists.
 *
 * Caller is responsible for having acquired appropriate LWLocks.
 */
void
DumpLocks(PGPROC *proc)
{
	SHM_QUEUE  *procLocks;
	PROCLOCK   *proclock;
	LOCK	   *lock;
	int			i;

	if (proc == NULL)
		return;

	if (proc->waitLock)
		LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);

	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
	{
		procLocks = &(proc->myProcLocks[i]);

		proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
											 offsetof(PROCLOCK, procLink));

		while (proclock)
		{
			Assert(proclock->tag.myProc == proc);

			lock = proclock->tag.myLock;

			PROCLOCK_PRINT("DumpLocks", proclock);
			LOCK_PRINT("DumpLocks", lock, 0);

			proclock = (PROCLOCK *)
				SHMQueueNext(procLocks, &proclock->procLink,
							 offsetof(PROCLOCK, procLink));
		}
	}
}

/*
 * Dump all lmgr locks.
 *
 * Caller is responsible for having acquired appropriate LWLocks.
 */
void
DumpAllLocks(void)
{
	PGPROC	   *proc;
	PROCLOCK   *proclock;
	LOCK	   *lock;
	HASH_SEQ_STATUS status;

	proc = MyProc;

	if (proc && proc->waitLock)
		LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);

	hash_seq_init(&status, LockMethodProcLockHash);

	while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
	{
		PROCLOCK_PRINT("DumpAllLocks", proclock);

		lock = proclock->tag.myLock;
		if (lock)
			LOCK_PRINT("DumpAllLocks", lock, 0);
		else
			elog(LOG, "DumpAllLocks: proclock->tag.myLock = NULL");
	}
}
#endif   /* LOCK_DEBUG */
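
/*
 * Build/usage note (assumptions, not statements from this file): DumpLocks()
 * and DumpAllLocks() exist only in builds compiled with LOCK_DEBUG defined,
 * for example
 *
 *		./configure CFLAGS="-O0 -g -DLOCK_DEBUG"
 *
 * and the LOCK_PRINT/PROCLOCK_PRINT output they produce is further gated at
 * run time by the trace_locks-style GUCs declared under LOCK_DEBUG in lock.h.
 */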

/*
 * LOCK 2PC resource manager's routines
 */

/*
 * Re-acquire a lock belonging to a transaction that was prepared.
 *
 * Because this function is run at db startup, re-acquiring the locks should
 * never conflict with running transactions because there are none.  We
 * assume that the lock state represented by the stored 2PC files is legal.
 *
 * When switching from Hot Standby mode to normal operation, the locks will
 * already be held by the startup process. The locks are acquired for the new
 * procs without checking for conflicts, so we don't get a conflict between the
 * startup process and the dummy procs, even though we will momentarily have
 * a situation where two procs are holding the same AccessExclusiveLock,
 * which isn't normally possible because of the conflict. If we're in standby
 * mode, but a recovery snapshot hasn't been established yet, it's possible
 * that some but not all of the locks are already held by the startup process.
 *
 * This approach is simple, but also a bit dangerous, because if there isn't
 * enough shared memory to acquire the locks, an error will be thrown, which
 * is promoted to FATAL and recovery will abort, bringing down postmaster.
 * A safer approach would be to transfer the locks like we do in
 * AtPrepare_Locks, but then again, in hot standby mode it's possible for
 * read-only backends to use up all the shared lock memory anyway, so that
 * replaying the WAL record that needs to acquire a lock will throw an error
 * and PANIC anyway.
 */
void
lock_twophase_recover(TransactionId xid, uint16 info,
					  void *recdata, uint32 len)
{
	TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
	PGPROC	   *proc = TwoPhaseGetDummyProc(xid);
	LOCKTAG    *locktag;
	LOCKMODE	lockmode;
	LOCKMETHODID lockmethodid;
	LOCK	   *lock;
	PROCLOCK   *proclock;
	PROCLOCKTAG proclocktag;
	bool		found;
	uint32		hashcode;
	uint32		proclock_hashcode;
	int			partition;
	LWLockId	partitionLock;
	LockMethod	lockMethodTable;

	Assert(len == sizeof(TwoPhaseLockRecord));
	locktag = &rec->locktag;
	lockmode = rec->lockmode;
	lockmethodid = locktag->locktag_lockmethodid;

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];

	hashcode = LockTagHashCode(locktag);
	partition = LockHashPartition(hashcode);
	partitionLock = LockHashPartitionLock(hashcode);

	LWLockAcquire(partitionLock, LW_EXCLUSIVE);

	/*
	 * Find or create a lock with this tag.
	 */
	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
												(void *) locktag,
												hashcode,
												HASH_ENTER_NULL,
												&found);
	if (!lock)
	{
		LWLockRelease(partitionLock);
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of shared memory"),
		  errhint("You might need to increase max_locks_per_transaction.")));
	}

	/*
	 * if it's a new lock object, initialize it
	 */
	if (!found)
	{
		lock->grantMask = 0;
		lock->waitMask = 0;
		SHMQueueInit(&(lock->procLocks));
		ProcQueueInit(&(lock->waitProcs));
		lock->nRequested = 0;
		lock->nGranted = 0;
		MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
		MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
		LOCK_PRINT("lock_twophase_recover: new", lock, lockmode);
	}
	else
	{
		LOCK_PRINT("lock_twophase_recover: found", lock, lockmode);
		Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
		Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
		Assert(lock->nGranted <= lock->nRequested);
	}

	/*
	 * Create the hash key for the proclock table.
	 */
	proclocktag.myLock = lock;
	proclocktag.myProc = proc;

	proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);

	/*
	 * Find or create a proclock entry with this tag
	 */
	proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
														(void *) &proclocktag,
														proclock_hashcode,
														HASH_ENTER_NULL,
														&found);
	if (!proclock)
	{
		/* Ooops, not enough shmem for the proclock */
		if (lock->nRequested == 0)
		{
			/*
			 * There are no other requestors of this lock, so garbage-collect
			 * the lock object.  We *must* do this to avoid a permanent leak
			 * of shared memory, because there won't be anything to cause
			 * anyone to release the lock object later.
			 */
			Assert(SHMQueueEmpty(&(lock->procLocks)));
			if (!hash_search_with_hash_value(LockMethodLockHash,
											 (void *) &(lock->tag),
											 hashcode,
											 HASH_REMOVE,
											 NULL))
				elog(PANIC, "lock table corrupted");
		}
		LWLockRelease(partitionLock);
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of shared memory"),
		  errhint("You might need to increase max_locks_per_transaction.")));
	}

	/*
	 * If new, initialize the new entry
	 */
	if (!found)
	{
		proclock->holdMask = 0;
		proclock->releaseMask = 0;
		/* Add proclock to appropriate lists */
		SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
		SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
							 &proclock->procLink);
		PROCLOCK_PRINT("lock_twophase_recover: new", proclock);
	}
	else
	{
		PROCLOCK_PRINT("lock_twophase_recover: found", proclock);
		Assert((proclock->holdMask & ~lock->grantMask) == 0);
	}

	/*
	 * lock->nRequested and lock->requested[] count the total number of
	 * requests, whether granted or waiting, so increment those immediately.
	 */
	lock->nRequested++;
	lock->requested[lockmode]++;
	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));

	/*
	 * We shouldn't already hold the desired lock.
	 */
	if (proclock->holdMask & LOCKBIT_ON(lockmode))
		elog(ERROR, "lock %s on object %u/%u/%u is already held",
			 lockMethodTable->lockModeNames[lockmode],
			 lock->tag.locktag_field1, lock->tag.locktag_field2,
			 lock->tag.locktag_field3);

	/*
	 * We ignore any possible conflicts and just grant ourselves the lock. Not
	 * only because we don't bother, but also to avoid deadlocks when
	 * switching from standby to normal mode. See function comment.
	 */
	GrantLock(lock, proclock, lockmode);

	/*
	 * Bump strong lock count, to make sure any fast-path lock requests won't
	 * be granted without consulting the primary lock table.
	 */
	if (FastPathTag(&lock->tag) && FastPathStrongMode(lockmode))
	{
		uint32	fasthashcode = FastPathStrongLockHashPartition(hashcode);

		SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
		FastPathStrongRelationLocks->count[fasthashcode]++;
		SpinLockRelease(&FastPathStrongRelationLocks->mutex);
	}

	LWLockRelease(partitionLock);
}

/*
 * Re-acquire a lock belonging to a transaction that was prepared, when
 * starting up into hot standby mode.
 */
void
lock_twophase_standby_recover(TransactionId xid, uint16 info,
							  void *recdata, uint32 len)
{
	TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
	LOCKTAG    *locktag;
	LOCKMODE	lockmode;
	LOCKMETHODID lockmethodid;

	Assert(len == sizeof(TwoPhaseLockRecord));
	locktag = &rec->locktag;
	lockmode = rec->lockmode;
	lockmethodid = locktag->locktag_lockmethodid;

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);

	if (lockmode == AccessExclusiveLock &&
		locktag->locktag_type == LOCKTAG_RELATION)
	{
		StandbyAcquireAccessExclusiveLock(xid,
										locktag->locktag_field1 /* dboid */ ,
									  locktag->locktag_field2 /* reloid */ );
	}
}


/*
 * 2PC processing routine for COMMIT PREPARED case.
 *
 * Find and release the lock indicated by the 2PC record.
 */
void
lock_twophase_postcommit(TransactionId xid, uint16 info,
						 void *recdata, uint32 len)
{
	TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
	PGPROC	   *proc = TwoPhaseGetDummyProc(xid);
	LOCKTAG    *locktag;
	LOCKMETHODID lockmethodid;
	LockMethod	lockMethodTable;

	Assert(len == sizeof(TwoPhaseLockRecord));
	locktag = &rec->locktag;
	lockmethodid = locktag->locktag_lockmethodid;

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];

	LockRefindAndRelease(lockMethodTable, proc, locktag, rec->lockmode, true);
}

/*
 * 2PC processing routine for ROLLBACK PREPARED case.
 *
 * This is actually just the same as the COMMIT case.
 */
void
lock_twophase_postabort(TransactionId xid, uint16 info,
						void *recdata, uint32 len)
{
	lock_twophase_postcommit(xid, info, recdata, len);
}
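
/*
 * Dispatch sketch (an assumption about twophase_rmgr.c, not code from this
 * file): the lock_twophase_* routines above are not called directly; they
 * are invoked through the 2PC resource-manager callback tables, roughly:
 *
 *		const TwoPhaseCallback twophase_recover_callbacks[] = {
 *			NULL,					// END ID
 *			lock_twophase_recover,	// Lock
 *			...
 *		};
 *
 * with corresponding entries for lock_twophase_postcommit,
 * lock_twophase_postabort and lock_twophase_standby_recover in the
 * postcommit, postabort and standby-recover tables.
 */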

/*
 *		VirtualXactLockTableInsert
 *
 *		Take vxid lock via the fast-path.  There can't be any pre-existing
 *		lockers, as we haven't advertised this vxid via the ProcArray yet.
 *
 *		Since MyProc->fpLocalTransactionId will normally contain the same data
 *		as MyProc->lxid, you might wonder if we really need both.  The
 *		difference is that MyProc->lxid is set and cleared unlocked, and
 *		examined by procarray.c, while fpLocalTransactionId is protected by
 *		backendLock and is used only by the locking subsystem.  Doing it this
 *		way makes it easier to verify that there are no funny race conditions.
 *
 *		We don't bother recording this lock in the local lock table, since it's
 *		only ever released at the end of a transaction.  Instead,
 *		LockReleaseAll() calls VirtualXactLockTableCleanup().
 */
void
VirtualXactLockTableInsert(VirtualTransactionId vxid)
{
	Assert(VirtualTransactionIdIsValid(vxid));

	LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE);

	Assert(MyProc->backendId == vxid.backendId);
	Assert(MyProc->fpLocalTransactionId == InvalidLocalTransactionId);
	Assert(MyProc->fpVXIDLock == false);

	MyProc->fpVXIDLock = true;
	MyProc->fpLocalTransactionId = vxid.localTransactionId;

	LWLockRelease(MyProc->backendLock);
}

/*
 *		VirtualXactLockTableCleanup
 *
 *		Check whether a VXID lock has been materialized; if so, release it,
 *		unblocking waiters.
 */
static void
VirtualXactLockTableCleanup(void)
{
	bool	fastpath;
	LocalTransactionId	lxid;

	Assert(MyProc->backendId != InvalidBackendId);

	/*
	 * Clean up shared memory state.
	 */
	LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE);

	fastpath = MyProc->fpVXIDLock;
	lxid = MyProc->fpLocalTransactionId;
	MyProc->fpVXIDLock = false;
	MyProc->fpLocalTransactionId = InvalidLocalTransactionId;

	LWLockRelease(MyProc->backendLock);

	/*
	 * If fpVXIDLock has been cleared without touching fpLocalTransactionId,
	 * that means someone transferred the lock to the main lock table.
	 */
	if (!fastpath && LocalTransactionIdIsValid(lxid))
	{
		VirtualTransactionId	vxid;
		LOCKTAG	locktag;

		vxid.backendId = MyBackendId;
		vxid.localTransactionId = lxid;
		SET_LOCKTAG_VIRTUALTRANSACTION(locktag, vxid);

		LockRefindAndRelease(LockMethods[DEFAULT_LOCKMETHOD], MyProc,
							 &locktag, ExclusiveLock, false);
	}
}
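
/*
 * Lifecycle sketch (an assumption about the callers in xact.c, not code from
 * this file): a backend advertises its VXID lock when its transaction first
 * acquires a VirtualTransactionId, and tears it down implicitly at
 * transaction end:
 *
 *		VirtualTransactionId vxid;
 *
 *		vxid.backendId = MyBackendId;
 *		vxid.localTransactionId = GetNextLocalTransactionId();
 *		VirtualXactLockTableInsert(vxid);
 *		... run the transaction ...
 *		LockReleaseAll(DEFAULT_LOCKMETHOD, ...);	which in turn calls
 *			VirtualXactLockTableCleanup()
 *
 * MyBackendId and GetNextLocalTransactionId() are assumed here to match the
 * surrounding PostgreSQL sources.
 */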

/*
 *		VirtualXactLock
 *
 * If wait = true, wait until the given VXID has been released, and then
 * return true.
 *
 * If wait = false, just check whether the VXID is still running, and return
 * true or false.
 */
bool
VirtualXactLock(VirtualTransactionId vxid, bool wait)
{
	LOCKTAG		tag;
	PGPROC	   *proc;

	Assert(VirtualTransactionIdIsValid(vxid));

	SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);

	/*
	 * If a lock table entry must be made, this is the PGPROC on whose behalf
	 * it must be done.  Note that the transaction might end or the PGPROC
	 * might be reassigned to a new backend before we get around to examining
	 * it, but it doesn't matter.  If we find upon examination that the
	 * relevant lxid is no longer running here, that's enough to prove that
	 * it's no longer running anywhere.
	 */
	proc = BackendIdGetProc(vxid.backendId);
	if (proc == NULL)
		return true;

	/*
	 * We must acquire this lock before checking the backendId and lxid
	 * against the ones we're waiting for.  The target backend will only
	 * set or clear lxid while holding this lock.
	 */
	LWLockAcquire(proc->backendLock, LW_EXCLUSIVE);

	/* If the transaction has ended, our work here is done. */
	if (proc->backendId != vxid.backendId
		|| proc->fpLocalTransactionId != vxid.localTransactionId)
	{
		LWLockRelease(proc->backendLock);
		return true;
	}

	/*
	 * If we aren't asked to wait, there's no need to set up a lock table
	 * entry.  The transaction is still in progress, so just return false.
	 */
	if (!wait)
	{
		LWLockRelease(proc->backendLock);
		return false;
	}

	/*
	 * OK, we're going to need to sleep on the VXID.  But first, we must set
	 * up the primary lock table entry, if needed.
	 */
	if (proc->fpVXIDLock)
	{
		PROCLOCK   *proclock;
		uint32		hashcode;

		hashcode = LockTagHashCode(&tag);
		proclock = SetupLockInTable(LockMethods[DEFAULT_LOCKMETHOD], proc,
									&tag, hashcode, ExclusiveLock);
		if (!proclock)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of shared memory"),
		  errhint("You might need to increase max_locks_per_transaction.")));
		GrantLock(proclock->tag.myLock, proclock, ExclusiveLock);
		proc->fpVXIDLock = false;
	}

	/* Done with proc->fpLockBits */
	LWLockRelease(proc->backendLock);

	/* Time to wait. */
	(void) LockAcquire(&tag, ShareLock, false, false);

	LockRelease(&tag, ShareLock, false);
	return true;
}
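
/*
 * Usage sketch (an assumption, not code from this file): a caller such as
 * CREATE INDEX CONCURRENTLY obtains conflicting VXIDs via GetLockConflicts()
 * and then either polls or blocks on each one:
 *
 *		if (VirtualXactLock(vxid, false))
 *			the virtual transaction has already ended;
 *		else
 *			(void) VirtualXactLock(vxid, true);		wait for it to end
 *
 * With wait = false the call never sleeps; with wait = true it first moves
 * the target's fast-path VXID lock into the main lock table if necessary and
 * then sleeps on it via LockAcquire().
 */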