/*-------------------------------------------------------------------------
 *
 * lock.c
 *	  POSTGRES primary lock mechanism
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/lmgr/lock.c
 *
 * NOTES
 *	  A lock table is a shared memory hash table.  When
 *	  a process tries to acquire a lock of a type that conflicts
 *	  with existing locks, it is put to sleep using the routines
 *	  in storage/lmgr/proc.c.
 *
 *	  For the most part, this code should be invoked via lmgr.c
 *	  or another lock-management module, not directly.
 *
 *	Interface:
 *
 *	InitLocks(), GetLocksMethodTable(),
 *	LockAcquire(), LockRelease(), LockReleaseAll(),
 *	LockCheckConflicts(), GrantLock()
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <signal.h>
#include <unistd.h>

#include "access/transam.h"
#include "access/twophase.h"
#include "access/twophase_rmgr.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "pgstat.h"
#include "storage/proc.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"
#include "storage/standby.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/resowner_private.h"


/* This configuration variable is used to set the lock table size */
int			max_locks_per_xact; /* set by guc.c */

#define NLOCKENTS() \
	mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
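
/*
 * Rough sense of scale: with the default max_locks_per_xact of 64 and, say,
 * MaxBackends + max_prepared_xacts totalling 100, NLOCKENTS() sizes the
 * shared tables for 6400 lock entries (illustrative numbers only).
 */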


/*
 * Data structures defining the semantics of the standard lock methods.
 *
 * The conflict table defines the semantics of the various lock modes.
 */
static const LOCKMASK LockConflicts[] = {
	0,

	/* AccessShareLock */
	LOCKBIT_ON(AccessExclusiveLock),

	/* RowShareLock */
	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

	/* RowExclusiveLock */
	LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

	/* ShareUpdateExclusiveLock */
	LOCKBIT_ON(ShareUpdateExclusiveLock) |
	LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

	/* ShareLock */
	LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
	LOCKBIT_ON(ShareRowExclusiveLock) |
	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

	/* ShareRowExclusiveLock */
	LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
	LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

	/* ExclusiveLock */
	LOCKBIT_ON(RowShareLock) |
	LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
	LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

	/* AccessExclusiveLock */
	LOCKBIT_ON(AccessShareLock) | LOCKBIT_ON(RowShareLock) |
	LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
	LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock)

};
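
/*
 * To read this table: a requested mode R conflicts with a granted mode G
 * whenever LockConflicts[R] has G's bit set.  For example, RowExclusiveLock
 * (taken by INSERT/UPDATE/DELETE) lists ShareLock above, so it conflicts
 * with ShareLock; it does not list itself, so two RowExclusiveLocks never
 * conflict with each other.
 */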

/* Names of lock modes, for debug printouts */
static const char *const lock_mode_names[] =
{
	"INVALID",
	"AccessShareLock",
	"RowShareLock",
	"RowExclusiveLock",
	"ShareUpdateExclusiveLock",
	"ShareLock",
	"ShareRowExclusiveLock",
	"ExclusiveLock",
	"AccessExclusiveLock"
};

#ifndef LOCK_DEBUG
static bool Dummy_trace = false;
#endif

static const LockMethodData default_lockmethod = {
	AccessExclusiveLock,		/* highest valid lock mode number */
	LockConflicts,
	lock_mode_names,
#ifdef LOCK_DEBUG
	&Trace_locks
#else
	&Dummy_trace
#endif
};

static const LockMethodData user_lockmethod = {
	AccessExclusiveLock,		/* highest valid lock mode number */
	LockConflicts,
	lock_mode_names,
#ifdef LOCK_DEBUG
	&Trace_userlocks
#else
	&Dummy_trace
#endif
};

/*
 * map from lock method id to the lock table data structures
 */
static const LockMethod LockMethods[] = {
	NULL,
	&default_lockmethod,
	&user_lockmethod
};


/* Record that's written to 2PC state file when a lock is persisted */
typedef struct TwoPhaseLockRecord
{
	LOCKTAG		locktag;
	LOCKMODE	lockmode;
} TwoPhaseLockRecord;


/*
 * Count of the number of fast path lock slots we believe to be used.  This
 * might be higher than the real number if another backend has transferred
 * our locks to the primary lock table, but it can never be lower than the
 * real value, since only we can acquire locks on our own behalf.
 */
static int	FastPathLocalUseCount = 0;

/* Macros for manipulating proc->fpLockBits */
#define FAST_PATH_BITS_PER_SLOT			3
#define FAST_PATH_LOCKNUMBER_OFFSET		1
#define FAST_PATH_MASK					((1 << FAST_PATH_BITS_PER_SLOT) - 1)
#define FAST_PATH_GET_BITS(proc, n) \
	(((proc)->fpLockBits >> (FAST_PATH_BITS_PER_SLOT * n)) & FAST_PATH_MASK)
#define FAST_PATH_BIT_POSITION(n, l) \
	(AssertMacro((l) >= FAST_PATH_LOCKNUMBER_OFFSET), \
	 AssertMacro((l) < FAST_PATH_BITS_PER_SLOT+FAST_PATH_LOCKNUMBER_OFFSET), \
	 AssertMacro((n) < FP_LOCK_SLOTS_PER_BACKEND), \
	 ((l) - FAST_PATH_LOCKNUMBER_OFFSET + FAST_PATH_BITS_PER_SLOT * (n)))
#define FAST_PATH_SET_LOCKMODE(proc, n, l) \
	 (proc)->fpLockBits |= UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l)
#define FAST_PATH_CLEAR_LOCKMODE(proc, n, l) \
	 (proc)->fpLockBits &= ~(UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l))
#define FAST_PATH_CHECK_LOCKMODE(proc, n, l) \
	 ((proc)->fpLockBits & (UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l)))
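
/*
 * Layout implied by the macros above: each fast-path slot owns
 * FAST_PATH_BITS_PER_SLOT (3) adjacent bits of fpLockBits, one per
 * fast-path-eligible lock mode.  Slot n holding lock mode l sets bit
 * (l - 1) + 3 * n; e.g. AccessShareLock (mode 1) in slot 2 is bit 6.
 */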

/*
 * The fast-path lock mechanism is concerned only with relation locks on
 * unshared relations by backends bound to a database.  The fast-path
 * mechanism exists mostly to accelerate acquisition and release of locks
 * that rarely conflict.  Because ShareUpdateExclusiveLock is
 * self-conflicting, it can't use the fast-path mechanism; but it also does
 * not conflict with any of the locks that do, so we can ignore it completely.
 */
#define EligibleForRelationFastPath(locktag, mode) \
	((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \
	(locktag)->locktag_type == LOCKTAG_RELATION && \
	(locktag)->locktag_field1 == MyDatabaseId && \
	MyDatabaseId != InvalidOid && \
	(mode) < ShareUpdateExclusiveLock)
#define ConflictsWithRelationFastPath(locktag, mode) \
	((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \
	(locktag)->locktag_type == LOCKTAG_RELATION && \
	(locktag)->locktag_field1 != InvalidOid && \
	(mode) > ShareUpdateExclusiveLock)
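
/*
 * In concrete terms: only AccessShareLock, RowShareLock and RowExclusiveLock
 * can travel the fast path, while ShareLock and stronger modes on a relation
 * are "strong" and force any existing fast-path entries for that relation to
 * be transferred to the main lock table.
 */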

static bool FastPathGrantRelationLock(Oid relid, LOCKMODE lockmode);
static bool FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode);
static bool FastPathTransferRelationLocks(LockMethod lockMethodTable,
							  const LOCKTAG *locktag, uint32 hashcode);
static PROCLOCK *FastPathGetRelationLockEntry(LOCALLOCK *locallock);

/*
 * To make the fast-path lock mechanism work, we must have some way of
 * preventing the use of the fast-path when a conflicting lock might be
 * present.  We partition* the locktag space into FAST_PATH_HASH_BUCKETS
 * partitions, and maintain an integer count of the number of "strong" lockers
 * in each partition.  When any "strong" lockers are present (which is
 * hopefully not very often), the fast-path mechanism can't be used, and we
 * must fall back to the slower method of pushing matching locks directly
 * into the main lock tables.
 *
 * The deadlock detector does not know anything about the fast path mechanism,
 * so any locks that might be involved in a deadlock must be transferred from
 * the fast-path queues to the main lock table.
 */

#define FAST_PATH_STRONG_LOCK_HASH_BITS			10
#define FAST_PATH_STRONG_LOCK_HASH_PARTITIONS \
	(1 << FAST_PATH_STRONG_LOCK_HASH_BITS)
#define FastPathStrongLockHashPartition(hashcode) \
	((hashcode) % FAST_PATH_STRONG_LOCK_HASH_PARTITIONS)

typedef struct
{
	slock_t		mutex;
	uint32		count[FAST_PATH_STRONG_LOCK_HASH_PARTITIONS];
} FastPathStrongRelationLockData;

static volatile FastPathStrongRelationLockData *FastPathStrongRelationLocks;


/*
 * Pointers to hash tables containing lock state
 *
 * The LockMethodLockHash and LockMethodProcLockHash hash tables are in
 * shared memory; LockMethodLocalHash is local to each backend.
 */
static HTAB *LockMethodLockHash;
static HTAB *LockMethodProcLockHash;
static HTAB *LockMethodLocalHash;


/* private state for error cleanup */
static LOCALLOCK *StrongLockInProgress;
static LOCALLOCK *awaitedLock;
static ResourceOwner awaitedOwner;


#ifdef LOCK_DEBUG

/*------
 * The following configuration options are available for lock debugging:
 *
 *	   TRACE_LOCKS		-- give a bunch of output about what's going on in this file
 *	   TRACE_USERLOCKS	-- same but for user locks
 *	   TRACE_LOCK_OIDMIN-- do not trace locks for tables below this oid
 *						   (use to avoid output on system tables)
 *	   TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
 *	   DEBUG_DEADLOCKS	-- currently dumps locks at untimely occasions ;)
 *
 * Furthermore, in storage/lmgr/lwlock.c:
 *	   TRACE_LWLOCKS	-- trace lightweight locks (pretty useless)
 *
 * Define LOCK_DEBUG at compile time to get all these enabled.
 * --------
 */

int			Trace_lock_oidmin = FirstNormalObjectId;
bool		Trace_locks = false;
bool		Trace_userlocks = false;
int			Trace_lock_table = 0;
bool		Debug_deadlocks = false;


inline static bool
LOCK_DEBUG_ENABLED(const LOCKTAG *tag)
{
	return
		(*(LockMethods[tag->locktag_lockmethodid]->trace_flag) &&
		 ((Oid) tag->locktag_field2 >= (Oid) Trace_lock_oidmin))
		|| (Trace_lock_table &&
			(tag->locktag_field2 == Trace_lock_table));
}


inline static void
LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
{
	if (LOCK_DEBUG_ENABLED(&lock->tag))
		elog(LOG,
			 "%s: lock(%p) id(%u,%u,%u,%u,%u,%u) grantMask(%x) "
			 "req(%d,%d,%d,%d,%d,%d,%d)=%d "
			 "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
			 where, lock,
			 lock->tag.locktag_field1, lock->tag.locktag_field2,
			 lock->tag.locktag_field3, lock->tag.locktag_field4,
			 lock->tag.locktag_type, lock->tag.locktag_lockmethodid,
			 lock->grantMask,
			 lock->requested[1], lock->requested[2], lock->requested[3],
			 lock->requested[4], lock->requested[5], lock->requested[6],
			 lock->requested[7], lock->nRequested,
			 lock->granted[1], lock->granted[2], lock->granted[3],
			 lock->granted[4], lock->granted[5], lock->granted[6],
			 lock->granted[7], lock->nGranted,
			 lock->waitProcs.size,
			 LockMethods[LOCK_LOCKMETHOD(*lock)]->lockModeNames[type]);
}


inline static void
PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
{
	if (LOCK_DEBUG_ENABLED(&proclockP->tag.myLock->tag))
		elog(LOG,
			 "%s: proclock(%p) lock(%p) method(%u) proc(%p) hold(%x)",
			 where, proclockP, proclockP->tag.myLock,
			 PROCLOCK_LOCKMETHOD(*(proclockP)),
			 proclockP->tag.myProc, (int) proclockP->holdMask);
}
#else							/* not LOCK_DEBUG */

#define LOCK_PRINT(where, lock, type)  ((void) 0)
#define PROCLOCK_PRINT(where, proclockP)  ((void) 0)
#endif   /* not LOCK_DEBUG */


static uint32 proclock_hash(const void *key, Size keysize);
static void RemoveLocalLock(LOCALLOCK *locallock);
static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
				 const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode);
static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode);
static void FinishStrongLockAcquire(void);
static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
static void ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock);
static void LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent);
static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
			PROCLOCK *proclock, LockMethod lockMethodTable);
static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
			LockMethod lockMethodTable, uint32 hashcode,
			bool wakeupNeeded);
static void LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
					 LOCKTAG *locktag, LOCKMODE lockmode,
					 bool decrement_strong_lock_count);


/*
 * InitLocks -- Initialize the lock manager's data structures.
 *
 * This is called from CreateSharedMemoryAndSemaphores(), which see for
 * more comments.  In the normal postmaster case, the shared hash tables
 * are created here, as well as a locallock hash table that will remain
 * unused and empty in the postmaster itself.  Backends inherit the pointers
 * to the shared tables via fork(), and also inherit an image of the locallock
 * hash table, which they proceed to use.  In the EXEC_BACKEND case, each
 * backend re-executes this code to obtain pointers to the already existing
 * shared hash tables and to create its locallock hash table.
 */
void
InitLocks(void)
{
	HASHCTL		info;
	long		init_table_size,
				max_table_size;
	bool		found;

	/*
	 * Compute init/max size to request for lock hashtables.  Note these
	 * calculations must agree with LockShmemSize!
	 */
	max_table_size = NLOCKENTS();
	init_table_size = max_table_size / 2;

	/*
	 * Allocate hash table for LOCK structs.  This stores per-locked-object
	 * information.
	 */
	MemSet(&info, 0, sizeof(info));
	info.keysize = sizeof(LOCKTAG);
	info.entrysize = sizeof(LOCK);
	info.num_partitions = NUM_LOCK_PARTITIONS;

	LockMethodLockHash = ShmemInitHash("LOCK hash",
									   init_table_size,
									   max_table_size,
									   &info,
									HASH_ELEM | HASH_BLOBS | HASH_PARTITION);

	/* Assume an average of 2 holders per lock */
	max_table_size *= 2;
	init_table_size *= 2;

	/*
	 * Allocate hash table for PROCLOCK structs.  This stores
	 * per-lock-per-holder information.
	 */
	info.keysize = sizeof(PROCLOCKTAG);
	info.entrysize = sizeof(PROCLOCK);
	info.hash = proclock_hash;
	info.num_partitions = NUM_LOCK_PARTITIONS;

	LockMethodProcLockHash = ShmemInitHash("PROCLOCK hash",
										   init_table_size,
										   max_table_size,
										   &info,
								 HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);

	/*
	 * Allocate fast-path structures.
	 */
	FastPathStrongRelationLocks =
		ShmemInitStruct("Fast Path Strong Relation Lock Data",
						sizeof(FastPathStrongRelationLockData), &found);
	if (!found)
		SpinLockInit(&FastPathStrongRelationLocks->mutex);

	/*
	 * Allocate non-shared hash table for LOCALLOCK structs.  This stores lock
	 * counts and resource owner information.
	 *
	 * The non-shared table could already exist in this process (this occurs
	 * when the postmaster is recreating shared memory after a backend crash).
	 * If so, delete and recreate it.  (We could simply leave it, since it
	 * ought to be empty in the postmaster, but for safety let's zap it.)
	 */
	if (LockMethodLocalHash)
		hash_destroy(LockMethodLocalHash);

	info.keysize = sizeof(LOCALLOCKTAG);
	info.entrysize = sizeof(LOCALLOCK);

	LockMethodLocalHash = hash_create("LOCALLOCK hash",
									  16,
									  &info,
									  HASH_ELEM | HASH_BLOBS);
}


/*
 * Fetch the lock method table associated with a given lock
 */
LockMethod
GetLocksMethodTable(const LOCK *lock)
{
	LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock);

	Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
	return LockMethods[lockmethodid];
}


/*
 * Compute the hash code associated with a LOCKTAG.
 *
 * To avoid unnecessary recomputations of the hash code, we try to do this
 * just once per function, and then pass it around as needed.  Aside from
 * passing the hashcode to hash_search_with_hash_value(), we can extract
 * the lock partition number from the hashcode.
 */
uint32
LockTagHashCode(const LOCKTAG *locktag)
{
	return get_hash_value(LockMethodLockHash, (const void *) locktag);
}

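/*
 * As a reminder of how the hashcode gets used: LockHashPartition() in lock.h
 * is simply hashcode % NUM_LOCK_PARTITIONS, and LockHashPartitionLock() maps
 * that partition number to the LWLock protecting the partition.
 */
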
/*
 * Compute the hash code associated with a PROCLOCKTAG.
 *
 * Because we want to use just one set of partition locks for both the
 * LOCK and PROCLOCK hash tables, we have to make sure that PROCLOCKs
 * fall into the same partition number as their associated LOCKs.
 * dynahash.c expects the partition number to be the low-order bits of
 * the hash code, and therefore a PROCLOCKTAG's hash code must have the
 * same low-order bits as the associated LOCKTAG's hash code.  We achieve
 * this with this specialized hash function.
 */
static uint32
proclock_hash(const void *key, Size keysize)
{
	const PROCLOCKTAG *proclocktag = (const PROCLOCKTAG *) key;
	uint32		lockhash;
	Datum		procptr;

	Assert(keysize == sizeof(PROCLOCKTAG));

	/* Look into the associated LOCK object, and compute its hash code */
	lockhash = LockTagHashCode(&proclocktag->myLock->tag);

	/*
	 * To make the hash code also depend on the PGPROC, we xor the proc
	 * struct's address into the hash code, left-shifted so that the
	 * partition-number bits don't change.  Since this is only a hash, we
	 * don't care if we lose high-order bits of the address; use an
	 * intermediate variable to suppress cast-pointer-to-int warnings.
	 */
	procptr = PointerGetDatum(proclocktag->myProc);
	lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS;

	return lockhash;
}
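
/*
 * With the default LOG2_NUM_LOCK_PARTITIONS of 4 (16 lock partitions), the
 * low four bits of the hash select the partition; shifting the PGPROC
 * address left by that amount before xor-ing it in leaves those bits
 * untouched, which is what keeps a PROCLOCK in the same partition as its
 * LOCK.
 */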

/*
 * Compute the hash code associated with a PROCLOCKTAG, given the hashcode
 * for its underlying LOCK.
 *
 * We use this just to avoid redundant calls of LockTagHashCode().
 */
static inline uint32
ProcLockHashCode(const PROCLOCKTAG *proclocktag, uint32 hashcode)
{
	uint32		lockhash = hashcode;
	Datum		procptr;

	/*
	 * This must match proclock_hash()!
	 */
	procptr = PointerGetDatum(proclocktag->myProc);
	lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS;

	return lockhash;
}

/*
 * Given two lock modes, return whether they would conflict.
 */
bool
DoLockModesConflict(LOCKMODE mode1, LOCKMODE mode2)
{
	LockMethod	lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD];

	if (lockMethodTable->conflictTab[mode1] & LOCKBIT_ON(mode2))
		return true;

	return false;
}
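
/*
 * For instance, per the conflict table at the top of this file,
 * DoLockModesConflict(RowExclusiveLock, ShareLock) returns true, while
 * DoLockModesConflict(RowExclusiveLock, RowExclusiveLock) returns false.
 */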

/*
 * LockHasWaiters -- look up 'locktag' and check if releasing this
 *		lock would wake up other processes waiting for it.
 */
bool
LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
{
	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
	LockMethod	lockMethodTable;
	LOCALLOCKTAG localtag;
	LOCALLOCK  *locallock;
	LOCK	   *lock;
	PROCLOCK   *proclock;
	LWLock	   *partitionLock;
	bool		hasWaiters = false;

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];
	if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
		elog(ERROR, "unrecognized lock mode: %d", lockmode);

#ifdef LOCK_DEBUG
	if (LOCK_DEBUG_ENABLED(locktag))
		elog(LOG, "LockHasWaiters: lock [%u,%u] %s",
			 locktag->locktag_field1, locktag->locktag_field2,
			 lockMethodTable->lockModeNames[lockmode]);
#endif

	/*
	 * Find the LOCALLOCK entry for this lock and lockmode
	 */
	MemSet(&localtag, 0, sizeof(localtag));		/* must clear padding */
	localtag.lock = *locktag;
	localtag.mode = lockmode;

	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
										  (void *) &localtag,
										  HASH_FIND, NULL);

	/*
	 * let the caller print its own error message, too. Do not ereport(ERROR).
	 */
	if (!locallock || locallock->nLocks <= 0)
	{
		elog(WARNING, "you don't own a lock of type %s",
			 lockMethodTable->lockModeNames[lockmode]);
		return false;
	}

	/*
	 * Check the shared lock table.
	 */
	partitionLock = LockHashPartitionLock(locallock->hashcode);

	LWLockAcquire(partitionLock, LW_SHARED);

	/*
	 * We don't need to re-find the lock or proclock, since we kept their
	 * addresses in the locallock table, and they couldn't have been removed
	 * while we were holding a lock on them.
	 */
	lock = locallock->lock;
	LOCK_PRINT("LockHasWaiters: found", lock, lockmode);
	proclock = locallock->proclock;
	PROCLOCK_PRINT("LockHasWaiters: found", proclock);

	/*
	 * Double-check that we are actually holding a lock of the type we want to
	 * release.
	 */
	if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
	{
		PROCLOCK_PRINT("LockHasWaiters: WRONGTYPE", proclock);
		LWLockRelease(partitionLock);
		elog(WARNING, "you don't own a lock of type %s",
			 lockMethodTable->lockModeNames[lockmode]);
		RemoveLocalLock(locallock);
		return false;
	}

	/*
	 * Do the checking.
	 */
	if ((lockMethodTable->conflictTab[lockmode] & lock->waitMask) != 0)
		hasWaiters = true;

	LWLockRelease(partitionLock);

	return hasWaiters;
}

/*
 * LockAcquire -- Check for lock conflicts, sleep if conflict found,
 *		set lock if/when no conflicts.
 *
 * Inputs:
 *	locktag: unique identifier for the lockable object
 *	lockmode: lock mode to acquire
 *	sessionLock: if true, acquire lock for session not current transaction
 *	dontWait: if true, don't wait to acquire lock
 *
 * Returns one of:
 *		LOCKACQUIRE_NOT_AVAIL		lock not available, and dontWait=true
 *		LOCKACQUIRE_OK				lock successfully acquired
 *		LOCKACQUIRE_ALREADY_HELD	incremented count for lock already held
 *
 * In the normal case where dontWait=false and the caller doesn't need to
 * distinguish a freshly acquired lock from one already taken earlier in
 * this same transaction, there is no need to examine the return value.
 *
 * Side Effects: The lock is acquired and recorded in lock tables.
 *
 * NOTE: if we wait for the lock, there is no way to abort the wait
 * short of aborting the transaction.
 */
LockAcquireResult
LockAcquire(const LOCKTAG *locktag,
			LOCKMODE lockmode,
			bool sessionLock,
			bool dontWait)
{
	return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, true);
}
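
/*
 * Typical usage goes through lmgr.c rather than calling this directly; for
 * example, locking a relation by OID boils down to roughly the following
 * (see LockRelationOid() in lmgr.c):
 *
 *		LOCKTAG		tag;
 *
 *		SET_LOCKTAG_RELATION(tag, MyDatabaseId, relid);
 *		(void) LockAcquire(&tag, lockmode, false, false);
 */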

/*
 * LockAcquireExtended - allows us to specify additional options
 *
 * reportMemoryError specifies whether a lock request that fills the
 * lock table should generate an ERROR or not. This allows a priority
 * caller to note that the lock table is full and then begin taking
 * extreme action to reduce the number of other lock holders before
 * retrying the action.
 */
LockAcquireResult
LockAcquireExtended(const LOCKTAG *locktag,
					LOCKMODE lockmode,
					bool sessionLock,
					bool dontWait,
					bool reportMemoryError)
691
{
692 693
	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
	LockMethod	lockMethodTable;
694 695 696
	LOCALLOCKTAG localtag;
	LOCALLOCK  *locallock;
	LOCK	   *lock;
697
	PROCLOCK   *proclock;
698
	bool		found;
699
	ResourceOwner owner;
700
	uint32		hashcode;
701
	LWLock	   *partitionLock;
702
	int			status;
703
	bool		log_lock = false;
704

705 706 707 708 709 710
	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];
	if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
		elog(ERROR, "unrecognized lock mode: %d", lockmode);

711 712
	if (RecoveryInProgress() && !InRecovery &&
		(locktag->locktag_type == LOCKTAG_OBJECT ||
		 locktag->locktag_type == LOCKTAG_RELATION) &&
714 715 716
		lockmode > RowExclusiveLock)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("cannot acquire lock mode %s on database objects while recovery is in progress",
						lockMethodTable->lockModeNames[lockmode]),
719 720
				 errhint("Only RowExclusiveLock or less can be acquired on database objects during recovery.")));

721
#ifdef LOCK_DEBUG
722 723
	if (LOCK_DEBUG_ENABLED(locktag))
		elog(LOG, "LockAcquire: lock [%u,%u] %s",
724
			 locktag->locktag_field1, locktag->locktag_field2,
725
			 lockMethodTable->lockModeNames[lockmode]);
726 727
#endif

728 729
	/* Identify owner for lock */
	if (sessionLock)
730
		owner = NULL;
731 732
	else
		owner = CurrentResourceOwner;
733 734 735 736

	/*
	 * Find or create a LOCALLOCK entry for this lock and lockmode
	 */
	MemSet(&localtag, 0, sizeof(localtag));		/* must clear padding */
738 739 740
	localtag.lock = *locktag;
	localtag.mode = lockmode;

741
	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
742 743 744 745 746 747 748 749 750 751
										  (void *) &localtag,
										  HASH_ENTER, &found);

	/*
	 * if it's a new locallock object, initialize it
	 */
	if (!found)
	{
		locallock->lock = NULL;
		locallock->proclock = NULL;
752
		locallock->hashcode = LockTagHashCode(&(localtag.lock));
753 754 755
		locallock->nLocks = 0;
		locallock->numLockOwners = 0;
		locallock->maxLockOwners = 8;
756
		locallock->holdsStrongLockCount = FALSE;
757
		locallock->lockOwners = NULL;	/* in case next line fails */
758 759
		locallock->lockOwners = (LOCALLOCKOWNER *)
			MemoryContextAlloc(TopMemoryContext,
						  locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
761 762 763 764 765 766
	}
	else
	{
		/* Make sure there will be room to remember the lock */
		if (locallock->numLockOwners >= locallock->maxLockOwners)
		{
			int			newsize = locallock->maxLockOwners * 2;
768 769 770 771 772 773 774

			locallock->lockOwners = (LOCALLOCKOWNER *)
				repalloc(locallock->lockOwners,
						 newsize * sizeof(LOCALLOCKOWNER));
			locallock->maxLockOwners = newsize;
		}
	}
775
	hashcode = locallock->hashcode;
776 777

	/*
	 * If we already hold the lock, we can just increase the count locally.
779 780 781 782
	 */
	if (locallock->nLocks > 0)
	{
		GrantLockLocal(locallock, owner);
783
		return LOCKACQUIRE_ALREADY_HELD;
784 785
	}

786
	/*
787 788
	 * Prepare to emit a WAL record if acquisition of this lock needs to be
	 * replayed in a standby server.
789
	 *
790 791 792
	 * Here we prepare to log; after lock is acquired we'll issue log record.
	 * This arrangement simplifies error recovery in case the preparation step
	 * fails.
793
	 *
794 795 796
	 * Only AccessExclusiveLocks can conflict with lock types that read-only
	 * transactions can acquire in a standby server. Make sure this definition
	 * matches the one in GetRunningTransactionLocks().
797 798 799 800 801 802 803 804 805 806
	 */
	if (lockmode >= AccessExclusiveLock &&
		locktag->locktag_type == LOCKTAG_RELATION &&
		!RecoveryInProgress() &&
		XLogStandbyInfoActive())
	{
		LogAccessExclusiveLockPrepare();
		log_lock = true;
	}

807 808 809 810 811 812 813 814 815 816
	/*
	 * Attempt to take lock via fast path, if eligible.  But if we remember
	 * having filled up the fast path array, we don't attempt to make any
	 * further use of it until we release some locks.  It's possible that some
	 * other backend has transferred some of those locks to the shared hash
	 * table, leaving space free, but it's not worth acquiring the LWLock just
	 * to check.  It's also possible that we're acquiring a second or third
	 * lock type on a relation we have already locked using the fast-path, but
	 * for now we don't worry about that case either.
	 */
817 818
	if (EligibleForRelationFastPath(locktag, lockmode) &&
		FastPathLocalUseCount < FP_LOCK_SLOTS_PER_BACKEND)
819
	{
820 821
		uint32		fasthashcode = FastPathStrongLockHashPartition(hashcode);
		bool		acquired;
822 823

		/*
824 825
		 * LWLockAcquire acts as a memory sequencing point, so it's safe to
		 * assume that any strong locker whose increment to
826 827
		 * FastPathStrongRelationLocks->counts becomes visible after we test
		 * it has yet to begin to transfer fast-path locks.
828
		 */
829
		LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
830 831 832 833 834
		if (FastPathStrongRelationLocks->count[fasthashcode] != 0)
			acquired = false;
		else
			acquired = FastPathGrantRelationLock(locktag->locktag_field2,
												 lockmode);
835
		LWLockRelease(&MyProc->backendLock);
836
		if (acquired)
837
		{
838 839 840 841 842 843 844
			/*
			 * The locallock might contain stale pointers to some old shared
			 * objects; we MUST reset these to null before considering the
			 * lock to be acquired via fast-path.
			 */
			locallock->lock = NULL;
			locallock->proclock = NULL;
845 846
			GrantLockLocal(locallock, owner);
			return LOCKACQUIRE_OK;
847
		}
848 849 850 851 852 853 854 855 856 857
	}

	/*
	 * If this lock could potentially have been taken via the fast-path by
	 * some other backend, we must (temporarily) disable further use of the
	 * fast-path for this lock tag, and migrate any locks already taken via
	 * this method to the main lock table.
	 */
	if (ConflictsWithRelationFastPath(locktag, lockmode))
	{
858
		uint32		fasthashcode = FastPathStrongLockHashPartition(hashcode);
859 860 861 862

		BeginStrongLockAcquire(locallock, fasthashcode);
		if (!FastPathTransferRelationLocks(lockMethodTable, locktag,
										   hashcode))
863
		{
864 865 866 867 868 869 870 871
			AbortStrongLockAcquire();
			if (reportMemoryError)
				ereport(ERROR,
						(errcode(ERRCODE_OUT_OF_MEMORY),
						 errmsg("out of shared memory"),
						 errhint("You might need to increase max_locks_per_transaction.")));
			else
				return LOCKACQUIRE_NOT_AVAIL;
872 873 874
		}
	}

875
	/*
876 877 878
	 * We didn't find the lock in our LOCALLOCK table, and we didn't manage to
	 * take it via the fast-path, either, so we've got to mess with the shared
	 * lock table.
879
	 */
880
	partitionLock = LockHashPartitionLock(hashcode);
881

882
	LWLockAcquire(partitionLock, LW_EXCLUSIVE);
883

884
	/*
885 886 887 888 889 890 891
	 * Find or create lock and proclock entries with this tag
	 *
	 * Note: if the locallock object already existed, it might have a pointer
	 * to the lock already ... but we should not assume that that pointer is
	 * valid, since a lock object with zero hold and request counts can go
	 * away anytime.  So we have to use SetupLockInTable() to recompute the
	 * lock and proclock pointers, even if they're already set.
892 893 894 895 896
	 */
	proclock = SetupLockInTable(lockMethodTable, MyProc, locktag,
								hashcode, lockmode);
	if (!proclock)
	{
897
		AbortStrongLockAcquire();
898 899 900 901 902 903 904 905 906 907 908 909 910 911 912
		LWLockRelease(partitionLock);
		if (reportMemoryError)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of shared memory"),
					 errhint("You might need to increase max_locks_per_transaction.")));
		else
			return LOCKACQUIRE_NOT_AVAIL;
	}
	locallock->proclock = proclock;
	lock = proclock->tag.myLock;
	locallock->lock = lock;

	/*
	 * If lock requested conflicts with locks requested by waiters, must join
	 * wait queue.  Otherwise, check for conflict with already-held locks.
914 915 916 917 918 919
	 * (That's last because most complex check.)
	 */
	if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
		status = STATUS_FOUND;
	else
		status = LockCheckConflicts(lockMethodTable, lockmode,
920
									lock, proclock);
921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938

	if (status == STATUS_OK)
	{
		/* No conflict with held or previously requested locks */
		GrantLock(lock, proclock, lockmode);
		GrantLockLocal(locallock, owner);
	}
	else
	{
		Assert(status == STATUS_FOUND);

		/*
		 * We can't acquire the lock immediately.  If caller specified no
		 * blocking, remove useless table entries and return NOT_AVAIL without
		 * waiting.
		 */
		if (dontWait)
		{
939
			AbortStrongLockAcquire();
940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993
			if (proclock->holdMask == 0)
			{
				uint32		proclock_hashcode;

				proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
				SHMQueueDelete(&proclock->lockLink);
				SHMQueueDelete(&proclock->procLink);
				if (!hash_search_with_hash_value(LockMethodProcLockHash,
												 (void *) &(proclock->tag),
												 proclock_hashcode,
												 HASH_REMOVE,
												 NULL))
					elog(PANIC, "proclock table corrupted");
			}
			else
				PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
			lock->nRequested--;
			lock->requested[lockmode]--;
			LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
			Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
			Assert(lock->nGranted <= lock->nRequested);
			LWLockRelease(partitionLock);
			if (locallock->nLocks == 0)
				RemoveLocalLock(locallock);
			return LOCKACQUIRE_NOT_AVAIL;
		}

		/*
		 * Set bitmask of locks this process already holds on this object.
		 */
		MyProc->heldLocks = proclock->holdMask;

		/*
		 * Sleep till someone wakes me up.
		 */

		TRACE_POSTGRESQL_LOCK_WAIT_START(locktag->locktag_field1,
										 locktag->locktag_field2,
										 locktag->locktag_field3,
										 locktag->locktag_field4,
										 locktag->locktag_type,
										 lockmode);

		WaitOnLock(locallock, owner);

		TRACE_POSTGRESQL_LOCK_WAIT_DONE(locktag->locktag_field1,
										locktag->locktag_field2,
										locktag->locktag_field3,
										locktag->locktag_field4,
										locktag->locktag_type,
										lockmode);

		/*
		 * NOTE: do not do any material change of state between here and
		 * return.  All required changes in locktable state must have been
995 996 997 998 999 1000 1001 1002 1003
		 * done when the lock was granted to us --- see notes in WaitOnLock.
		 */

		/*
		 * Check the proclock entry status, in case something in the ipc
		 * communication doesn't work correctly.
		 */
		if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
		{
1004
			AbortStrongLockAcquire();
1005 1006 1007 1008 1009 1010 1011 1012 1013 1014
			PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
			LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
			/* Should we retry ? */
			LWLockRelease(partitionLock);
			elog(ERROR, "LockAcquire failed");
		}
		PROCLOCK_PRINT("LockAcquire: granted", proclock);
		LOCK_PRINT("LockAcquire: granted", lock, lockmode);
	}

1015 1016 1017 1018 1019 1020
	/*
	 * Lock state is fully up-to-date now; if we error out after this, no
	 * special error cleanup is required.
	 */
	FinishStrongLockAcquire();

1021 1022 1023
	LWLockRelease(partitionLock);

	/*
1024
	 * Emit a WAL record if acquisition of this lock needs to be replayed in a
1025 1026 1027 1028 1029 1030
	 * standby server.
	 */
	if (log_lock)
	{
		/*
		 * Decode the locktag back to the original values, to avoid sending
		 * lots of empty bytes with every message.  See lock.h to check how a
1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043
		 * locktag is defined for LOCKTAG_RELATION
		 */
		LogAccessExclusiveLock(locktag->locktag_field1,
							   locktag->locktag_field2);
	}

	return LOCKACQUIRE_OK;
}

/*
 * Find or create LOCK and PROCLOCK objects as needed for a new lock
 * request.
1044 1045 1046 1047 1048 1049
 *
 * Returns the PROCLOCK object, or NULL if we failed to create the objects
 * for lack of shared memory.
 *
 * The appropriate partition lock must be held at entry, and will be
 * held at exit.
1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060
 */
static PROCLOCK *
SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
				 const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode)
{
	LOCK	   *lock;
	PROCLOCK   *proclock;
	PROCLOCKTAG proclocktag;
	uint32		proclock_hashcode;
	bool		found;

	/*
1062
	 * Find or create a lock with this tag.
	 */
1064
	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
1065
												(const void *) locktag,
1066 1067 1068
												hashcode,
												HASH_ENTER_NULL,
												&found);
1069
	if (!lock)
1070
		return NULL;
1071

	/*
1073
	 * if it's a new lock object, initialize it
1074 1075
	 */
	if (!found)
1076
	{
1077 1078
		lock->grantMask = 0;
		lock->waitMask = 0;
1079
		SHMQueueInit(&(lock->procLocks));
1080
		ProcQueueInit(&(lock->waitProcs));
1081 1082
		lock->nRequested = 0;
		lock->nGranted = 0;
1083 1084
		MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
		MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
		LOCK_PRINT("LockAcquire: new", lock, lockmode);
1086 1087 1088
	}
	else
	{
		LOCK_PRINT("LockAcquire: found", lock, lockmode);
1090 1091 1092
		Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
		Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
		Assert(lock->nGranted <= lock->nRequested);
1093
	}
1094

	/*
1096
	 * Create the hash key for the proclock table.
1097
	 */
1098
	proclocktag.myLock = lock;
1099
	proclocktag.myProc = proc;
1100 1101

	proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
1102

	/*
1104
	 * Find or create a proclock entry with this tag
	 */
1106 1107 1108 1109 1110
	proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
														(void *) &proclocktag,
														proclock_hashcode,
														HASH_ENTER_NULL,
														&found);
1111
	if (!proclock)
1112
	{
1113 1114 1115 1116 1117 1118 1119 1120 1121 1122
		/* Ooops, not enough shmem for the proclock */
		if (lock->nRequested == 0)
		{
			/*
			 * There are no other requestors of this lock, so garbage-collect
			 * the lock object.  We *must* do this to avoid a permanent leak
			 * of shared memory, because there won't be anything to cause
			 * anyone to release the lock object later.
			 */
			Assert(SHMQueueEmpty(&(lock->procLocks)));
1123 1124 1125 1126 1127
			if (!hash_search_with_hash_value(LockMethodLockHash,
											 (void *) &(lock->tag),
											 hashcode,
											 HASH_REMOVE,
											 NULL))
1128
				elog(PANIC, "lock table corrupted");
1129
		}
1130
		return NULL;
1131
	}

	/*
1134
	 * If new, initialize the new entry
	 */
1136
	if (!found)
1137
	{
1138 1139
		uint32		partition = LockHashPartition(hashcode);

1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151
		/*
		 * It might seem unsafe to access proclock->groupLeader without a lock,
		 * but it's not really.  Either we are initializing a proclock on our
		 * own behalf, in which case our group leader isn't changing because
		 * the group leader for a process can only ever be changed by the
		 * process itself; or else we are transferring a fast-path lock to the
		 * main lock table, in which case that process can't change its lock
		 * group leader without first releasing all of its locks (and in
		 * particular the one we are currently transferring).
		 */
		proclock->groupLeader = proc->lockGroupLeader != NULL ?
			proc->lockGroupLeader : proc;
1152
		proclock->holdMask = 0;
1153
		proclock->releaseMask = 0;
1154
		/* Add proclock to appropriate lists */
1155
		SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
1156
		SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
1157
							 &proclock->procLink);
1158
		PROCLOCK_PRINT("LockAcquire: new", proclock);
1159 1160 1161
	}
	else
	{
1162
		PROCLOCK_PRINT("LockAcquire: found", proclock);
1163
		Assert((proclock->holdMask & ~lock->grantMask) == 0);
1164

1165
#ifdef CHECK_DEADLOCK_RISK

1167
		/*
		 * Issue warning if we already hold a lower-level lock on this object
		 * and do not hold a lock of the requested level or higher. This
		 * indicates a deadlock-prone coding practice (eg, we'd have a
		 * deadlock if another backend were following the same code path at
		 * about the same time).
1173
		 *
		 * This is not enabled by default, because it may generate log entries
		 * about user-level coding practices that are in fact safe in context.
		 * It can be enabled to help find system-level problems.
1177
		 *
		 * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
		 * better to use a table.  For now, though, this works.
1180 1181
		 */
		{
			int			i;
1183 1184

			for (i = lockMethodTable->numLockModes; i > 0; i--)
1185
			{
1186 1187 1188
				if (proclock->holdMask & LOCKBIT_ON(i))
				{
					if (i >= (int) lockmode)
						break;	/* safe: we have a lock >= req level */
1190 1191
					elog(LOG, "deadlock risk: raising lock level"
						 " from %s to %s on object %u/%u/%u",
1192 1193
						 lockMethodTable->lockModeNames[i],
						 lockMethodTable->lockModeNames[lockmode],
1194 1195 1196 1197
						 lock->tag.locktag_field1, lock->tag.locktag_field2,
						 lock->tag.locktag_field3);
					break;
				}
1198 1199
			}
		}
1200
#endif   /* CHECK_DEADLOCK_RISK */
1201
	}
1202

	/*
1204
	 * lock->nRequested and lock->requested[] count the total number of
	 * requests, whether granted or waiting, so increment those immediately.
	 * The other counts don't increment till we get the lock.
1207
	 */
1208 1209 1210
	lock->nRequested++;
	lock->requested[lockmode]++;
	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
1211

	/*
	 * We shouldn't already hold the desired lock; else locallock table is
	 * broken.
1215
	 */
1216 1217
	if (proclock->holdMask & LOCKBIT_ON(lockmode))
		elog(ERROR, "lock %s on object %u/%u/%u is already held",
1218
			 lockMethodTable->lockModeNames[lockmode],
1219 1220
			 lock->tag.locktag_field1, lock->tag.locktag_field2,
			 lock->tag.locktag_field3);
1221

1222
	return proclock;
1223 1224
}

1225 1226 1227 1228 1229 1230
/*
 * Subroutine to free a locallock entry
 */
static void
RemoveLocalLock(LOCALLOCK *locallock)
{
	int			i;
1232 1233 1234 1235 1236 1237

	for (i = locallock->numLockOwners - 1; i >= 0; i--)
	{
		if (locallock->lockOwners[i].owner != NULL)
			ResourceOwnerForgetLock(locallock->lockOwners[i].owner, locallock);
	}
1238 1239 1240
	locallock->numLockOwners = 0;
	if (locallock->lockOwners != NULL)
		pfree(locallock->lockOwners);
1241
	locallock->lockOwners = NULL;
1242

1243 1244
	if (locallock->holdsStrongLockCount)
	{
1245 1246
		uint32		fasthashcode;

1247 1248
		fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode);

1249 1250 1251
		SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
		Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0);
		FastPathStrongRelationLocks->count[fasthashcode]--;
1252
		locallock->holdsStrongLockCount = FALSE;
1253
		SpinLockRelease(&FastPathStrongRelationLocks->mutex);
1254
	}
1255

1256
	if (!hash_search(LockMethodLocalHash,
1257 1258
					 (void *) &(locallock->tag),
					 HASH_REMOVE, NULL))
1259 1260 1261
		elog(WARNING, "locallock table corrupted");
}

/*
1263 1264 1265 1266
 * LockCheckConflicts -- test whether requested lock conflicts
 *		with those already granted
 *
 * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
1267 1268
 *
 * NOTES:
1269
 *		Here's what makes this complicated: one process's locks don't
1270
 * conflict with one another, no matter what purpose they are held for
1271 1272 1273 1274
 * (eg, session and transaction locks do not conflict).  Nor do the locks
 * of one process in a lock group conflict with those of another process in
 * the same group.  So, we must subtract off these locks when determining
 * whether the requested new lock conflicts with those already held.
1275 1276
 */
int
1277
LockCheckConflicts(LockMethod lockMethodTable,
1278 1279
				   LOCKMODE lockmode,
				   LOCK *lock,
1280
				   PROCLOCK *proclock)
1281
{
	int			numLockModes = lockMethodTable->numLockModes;
1283
	LOCKMASK	myLocks;
1284 1285 1286
	int			conflictMask = lockMethodTable->conflictTab[lockmode];
	int			conflictsRemaining[MAX_LOCKMODES];
	int			totalConflictsRemaining = 0;
1287
	int			i;
1288 1289
	SHM_QUEUE  *procLocks;
	PROCLOCK   *otherproclock;
1290

	/*
	 * first check for global conflicts: If no locks conflict with my request,
	 * then I get the lock.
1294
	 *
1295 1296 1297 1298
	 * Checking for conflict: lock->grantMask represents the types of
	 * currently held locks.  conflictTable[lockmode] has a bit set for each
	 * type of lock that conflicts with request.   Bitwise compare tells if
	 * there is a conflict.
1299
	 */
1300
	if (!(conflictMask & lock->grantMask))
1301
	{
1302
		PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock);
1303
		return STATUS_OK;
1304
	}
1305

	/*
1307 1308 1309 1310
	 * Rats.  Something conflicts.  But it could still be my own lock, or
	 * a lock held by another member of my locking group.  First, figure out
	 * how many conflicts remain after subtracting out any locks I hold
	 * myself.
1311
	 */
1312
	myLocks = proclock->holdMask;
1313
	for (i = 1; i <= numLockModes; i++)
1314
	{
1315 1316 1317 1318 1319 1320 1321 1322 1323 1324
		if ((conflictMask & LOCKBIT_ON(i)) == 0)
		{
			conflictsRemaining[i] = 0;
			continue;
		}
		conflictsRemaining[i] = lock->granted[i];
		if (myLocks & LOCKBIT_ON(i))
			--conflictsRemaining[i];
		totalConflictsRemaining += conflictsRemaining[i];
	}
1325

1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339
	/* If no conflicts remain, we get the lock. */
	if (totalConflictsRemaining == 0)
	{
		PROCLOCK_PRINT("LockCheckConflicts: resolved (simple)", proclock);
		return STATUS_OK;
	}

	/* If no group locking, it's definitely a conflict. */
	if (proclock->groupLeader == MyProc && MyProc->lockGroupLeader == NULL)
	{
		Assert(proclock->tag.myProc == MyProc);
		PROCLOCK_PRINT("LockCheckConflicts: conflicting (simple)",
					   proclock);
		return STATUS_FOUND;
1340
	}
1341

	/*
1343 1344 1345 1346 1347
	 * Locks held in conflicting modes by members of our own lock group are
	 * not real conflicts; we can subtract those out and see if we still have
	 * a conflict.  This is O(N) in the number of processes holding or awaiting
	 * locks on this object.  We could improve that by making the shared memory
	 * state more complex (and larger) but it doesn't seem worth it.
1348
	 */
1349 1350 1351 1352
	procLocks = &(lock->procLocks);
	otherproclock = (PROCLOCK *)
		SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink));
	while (otherproclock != NULL)
1353
	{
1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380
		if (proclock != otherproclock &&
			proclock->groupLeader == otherproclock->groupLeader &&
			(otherproclock->holdMask & conflictMask) != 0)
		{
			int	intersectMask = otherproclock->holdMask & conflictMask;

			for (i = 1; i <= numLockModes; i++)
			{
				if ((intersectMask & LOCKBIT_ON(i)) != 0)
				{
					if (conflictsRemaining[i] <= 0)
						elog(PANIC, "proclocks held do not match lock");
					conflictsRemaining[i]--;
					totalConflictsRemaining--;
				}
			}

			if (totalConflictsRemaining == 0)
			{
				PROCLOCK_PRINT("LockCheckConflicts: resolved (group)",
							   proclock);
				return STATUS_OK;
			}
		}
		otherproclock = (PROCLOCK *)
			SHMQueueNext(procLocks, &otherproclock->lockLink,
						 offsetof(PROCLOCK, lockLink));
1381
	}
1382

1383 1384
	/* Nope, it's a real conflict. */
	PROCLOCK_PRINT("LockCheckConflicts: conflicting (group)", proclock);
1385
	return STATUS_FOUND;
1386 1387
}

1388
/*
1389
 * GrantLock -- update the lock and proclock data structures to show
1390
 *		the lock request has been granted.
1391 1392
 *
 * NOTE: if proc was blocked, it also needs to be removed from the wait list
1393
 * and have its waitLock/waitProcLock fields cleared.  That's not done here.
1394
 *
1395 1396 1397
 * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK
 * table entry; but since we may be awaking some other process, we can't do
 * that here; it's done by GrantLockLocal, instead.
 */
void
1400
GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
{
1402 1403
	lock->nGranted++;
	lock->granted[lockmode]++;
1404
	lock->grantMask |= LOCKBIT_ON(lockmode);
1405
	if (lock->granted[lockmode] == lock->requested[lockmode])
1406
		lock->waitMask &= LOCKBIT_OFF(lockmode);
1407
	proclock->holdMask |= LOCKBIT_ON(lockmode);
	LOCK_PRINT("GrantLock", lock, lockmode);
1409 1410
	Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
	Assert(lock->nGranted <= lock->nRequested);
1411 1412
}

1413
/*
 * UnGrantLock -- opposite of GrantLock.
1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425
 *
 * Updates the lock and proclock data structures to show that the lock
 * is no longer held nor requested by the current holder.
 *
 * Returns true if there were any waiters waiting on the lock that
 * should now be woken up with ProcLockWakeup.
 */
static bool
UnGrantLock(LOCK *lock, LOCKMODE lockmode,
			PROCLOCK *proclock, LockMethod lockMethodTable)
{
	bool		wakeupNeeded = false;
1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448

	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
	Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
	Assert(lock->nGranted <= lock->nRequested);

	/*
	 * fix the general lock stats
	 */
	lock->nRequested--;
	lock->requested[lockmode]--;
	lock->nGranted--;
	lock->granted[lockmode]--;

	if (lock->granted[lockmode] == 0)
	{
		/* change the conflict mask.  No more of this lock type. */
		lock->grantMask &= LOCKBIT_OFF(lockmode);
	}

	LOCK_PRINT("UnGrantLock: updated", lock, lockmode);

	/*
	 * We need only run ProcLockWakeup if the released lock conflicts with at
	 * least one of the lock types requested by waiter(s).  Otherwise whatever
	 * conflict made them wait must still exist.  NOTE: before MVCC, we could
	 * skip wakeup if lock->granted[lockmode] was still positive. But that's
	 * not true anymore, because the remaining granted locks might belong to
	 * some waiter, who could now be awakened because he doesn't conflict with
	 * his own locks.
1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468
	 */
	if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
		wakeupNeeded = true;

	/*
	 * Now fix the per-proclock state.
	 */
	proclock->holdMask &= LOCKBIT_OFF(lockmode);
	PROCLOCK_PRINT("UnGrantLock: updated", proclock);

	return wakeupNeeded;
}

1469
/*
 * CleanUpLock -- clean up after releasing a lock.  We garbage-collect the
1471 1472 1473 1474 1475
 * proclock and lock objects if possible, and call ProcLockWakeup if there
 * are remaining requests and the caller says it's OK.  (Normally, this
 * should be called after UnGrantLock, and wakeupNeeded is the result from
 * UnGrantLock.)
 *
1476
 * The appropriate partition lock must be held at entry, and will be
1477 1478 1479
 * held at exit.
 */
static void
1480
CleanUpLock(LOCK *lock, PROCLOCK *proclock,
1481
			LockMethod lockMethodTable, uint32 hashcode,
1482 1483 1484
			bool wakeupNeeded)
{
	/*
	 * If this was my last hold on this lock, delete my entry in the proclock
	 * table.
1487 1488 1489
	 */
	if (proclock->holdMask == 0)
	{
1490 1491
		uint32		proclock_hashcode;

1492 1493 1494
		PROCLOCK_PRINT("CleanUpLock: deleting", proclock);
		SHMQueueDelete(&proclock->lockLink);
		SHMQueueDelete(&proclock->procLink);
1495 1496 1497 1498 1499 1500
		proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
		if (!hash_search_with_hash_value(LockMethodProcLockHash,
										 (void *) &(proclock->tag),
										 proclock_hashcode,
										 HASH_REMOVE,
										 NULL))
1501 1502 1503 1504 1505 1506
			elog(PANIC, "proclock table corrupted");
	}

	if (lock->nRequested == 0)
	{
		/*
		 * The caller just released the last lock, so garbage-collect the lock
		 * object.
1509 1510 1511
		 */
		LOCK_PRINT("CleanUpLock: deleting", lock, 0);
		Assert(SHMQueueEmpty(&(lock->procLocks)));
1512 1513 1514 1515 1516
		if (!hash_search_with_hash_value(LockMethodLockHash,
										 (void *) &(lock->tag),
										 hashcode,
										 HASH_REMOVE,
										 NULL))
1517 1518 1519 1520 1521
			elog(PANIC, "lock table corrupted");
	}
	else if (wakeupNeeded)
	{
		/* There are waiters on this lock, so wake them up. */
1522
		ProcLockWakeup(lockMethodTable, lock);
1523 1524 1525
	}
}

1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536
/*
 * GrantLockLocal -- update the locallock data structures to show
 *		the lock request has been granted.
 *
 * We expect that LockAcquire made sure there is room to add a new
 * ResourceOwner entry.
 */
static void
GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
{
	LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
	int			i;
1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553

	Assert(locallock->numLockOwners < locallock->maxLockOwners);
	/* Count the total */
	locallock->nLocks++;
	/* Count the per-owner lock */
	for (i = 0; i < locallock->numLockOwners; i++)
	{
		if (lockOwners[i].owner == owner)
		{
			lockOwners[i].nLocks++;
			return;
		}
	}
	lockOwners[i].owner = owner;
	lockOwners[i].nLocks = 1;
	locallock->numLockOwners++;
1554 1555
	if (owner != NULL)
		ResourceOwnerRememberLock(owner, locallock);
}

/*
 * BeginStrongLockAcquire - inhibit use of fastpath for a given LOCALLOCK,
 * and arrange for error cleanup if it fails
 */
static void
BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode)
{
	Assert(StrongLockInProgress == NULL);
	Assert(locallock->holdsStrongLockCount == FALSE);

	/*
	 * Adding to a memory location is not atomic, so we take a spinlock to
	 * ensure we don't collide with someone else trying to bump the count at
	 * the same time.
	 *
	 * XXX: It might be worth considering using an atomic fetch-and-add
	 * instruction here, on architectures where that is supported.
	 */

	SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
	FastPathStrongRelationLocks->count[fasthashcode]++;
	locallock->holdsStrongLockCount = TRUE;
	StrongLockInProgress = locallock;
	SpinLockRelease(&FastPathStrongRelationLocks->mutex);
}

/*
 * FinishStrongLockAcquire - cancel pending cleanup for a strong lock
 * acquisition once it's no longer needed
 */
static void
FinishStrongLockAcquire(void)
{
	StrongLockInProgress = NULL;
}

/*
 * AbortStrongLockAcquire - undo strong lock state changes performed by
 * BeginStrongLockAcquire.
 */
void
AbortStrongLockAcquire(void)
{
	uint32		fasthashcode;
	LOCALLOCK  *locallock = StrongLockInProgress;

	if (locallock == NULL)
		return;

	fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode);
	Assert(locallock->holdsStrongLockCount == TRUE);
	SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
	Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0);
	FastPathStrongRelationLocks->count[fasthashcode]--;
	locallock->holdsStrongLockCount = FALSE;
	StrongLockInProgress = NULL;
	SpinLockRelease(&FastPathStrongRelationLocks->mutex);
}

/*
 * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
 *		WaitOnLock on.
 *
 * proc.c needs this for the case where we are booted off the lock by
 * timeout, but discover that someone granted us the lock anyway.
 *
 * We could just export GrantLockLocal, but that would require including
 * resowner.h in lock.h, which creates circularity.
 */
void
GrantAwaitedLock(void)
{
	GrantLockLocal(awaitedLock, awaitedOwner);
}

/*
 * WaitOnLock -- wait to acquire a lock
 *
 * Caller must have set MyProc->heldLocks to reflect locks already held
 * on the lockable object by this process.
 *
 * The appropriate partition lock must be held at entry.
 */
static void
WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
{
	LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
	LockMethod	lockMethodTable = LockMethods[lockmethodid];
	char	   *volatile new_status = NULL;

	LOCK_PRINT("WaitOnLock: sleeping on lock",
			   locallock->lock, locallock->tag.mode);

	/* Report change to waiting status */
	if (update_process_title)
	{
		const char *old_status;
		int			len;

		old_status = get_ps_display(&len);
		new_status = (char *) palloc(len + 8 + 1);
		memcpy(new_status, old_status, len);
		strcpy(new_status + len, " waiting");
		set_ps_display(new_status, false);
		new_status[len] = '\0'; /* truncate off " waiting" */
	}
	pgstat_report_waiting(true);

	awaitedLock = locallock;
	awaitedOwner = owner;

	/*
	 * NOTE: Think not to put any shared-state cleanup after the call to
	 * ProcSleep, in either the normal or failure path.  The lock state must
	 * be fully set by the lock grantor, or by CheckDeadLock if we give up
	 * waiting for the lock.  This is necessary because of the possibility
	 * that a cancel/die interrupt will interrupt ProcSleep after someone else
	 * grants us the lock, but before we've noticed it. Hence, after granting,
	 * the locktable state must fully reflect the fact that we own the lock;
	 * we can't do additional work on return.
	 *
	 * We can and do use a PG_TRY block to try to clean up after failure, but
	 * this still has a major limitation: elog(FATAL) can occur while waiting
	 * (eg, a "die" interrupt), and then control won't come back here. So all
	 * cleanup of essential state should happen in LockErrorCleanup, not here.
	 * We can use PG_TRY to clear the "waiting" status flags, since doing that
	 * is unimportant if the process exits.
	 */
	PG_TRY();
	{
		if (ProcSleep(locallock, lockMethodTable) != STATUS_OK)
		{
			/*
			 * We failed as a result of a deadlock, see CheckDeadLock(). Quit
			 * now.
			 */
			awaitedLock = NULL;
			LOCK_PRINT("WaitOnLock: aborting on lock",
					   locallock->lock, locallock->tag.mode);
			LWLockRelease(LockHashPartitionLock(locallock->hashcode));

			/*
			 * Now that we aren't holding the partition lock, we can give an
			 * error report including details about the detected deadlock.
			 */
			DeadLockReport();
			/* not reached */
		}
	}
	PG_CATCH();
	{
		/* In this path, awaitedLock remains set until LockErrorCleanup */

		/* Report change to non-waiting status */
		pgstat_report_waiting(false);
		if (update_process_title)
		{
			set_ps_display(new_status, false);
			pfree(new_status);
		}

		/* and propagate the error */
		PG_RE_THROW();
	}
	PG_END_TRY();

	awaitedLock = NULL;

	/* Report change to non-waiting status */
	pgstat_report_waiting(false);
	if (update_process_title)
	{
		set_ps_display(new_status, false);
		pfree(new_status);
	}

	LOCK_PRINT("WaitOnLock: wakeup on lock",
			   locallock->lock, locallock->tag.mode);
}

/*
 * Remove a proc from the wait-queue it is on (caller must know it is on one).
 * This is only used when the proc has failed to get the lock, so we set its
 * waitStatus to STATUS_ERROR.
 *
 * Appropriate partition lock must be held by caller.  Also, caller is
 * responsible for signaling the proc if needed.
 *
 * NB: this does not clean up any locallock object that may exist for the lock.
 */
void
RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode)
{
	LOCK	   *waitLock = proc->waitLock;
	PROCLOCK   *proclock = proc->waitProcLock;
	LOCKMODE	lockmode = proc->waitLockMode;
	LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*waitLock);

	/* Make sure proc is waiting */
	Assert(proc->waitStatus == STATUS_WAITING);
	Assert(proc->links.next != NULL);
	Assert(waitLock);
	Assert(waitLock->waitProcs.size > 0);
	Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));

	/* Remove proc from lock's wait queue */
	SHMQueueDelete(&(proc->links));
	waitLock->waitProcs.size--;

	/* Undo increments of request counts by waiting process */
	Assert(waitLock->nRequested > 0);
	Assert(waitLock->nRequested > proc->waitLock->nGranted);
	waitLock->nRequested--;
	Assert(waitLock->requested[lockmode] > 0);
	waitLock->requested[lockmode]--;
	/* don't forget to clear waitMask bit if appropriate */
	if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
		waitLock->waitMask &= LOCKBIT_OFF(lockmode);

	/* Clean up the proc's own state, and pass it the ok/fail signal */
	proc->waitLock = NULL;
	proc->waitProcLock = NULL;
	proc->waitStatus = STATUS_ERROR;

	/*
	 * Delete the proclock immediately if it represents no already-held locks.
	 * (This must happen now because if the owner of the lock decides to
	 * release it, and the requested/granted counts then go to zero,
	 * LockRelease expects there to be no remaining proclocks.) Then see if
	 * any other waiters for the lock can be woken up now.
	 */
	CleanUpLock(waitLock, proclock,
				LockMethods[lockmethodid], hashcode,
				true);
}

/*
 * LockRelease -- look up 'locktag' and release one 'lockmode' lock on it.
 *		Release a session lock if 'sessionLock' is true, else release a
 *		regular transaction lock.
 *
 * Side Effects: find any waiting processes that are now wakable,
 *		grant them their requested locks and awaken them.
 *		(We have to grant the lock here to avoid a race between
 *		the waking process and any new process to
 *		come along and request the lock.)
 */
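/*
 * A minimal usage sketch (assuming the usual lmgr.c wrappers, not shown
 * here): a caller such as UnlockRelationOid() fills in the tag with
 * SET_LOCKTAG_RELATION(tag, MyDatabaseId, relid) and then calls
 * LockRelease(&tag, lockmode, false) to drop a transaction-level lock it
 * previously obtained through LockAcquire().
 */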
bool
LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
{
	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
	LockMethod	lockMethodTable;
	LOCALLOCKTAG localtag;
	LOCALLOCK  *locallock;
	LOCK	   *lock;
	PROCLOCK   *proclock;
	LWLock	   *partitionLock;
	bool		wakeupNeeded;

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];
	if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
		elog(ERROR, "unrecognized lock mode: %d", lockmode);

#ifdef LOCK_DEBUG
	if (LOCK_DEBUG_ENABLED(locktag))
		elog(LOG, "LockRelease: lock [%u,%u] %s",
			 locktag->locktag_field1, locktag->locktag_field2,
			 lockMethodTable->lockModeNames[lockmode]);
#endif

	/*
	 * Find the LOCALLOCK entry for this lock and lockmode
	 */
	MemSet(&localtag, 0, sizeof(localtag));		/* must clear padding */
	localtag.lock = *locktag;
	localtag.mode = lockmode;

	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
										  (void *) &localtag,
										  HASH_FIND, NULL);

	/*
	 * let the caller print its own error message, too. Do not ereport(ERROR).
	 */
	if (!locallock || locallock->nLocks <= 0)
	{
		elog(WARNING, "you don't own a lock of type %s",
			 lockMethodTable->lockModeNames[lockmode]);
		return FALSE;
	}

	/*
	 * Decrease the count for the resource owner.
	 */
	{
		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
		ResourceOwner owner;
		int			i;

		/* Identify owner for lock */
		if (sessionLock)
			owner = NULL;
		else
			owner = CurrentResourceOwner;

		for (i = locallock->numLockOwners - 1; i >= 0; i--)
		{
			if (lockOwners[i].owner == owner)
			{
				Assert(lockOwners[i].nLocks > 0);
				if (--lockOwners[i].nLocks == 0)
				{
					if (owner != NULL)
						ResourceOwnerForgetLock(owner, locallock);
					/* compact out unused slot */
					locallock->numLockOwners--;
					if (i < locallock->numLockOwners)
						lockOwners[i] = lockOwners[locallock->numLockOwners];
				}
				break;
			}
		}
		if (i < 0)
		{
			/* don't release a lock belonging to another owner */
			elog(WARNING, "you don't own a lock of type %s",
				 lockMethodTable->lockModeNames[lockmode]);
			return FALSE;
		}
	}

	/*
	 * Decrease the total local count.  If we're still holding the lock, we're
	 * done.
	 */
	locallock->nLocks--;

	if (locallock->nLocks > 0)
		return TRUE;

	/* Attempt fast release of any lock eligible for the fast path. */
	if (EligibleForRelationFastPath(locktag, lockmode) &&
		FastPathLocalUseCount > 0)
	{
		bool		released;

		/*
		 * We might not find the lock here, even if we originally entered it
		 * here.  Another backend may have moved it to the main table.
		 */
		LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
		released = FastPathUnGrantRelationLock(locktag->locktag_field2,
											   lockmode);
		LWLockRelease(&MyProc->backendLock);
		if (released)
		{
			RemoveLocalLock(locallock);
			return TRUE;
		}
	}

	/*
	 * Otherwise we've got to mess with the shared lock table.
	 */
	partitionLock = LockHashPartitionLock(locallock->hashcode);

	LWLockAcquire(partitionLock, LW_EXCLUSIVE);

	/*
	 * Normally, we don't need to re-find the lock or proclock, since we kept
	 * their addresses in the locallock table, and they couldn't have been
	 * removed while we were holding a lock on them.  But it's possible that
	 * the lock was taken fast-path and has since been moved to the main hash
	 * table by another backend, in which case we will need to look up the
	 * objects here.  We assume the lock field is NULL if so.
	 */
	lock = locallock->lock;
	if (!lock)
	{
		PROCLOCKTAG proclocktag;

		Assert(EligibleForRelationFastPath(locktag, lockmode));
		lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
													(const void *) locktag,
													locallock->hashcode,
													HASH_FIND,
													NULL);
		if (!lock)
			elog(ERROR, "failed to re-find shared lock object");
		locallock->lock = lock;

		proclocktag.myLock = lock;
		proclocktag.myProc = MyProc;
		locallock->proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash,
													   (void *) &proclocktag,
													   HASH_FIND,
													   NULL);
		if (!locallock->proclock)
			elog(ERROR, "failed to re-find shared proclock object");
	}
	LOCK_PRINT("LockRelease: found", lock, lockmode);
	proclock = locallock->proclock;
	PROCLOCK_PRINT("LockRelease: found", proclock);

	/*
	 * Double-check that we are actually holding a lock of the type we want to
	 * release.
	 */
	if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
	{
		PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
		LWLockRelease(partitionLock);
		elog(WARNING, "you don't own a lock of type %s",
			 lockMethodTable->lockModeNames[lockmode]);
		RemoveLocalLock(locallock);
		return FALSE;
	}

	/*
	 * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
	 */
	wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);

	CleanUpLock(lock, proclock,
				lockMethodTable, locallock->hashcode,
				wakeupNeeded);

	LWLockRelease(partitionLock);

	RemoveLocalLock(locallock);
	return TRUE;
}

/*
 * LockReleaseAll -- Release all locks of the specified lock method that
 *		are held by the current process.
 *
 * Well, not necessarily *all* locks.  The available behaviors are:
 *		allLocks == true: release all locks including session locks.
 *		allLocks == false: release all non-session locks.
 */
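/*
 * A usage sketch (based on ProcReleaseLocks in proc.c): at the end of each
 * top-level transaction the default lock method is flushed with
 * LockReleaseAll(DEFAULT_LOCKMETHOD, !isCommit), so session-level locks are
 * retained on commit but dropped along with everything else on abort.
 */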
void
LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
{
	HASH_SEQ_STATUS status;
	LockMethod	lockMethodTable;
	int			i,
				numLockModes;
	LOCALLOCK  *locallock;
	LOCK	   *lock;
	PROCLOCK   *proclock;
	int			partition;
	bool		have_fast_path_lwlock = false;

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];

#ifdef LOCK_DEBUG
	if (*(lockMethodTable->trace_flag))
		elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
#endif

	/*
	 * Get rid of our fast-path VXID lock, if appropriate.  Note that this is
	 * the only way that the lock we hold on our own VXID can ever get
	 * released: it is always and only released when a toplevel transaction
	 * ends.
	 */
	if (lockmethodid == DEFAULT_LOCKMETHOD)
		VirtualXactLockTableCleanup();

	numLockModes = lockMethodTable->numLockModes;

	/*
	 * First we run through the locallock table and get rid of unwanted
	 * entries, then we scan the process's proclocks and get rid of those. We
	 * do this separately because we may have multiple locallock entries
	 * pointing to the same proclock, and we daren't end up with any dangling
	 * pointers.  Fast-path locks are cleaned up during the locallock table
	 * scan, though.
	 */
	hash_seq_init(&status, LockMethodLocalHash);

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		/*
		 * If the LOCALLOCK entry is unused, we must've run out of shared
		 * memory while trying to set up this lock.  Just forget the local
		 * entry.
		 */
		if (locallock->nLocks == 0)
		{
			RemoveLocalLock(locallock);
			continue;
		}

		/* Ignore items that are not of the lockmethod to be removed */
		if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
			continue;

		/*
		 * If we are asked to release all locks, we can just zap the entry.
		 * Otherwise, must scan to see if there are session locks. We assume
		 * there is at most one lockOwners entry for session locks.
		 */
		if (!allLocks)
		{
			LOCALLOCKOWNER *lockOwners = locallock->lockOwners;

			/* If session lock is above array position 0, move it down to 0 */
			for (i = 0; i < locallock->numLockOwners; i++)
			{
				if (lockOwners[i].owner == NULL)
					lockOwners[0] = lockOwners[i];
				else
					ResourceOwnerForgetLock(lockOwners[i].owner, locallock);
			}

			if (locallock->numLockOwners > 0 &&
				lockOwners[0].owner == NULL &&
				lockOwners[0].nLocks > 0)
			{
				/* Fix the locallock to show just the session locks */
				locallock->nLocks = lockOwners[0].nLocks;
				locallock->numLockOwners = 1;
				/* We aren't deleting this locallock, so done */
				continue;
			}
			else
				locallock->numLockOwners = 0;
		}

		/*
		 * If the lock or proclock pointers are NULL, this lock was taken via
		 * the relation fast-path (and is not known to have been transferred).
		 */
		if (locallock->proclock == NULL || locallock->lock == NULL)
		{
			LOCKMODE	lockmode = locallock->tag.mode;
			Oid			relid;

			/* Verify that a fast-path lock is what we've got. */
			if (!EligibleForRelationFastPath(&locallock->tag.lock, lockmode))
				elog(PANIC, "locallock table corrupted");

			/*
			 * If we don't currently hold the LWLock that protects our
			 * fast-path data structures, we must acquire it before attempting
			 * to release the lock via the fast-path.  We will continue to
			 * hold the LWLock until we're done scanning the locallock table,
			 * unless we hit a transferred fast-path lock.  (XXX is this
			 * really such a good idea?  There could be a lot of entries ...)
			 */
			if (!have_fast_path_lwlock)
			{
				LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
				have_fast_path_lwlock = true;
			}

			/* Attempt fast-path release. */
			relid = locallock->tag.lock.locktag_field2;
			if (FastPathUnGrantRelationLock(relid, lockmode))
			{
				RemoveLocalLock(locallock);
				continue;
			}

			/*
			 * Our lock, originally taken via the fast path, has been
			 * transferred to the main lock table.  That's going to require
			 * some extra work, so release our fast-path lock before starting.
			 */
			LWLockRelease(&MyProc->backendLock);
			have_fast_path_lwlock = false;

			/*
			 * Now dump the lock.  We haven't got a pointer to the LOCK or
			 * PROCLOCK in this case, so we have to handle this a bit
			 * differently than a normal lock release.  Unfortunately, this
			 * requires an extra LWLock acquire-and-release cycle on the
			 * partitionLock, but hopefully it shouldn't happen often.
			 */
			LockRefindAndRelease(lockMethodTable, MyProc,
								 &locallock->tag.lock, lockmode, false);
			RemoveLocalLock(locallock);
			continue;
		}

		/* Mark the proclock to show we need to release this lockmode */
		if (locallock->nLocks > 0)
			locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);

		/* And remove the locallock hashtable entry */
		RemoveLocalLock(locallock);
	}

	/* Done with the fast-path data structures */
	if (have_fast_path_lwlock)
		LWLockRelease(&MyProc->backendLock);

	/*
	 * Now, scan each lock partition separately.
	 */
	for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
	{
		LWLock	   *partitionLock;
		SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
		PROCLOCK   *nextplock;

		partitionLock = LockHashPartitionLockByIndex(partition);

		/*
		 * If the proclock list for this partition is empty, we can skip
		 * acquiring the partition lock.  This optimization is trickier than
		 * it looks, because another backend could be in process of adding
		 * something to our proclock list due to promoting one of our
		 * fast-path locks.  However, any such lock must be one that we
		 * decided not to delete above, so it's okay to skip it again now;
		 * we'd just decide not to delete it again.  We must, however, be
		 * careful to re-fetch the list header once we've acquired the
		 * partition lock, to be sure we have a valid, up-to-date pointer.
		 * (There is probably no significant risk if pointer fetch/store is
		 * atomic, but we don't wish to assume that.)
		 *
		 * XXX This argument assumes that the locallock table correctly
		 * represents all of our fast-path locks.  While allLocks mode
		 * guarantees to clean up all of our normal locks regardless of the
		 * locallock situation, we lose that guarantee for fast-path locks.
		 * This is not ideal.
		 */
		if (SHMQueueNext(procLocks, procLocks,
						 offsetof(PROCLOCK, procLink)) == NULL)
			continue;			/* needn't examine this partition */

		LWLockAcquire(partitionLock, LW_EXCLUSIVE);

		for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
											   offsetof(PROCLOCK, procLink));
			 proclock;
			 proclock = nextplock)
		{
			bool		wakeupNeeded = false;

			/* Get link first, since we may unlink/delete this proclock */
			nextplock = (PROCLOCK *)
				SHMQueueNext(procLocks, &proclock->procLink,
							 offsetof(PROCLOCK, procLink));

			Assert(proclock->tag.myProc == MyProc);

			lock = proclock->tag.myLock;

			/* Ignore items that are not of the lockmethod to be removed */
			if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
				continue;

			/*
			 * In allLocks mode, force release of all locks even if locallock
			 * table had problems
			 */
			if (allLocks)
				proclock->releaseMask = proclock->holdMask;
			else
				Assert((proclock->releaseMask & ~proclock->holdMask) == 0);

			/*
			 * Ignore items that have nothing to be released, unless they have
			 * holdMask == 0 and are therefore recyclable
			 */
			if (proclock->releaseMask == 0 && proclock->holdMask != 0)
				continue;

			PROCLOCK_PRINT("LockReleaseAll", proclock);
			LOCK_PRINT("LockReleaseAll", lock, 0);
			Assert(lock->nRequested >= 0);
			Assert(lock->nGranted >= 0);
			Assert(lock->nGranted <= lock->nRequested);
			Assert((proclock->holdMask & ~lock->grantMask) == 0);

			/*
			 * Release the previously-marked lock modes
			 */
			for (i = 1; i <= numLockModes; i++)
			{
				if (proclock->releaseMask & LOCKBIT_ON(i))
					wakeupNeeded |= UnGrantLock(lock, i, proclock,
												lockMethodTable);
			}
			Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
			Assert(lock->nGranted <= lock->nRequested);
			LOCK_PRINT("LockReleaseAll: updated", lock, 0);

			proclock->releaseMask = 0;

			/* CleanUpLock will wake up waiters if needed. */
			CleanUpLock(lock, proclock,
						lockMethodTable,
						LockTagHashCode(&lock->tag),
						wakeupNeeded);
		}						/* loop over PROCLOCKs within this partition */

		LWLockRelease(partitionLock);
	}							/* loop over partitions */

#ifdef LOCK_DEBUG
	if (*(lockMethodTable->trace_flag))
		elog(LOG, "LockReleaseAll done");
#endif
}

/*
 * LockReleaseSession -- Release all session locks of the specified lock method
 *		that are held by the current process.
 */
void
LockReleaseSession(LOCKMETHODID lockmethodid)
{
	HASH_SEQ_STATUS status;
	LOCALLOCK  *locallock;

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);

	hash_seq_init(&status, LockMethodLocalHash);

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		/* Ignore items that are not of the specified lock method */
		if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
			continue;

		ReleaseLockIfHeld(locallock, true);
	}
}

/*
 * LockReleaseCurrentOwner
 *		Release all locks belonging to CurrentResourceOwner
 *
 * If the caller knows what those locks are, it can pass them as an array.
 * That speeds up the call significantly, when a lot of locks are held.
 * Otherwise, pass NULL for locallocks, and we'll traverse through our hash
 * table to find them.
 */
void
LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks)
{
	if (locallocks == NULL)
	{
		HASH_SEQ_STATUS status;
		LOCALLOCK  *locallock;

		hash_seq_init(&status, LockMethodLocalHash);

		while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
			ReleaseLockIfHeld(locallock, false);
	}
	else
	{
		int			i;

		for (i = nlocks - 1; i >= 0; i--)
			ReleaseLockIfHeld(locallocks[i], false);
	}
}

/*
 * ReleaseLockIfHeld
 *		Release any session-level locks on this lockable object if sessionLock
 *		is true; else, release any locks held by CurrentResourceOwner.
 *
 * It is tempting to pass this a ResourceOwner pointer (or NULL for session
 * locks), but without refactoring LockRelease() we cannot support releasing
 * locks belonging to resource owners other than CurrentResourceOwner.
 * If we were to refactor, it'd be a good idea to fix it so we don't have to
 * do a hashtable lookup of the locallock, too.  However, currently this
 * function isn't used heavily enough to justify refactoring for its
 * convenience.
 */
static void
ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock)
{
	ResourceOwner owner;
	LOCALLOCKOWNER *lockOwners;
	int			i;

	/* Identify owner for lock (must match LockRelease!) */
	if (sessionLock)
		owner = NULL;
	else
		owner = CurrentResourceOwner;

	/* Scan to see if there are any locks belonging to the target owner */
	lockOwners = locallock->lockOwners;
	for (i = locallock->numLockOwners - 1; i >= 0; i--)
	{
		if (lockOwners[i].owner == owner)
		{
			Assert(lockOwners[i].nLocks > 0);
			if (lockOwners[i].nLocks < locallock->nLocks)
			{
				/*
				 * We will still hold this lock after forgetting this
				 * ResourceOwner.
				 */
				locallock->nLocks -= lockOwners[i].nLocks;
				/* compact out unused slot */
				locallock->numLockOwners--;
				if (owner != NULL)
					ResourceOwnerForgetLock(owner, locallock);
				if (i < locallock->numLockOwners)
					lockOwners[i] = lockOwners[locallock->numLockOwners];
			}
			else
			{
				Assert(lockOwners[i].nLocks == locallock->nLocks);
				/* We want to call LockRelease just once */
				lockOwners[i].nLocks = 1;
				locallock->nLocks = 1;
				if (!LockRelease(&locallock->tag.lock,
								 locallock->tag.mode,
								 sessionLock))
					elog(WARNING, "ReleaseLockIfHeld: failed??");
			}
			break;
		}
	}
}

/*
 * LockReassignCurrentOwner
 *		Reassign all locks belonging to CurrentResourceOwner to belong
 *		to its parent resource owner.
 *
 * If the caller knows what those locks are, it can pass them as an array.
 * That speeds up the call significantly, when a lot of locks are held
 * (e.g pg_dump with a large schema).  Otherwise, pass NULL for locallocks,
 * and we'll traverse through our hash table to find them.
 */
void
LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks)
{
	ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);

	Assert(parent != NULL);

	if (locallocks == NULL)
	{
		HASH_SEQ_STATUS status;
		LOCALLOCK  *locallock;

		hash_seq_init(&status, LockMethodLocalHash);

		while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
			LockReassignOwner(locallock, parent);
	}
	else
	{
		int			i;

		for (i = nlocks - 1; i >= 0; i--)
			LockReassignOwner(locallocks[i], parent);
	}
}

/*
 * Subroutine of LockReassignCurrentOwner. Reassigns a given lock belonging to
 * CurrentResourceOwner to its parent.
 */
static void
LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent)
{
	LOCALLOCKOWNER *lockOwners;
	int			i;
	int			ic = -1;
	int			ip = -1;

	/*
	 * Scan to see if there are any locks belonging to current owner or its
	 * parent
	 */
	lockOwners = locallock->lockOwners;
	for (i = locallock->numLockOwners - 1; i >= 0; i--)
	{
		if (lockOwners[i].owner == CurrentResourceOwner)
			ic = i;
		else if (lockOwners[i].owner == parent)
			ip = i;
	}

	if (ic < 0)
		return;					/* no current locks */

	if (ip < 0)
	{
		/* Parent has no slot, so just give it the child's slot */
		lockOwners[ic].owner = parent;
		ResourceOwnerRememberLock(parent, locallock);
	}
	else
	{
		/* Merge child's count with parent's */
		lockOwners[ip].nLocks += lockOwners[ic].nLocks;
		/* compact out unused slot */
		locallock->numLockOwners--;
		if (ic < locallock->numLockOwners)
			lockOwners[ic] = lockOwners[locallock->numLockOwners];
	}
	ResourceOwnerForgetLock(CurrentResourceOwner, locallock);
}

/*
 * FastPathGrantRelationLock
 *		Grant lock using per-backend fast-path array, if there is space.
 */
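/*
 * A rough sketch of the layout this relies on (see proc.h and the fast-path
 * macros earlier in this file for the authoritative definitions): each
 * backend's PGPROC carries FP_LOCK_SLOTS_PER_BACKEND relation slots, and each
 * slot packs one bit per "weak" mode (AccessShareLock, RowShareLock,
 * RowExclusiveLock), so a single slot can record every fast-path-eligible
 * lock on one relation without touching the shared lock table.
 */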
static bool
FastPathGrantRelationLock(Oid relid, LOCKMODE lockmode)
{
	uint32		f;
	uint32		unused_slot = FP_LOCK_SLOTS_PER_BACKEND;

	/* Scan for existing entry for this relid, remembering empty slot. */
	for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
	{
		if (FAST_PATH_GET_BITS(MyProc, f) == 0)
			unused_slot = f;
		else if (MyProc->fpRelId[f] == relid)
		{
			Assert(!FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode));
			FAST_PATH_SET_LOCKMODE(MyProc, f, lockmode);
			return true;
		}
	}

	/* If no existing entry, use any empty slot. */
	if (unused_slot < FP_LOCK_SLOTS_PER_BACKEND)
	{
		MyProc->fpRelId[unused_slot] = relid;
		FAST_PATH_SET_LOCKMODE(MyProc, unused_slot, lockmode);
		++FastPathLocalUseCount;
		return true;
	}

	/* No existing entry, and no empty slot. */
	return false;
}

/*
 * FastPathUnGrantRelationLock
 *		Release fast-path lock, if present.  Update backend-private local
 *		use count, while we're at it.
 */
static bool
FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode)
{
	uint32		f;
	bool		result = false;

	FastPathLocalUseCount = 0;
	for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
	{
		if (MyProc->fpRelId[f] == relid
			&& FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode))
		{
			Assert(!result);
			FAST_PATH_CLEAR_LOCKMODE(MyProc, f, lockmode);
			result = true;
			/* we continue iterating so as to update FastPathLocalUseCount */
		}
		if (FAST_PATH_GET_BITS(MyProc, f) != 0)
			++FastPathLocalUseCount;
	}
	return result;
}

/*
 * FastPathTransferRelationLocks
 *		Transfer locks matching the given lock tag from per-backend fast-path
 *		arrays to the shared hash table.
 *
 * Returns true if successful, false if ran out of shared memory.
 */
static bool
FastPathTransferRelationLocks(LockMethod lockMethodTable, const LOCKTAG *locktag,
							  uint32 hashcode)
{
	LWLock	   *partitionLock = LockHashPartitionLock(hashcode);
	Oid			relid = locktag->locktag_field2;
	uint32		i;

	/*
	 * Every PGPROC that can potentially hold a fast-path lock is present in
	 * ProcGlobal->allProcs.  Prepared transactions are not, but any
	 * outstanding fast-path locks held by prepared transactions are
	 * transferred to the main lock table.
	 */
	for (i = 0; i < ProcGlobal->allProcCount; i++)
	{
		PGPROC	   *proc = &ProcGlobal->allProcs[i];
		uint32		f;

		LWLockAcquire(&proc->backendLock, LW_EXCLUSIVE);

		/*
		 * If the target backend isn't referencing the same database as the
		 * lock, then we needn't examine the individual relation IDs at all;
		 * none of them can be relevant.
		 *
		 * proc->databaseId is set at backend startup time and never changes
		 * thereafter, so it might be safe to perform this test before
		 * acquiring &proc->backendLock.  In particular, it's certainly safe to
		 * assume that if the target backend holds any fast-path locks, it
		 * must have performed a memory-fencing operation (in particular, an
		 * LWLock acquisition) since setting proc->databaseId.  However, it's
		 * less clear that our backend is certain to have performed a memory
		 * fencing operation since the other backend set proc->databaseId.  So
		 * for now, we test it after acquiring the LWLock just to be safe.
		 */
		if (proc->databaseId != locktag->locktag_field1)
		{
			LWLockRelease(&proc->backendLock);
			continue;
		}

		for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
		{
			uint32		lockmode;

			/* Look for an allocated slot matching the given relid. */
			if (relid != proc->fpRelId[f] || FAST_PATH_GET_BITS(proc, f) == 0)
				continue;

			/* Find or create lock object. */
			LWLockAcquire(partitionLock, LW_EXCLUSIVE);
			for (lockmode = FAST_PATH_LOCKNUMBER_OFFSET;
			lockmode < FAST_PATH_LOCKNUMBER_OFFSET + FAST_PATH_BITS_PER_SLOT;
				 ++lockmode)
			{
				PROCLOCK   *proclock;

				if (!FAST_PATH_CHECK_LOCKMODE(proc, f, lockmode))
					continue;
				proclock = SetupLockInTable(lockMethodTable, proc, locktag,
											hashcode, lockmode);
				if (!proclock)
				{
					LWLockRelease(partitionLock);
					LWLockRelease(&proc->backendLock);
					return false;
				}
				GrantLock(proclock->tag.myLock, proclock, lockmode);
				FAST_PATH_CLEAR_LOCKMODE(proc, f, lockmode);
			}
			LWLockRelease(partitionLock);

			/* No need to examine remaining slots. */
			break;
		}
		LWLockRelease(&proc->backendLock);
	}
	return true;
}

/*
 * FastPathGetLockEntry
 *		Return the PROCLOCK for a lock originally taken via the fast-path,
 *		transferring it to the primary lock table if necessary.
 *
 * Note: caller takes care of updating the locallock object.
 */
static PROCLOCK *
FastPathGetRelationLockEntry(LOCALLOCK *locallock)
{
	LockMethod	lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD];
	LOCKTAG    *locktag = &locallock->tag.lock;
	PROCLOCK   *proclock = NULL;
	LWLock	   *partitionLock = LockHashPartitionLock(locallock->hashcode);
	Oid			relid = locktag->locktag_field2;
	uint32		f;

	LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);

	for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
	{
		uint32		lockmode;

		/* Look for an allocated slot matching the given relid. */
		if (relid != MyProc->fpRelId[f] || FAST_PATH_GET_BITS(MyProc, f) == 0)
			continue;

		/* If we don't have a lock of the given mode, forget it! */
		lockmode = locallock->tag.mode;
		if (!FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode))
			break;

		/* Find or create lock object. */
		LWLockAcquire(partitionLock, LW_EXCLUSIVE);

		proclock = SetupLockInTable(lockMethodTable, MyProc, locktag,
									locallock->hashcode, lockmode);
		if (!proclock)
		{
			LWLockRelease(partitionLock);
			LWLockRelease(&MyProc->backendLock);
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of shared memory"),
					 errhint("You might need to increase max_locks_per_transaction.")));
		}
		GrantLock(proclock->tag.myLock, proclock, lockmode);
		FAST_PATH_CLEAR_LOCKMODE(MyProc, f, lockmode);

		LWLockRelease(partitionLock);

		/* No need to examine remaining slots. */
		break;
	}

	LWLockRelease(&MyProc->backendLock);

	/* Lock may have already been transferred by some other backend. */
	if (proclock == NULL)
	{
		LOCK	   *lock;
		PROCLOCKTAG proclocktag;
		uint32		proclock_hashcode;

		LWLockAcquire(partitionLock, LW_SHARED);

		lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
													(void *) locktag,
													locallock->hashcode,
													HASH_FIND,
													NULL);
		if (!lock)
			elog(ERROR, "failed to re-find shared lock object");

		proclocktag.myLock = lock;
		proclocktag.myProc = MyProc;

		proclock_hashcode = ProcLockHashCode(&proclocktag, locallock->hashcode);
		proclock = (PROCLOCK *)
			hash_search_with_hash_value(LockMethodProcLockHash,
										(void *) &proclocktag,
										proclock_hashcode,
										HASH_FIND,
										NULL);
		if (!proclock)
			elog(ERROR, "failed to re-find shared proclock object");
		LWLockRelease(partitionLock);
	}

	return proclock;
}

/*
 * GetLockConflicts
 *		Get an array of VirtualTransactionIds of xacts currently holding locks
 *		that would conflict with the specified lock/lockmode.
 *		xacts merely awaiting such a lock are NOT reported.
 *
 * The result array is palloc'd and is terminated with an invalid VXID.
 *
 * Of course, the result could be out of date by the time it's returned,
 * so use of this function has to be thought about carefully.
 *
 * Note we never include the current xact's vxid in the result array,
 * since an xact never blocks itself.  Also, prepared transactions are
 * ignored, which is a bit more debatable but is appropriate for current
 * uses of the result.
 */
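/*
 * A sketch of the expected calling pattern (assuming the WaitForLockers
 * machinery in lmgr.c): the caller walks the returned array and waits on
 * each entry, e.g. with VirtualXactLock(vxids[i], true), until every
 * conflicting virtual transaction has finished.
 */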
VirtualTransactionId *
GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
{
	static VirtualTransactionId *vxids;
	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
	LockMethod	lockMethodTable;
	LOCK	   *lock;
	LOCKMASK	conflictMask;
	SHM_QUEUE  *procLocks;
	PROCLOCK   *proclock;
	uint32		hashcode;
	LWLock	   *partitionLock;
	int			count = 0;
	int			fast_count = 0;

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];
	if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
		elog(ERROR, "unrecognized lock mode: %d", lockmode);

	/*
	 * Allocate memory to store results, and fill with InvalidVXID.  We only
	 * need enough space for MaxBackends + a terminator, since prepared xacts
	 * don't count. InHotStandby allocate once in TopMemoryContext.
	 */
	if (InHotStandby)
	{
		if (vxids == NULL)
			vxids = (VirtualTransactionId *)
				MemoryContextAlloc(TopMemoryContext,
						   sizeof(VirtualTransactionId) * (MaxBackends + 1));
	}
	else
		vxids = (VirtualTransactionId *)
			palloc0(sizeof(VirtualTransactionId) * (MaxBackends + 1));

	/* Compute hash code and partition lock, and look up conflicting modes. */
	hashcode = LockTagHashCode(locktag);
	partitionLock = LockHashPartitionLock(hashcode);
	conflictMask = lockMethodTable->conflictTab[lockmode];

	/*
	 * Fast path locks might not have been entered in the primary lock table.
	 * If the lock we're dealing with could conflict with such a lock, we must
	 * examine each backend's fast-path array for conflicts.
	 */
	if (ConflictsWithRelationFastPath(locktag, lockmode))
	{
		int			i;
		Oid			relid = locktag->locktag_field2;
		VirtualTransactionId vxid;

		/*
		 * Iterate over relevant PGPROCs.  Anything held by a prepared
		 * transaction will have been transferred to the primary lock table,
		 * so we need not worry about those.  This is all a bit fuzzy, because
		 * new locks could be taken after we've visited a particular
		 * partition, but the callers had better be prepared to deal with that
		 * anyway, since the locks could equally well be taken between the
		 * time we return the value and the time the caller does something
		 * with it.
		 */
		for (i = 0; i < ProcGlobal->allProcCount; i++)
		{
			PGPROC	   *proc = &ProcGlobal->allProcs[i];
			uint32		f;

			/* A backend never blocks itself */
			if (proc == MyProc)
				continue;

			LWLockAcquire(&proc->backendLock, LW_SHARED);

			/*
			 * If the target backend isn't referencing the same database as
			 * the lock, then we needn't examine the individual relation IDs
			 * at all; none of them can be relevant.
			 *
			 * See FastPathTransferLocks() for discussion of why we do this
			 * test after acquiring the lock.
			 */
			if (proc->databaseId != locktag->locktag_field1)
			{
				LWLockRelease(&proc->backendLock);
				continue;
			}

			for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
			{
				uint32		lockmask;

				/* Look for an allocated slot matching the given relid. */
				if (relid != proc->fpRelId[f])
					continue;
				lockmask = FAST_PATH_GET_BITS(proc, f);
				if (!lockmask)
					continue;
				lockmask <<= FAST_PATH_LOCKNUMBER_OFFSET;

				/*
				 * There can only be one entry per relation, so if we found it
				 * and it doesn't conflict, we can skip the rest of the slots.
				 */
				if ((lockmask & conflictMask) == 0)
					break;

				/* Conflict! */
				GET_VXID_FROM_PGPROC(vxid, *proc);

				/*
				 * If we see an invalid VXID, then either the xact has already
				 * committed (or aborted), or it's a prepared xact.  In either
				 * case we may ignore it.
				 */
				if (VirtualTransactionIdIsValid(vxid))
					vxids[count++] = vxid;

				/* No need to examine remaining slots. */
				break;
			}

			LWLockRelease(&proc->backendLock);
		}
	}

	/* Remember how many fast-path conflicts we found. */
	fast_count = count;

	/*
	 * Look up the lock object matching the tag.
	 */
	LWLockAcquire(partitionLock, LW_SHARED);

	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
												(const void *) locktag,
												hashcode,
												HASH_FIND,
												NULL);
	if (!lock)
	{
		/*
		 * If the lock object doesn't exist, there is nothing holding a lock
		 * on this lockable object.
		 */
		LWLockRelease(partitionLock);
		vxids[count].backendId = InvalidBackendId;
		vxids[count].localTransactionId = InvalidLocalTransactionId;
		return vxids;
	}

	/*
	 * Examine each existing holder (or awaiter) of the lock.
	 */

	procLocks = &(lock->procLocks);

	proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
										 offsetof(PROCLOCK, lockLink));

	while (proclock)
	{
		if (conflictMask & proclock->holdMask)
		{
			PGPROC	   *proc = proclock->tag.myProc;

			/* A backend never blocks itself */
			if (proc != MyProc)
			{
				VirtualTransactionId vxid;

				GET_VXID_FROM_PGPROC(vxid, *proc);

				/*
				 * If we see an invalid VXID, then either the xact has already
				 * committed (or aborted), or it's a prepared xact.  In either
				 * case we may ignore it.
				 */
				if (VirtualTransactionIdIsValid(vxid))
				{
					int			i;

					/* Avoid duplicate entries. */
					for (i = 0; i < fast_count; ++i)
						if (VirtualTransactionIdEquals(vxids[i], vxid))
							break;
					if (i >= fast_count)
						vxids[count++] = vxid;
				}
			}
		}

		proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
											 offsetof(PROCLOCK, lockLink));
	}

	LWLockRelease(partitionLock);

	if (count > MaxBackends)	/* should never happen */
		elog(PANIC, "too many conflicting locks found");

	vxids[count].backendId = InvalidBackendId;
	vxids[count].localTransactionId = InvalidLocalTransactionId;
	return vxids;
}

/*
 * Find a lock in the shared lock table and release it.  It is the caller's
 * responsibility to verify that this is a sane thing to do.  (For example, it
 * would be bad to release a lock here if there might still be a LOCALLOCK
 * object with pointers to it.)
 *
 * We currently use this in two situations: first, to release locks held by
 * prepared transactions on commit (see lock_twophase_postcommit); and second,
 * to release locks taken via the fast-path, transferred to the main hash
 * table, and then released (see LockReleaseAll).
 */
static void
LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
					 LOCKTAG *locktag, LOCKMODE lockmode,
					 bool decrement_strong_lock_count)
{
	LOCK	   *lock;
	PROCLOCK   *proclock;
	PROCLOCKTAG proclocktag;
	uint32		hashcode;
	uint32		proclock_hashcode;
	LWLock	   *partitionLock;
	bool		wakeupNeeded;

	hashcode = LockTagHashCode(locktag);
	partitionLock = LockHashPartitionLock(hashcode);

	LWLockAcquire(partitionLock, LW_EXCLUSIVE);

	/*
	 * Re-find the lock object (it had better be there).
	 */
	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
												(void *) locktag,
												hashcode,
												HASH_FIND,
												NULL);
	if (!lock)
		elog(PANIC, "failed to re-find shared lock object");

	/*
	 * Re-find the proclock object (ditto).
	 */
	proclocktag.myLock = lock;
	proclocktag.myProc = proc;

	proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);

	proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
														(void *) &proclocktag,
														proclock_hashcode,
														HASH_FIND,
														NULL);
	if (!proclock)
		elog(PANIC, "failed to re-find shared proclock object");

	/*
	 * Double-check that we are actually holding a lock of the type we want to
	 * release.
	 */
	if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
	{
		PROCLOCK_PRINT("lock_twophase_postcommit: WRONGTYPE", proclock);
		LWLockRelease(partitionLock);
		elog(WARNING, "you don't own a lock of type %s",
			 lockMethodTable->lockModeNames[lockmode]);
		return;
	}

	/*
	 * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
	 */
	wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);

	CleanUpLock(lock, proclock,
				lockMethodTable, hashcode,
				wakeupNeeded);

	LWLockRelease(partitionLock);

	/*
	 * Decrement strong lock count.  This logic is needed only for 2PC.
	 */
	if (decrement_strong_lock_count
		&& ConflictsWithRelationFastPath(locktag, lockmode))
	{
		uint32		fasthashcode = FastPathStrongLockHashPartition(hashcode);

		SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
		Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0);
		FastPathStrongRelationLocks->count[fasthashcode]--;
		SpinLockRelease(&FastPathStrongRelationLocks->mutex);
	}
}

/*
 * AtPrepare_Locks
 *		Do the preparatory work for a PREPARE: make 2PC state file records
 *		for all locks currently held.
 *
 * Session-level locks are ignored, as are VXID locks.
 *
 * There are some special cases that we error out on: we can't be holding any
 * locks at both session and transaction level (since we must either keep or
 * give away the PROCLOCK object), and we can't be holding any locks on
 * temporary objects (since that would mess up the current backend if it tries
 * to exit before the prepared xact is committed).
 */
void
AtPrepare_Locks(void)
{
	HASH_SEQ_STATUS status;
	LOCALLOCK  *locallock;

	/*
	 * For the most part, we don't need to touch shared memory for this ---
	 * all the necessary state information is in the locallock table.
	 * Fast-path locks are an exception, however: we move any such locks to
	 * the main table before allowing PREPARE TRANSACTION to succeed.
	 */
	hash_seq_init(&status, LockMethodLocalHash);

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		TwoPhaseLockRecord record;
		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
		bool		haveSessionLock;
		bool		haveXactLock;
		int			i;

		/*
		 * Ignore VXID locks.  We don't want those to be held by prepared
		 * transactions, since they aren't meaningful after a restart.
		 */
		if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
			continue;

		/* Ignore it if we don't actually hold the lock */
		if (locallock->nLocks <= 0)
			continue;

		/* Scan to see whether we hold it at session or transaction level */
		haveSessionLock = haveXactLock = false;
		for (i = locallock->numLockOwners - 1; i >= 0; i--)
		{
			if (lockOwners[i].owner == NULL)
				haveSessionLock = true;
			else
				haveXactLock = true;
		}

		/* Ignore it if we have only session lock */
		if (!haveXactLock)
			continue;

		/*
		 * If we have both session- and transaction-level locks, fail.  This
		 * should never happen with regular locks, since we only take those at
		 * session level in some special operations like VACUUM.  It's
		 * possible to hit this with advisory locks, though.
		 *
		 * It would be nice if we could keep the session hold and give away
		 * the transactional hold to the prepared xact.  However, that would
		 * require two PROCLOCK objects, and we cannot be sure that another
		 * PROCLOCK will be available when it comes time for PostPrepare_Locks
		 * to do the deed.  So for now, we error out while we can still do so
		 * safely.
		 */
		if (haveSessionLock)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));

		/*
		 * If the local lock was taken via the fast-path, we need to move it
		 * to the primary lock table, or just get a pointer to the existing
		 * primary lock table entry if by chance it's already been
		 * transferred.
		 */
		if (locallock->proclock == NULL)
		{
			locallock->proclock = FastPathGetRelationLockEntry(locallock);
			locallock->lock = locallock->proclock->tag.myLock;
		}

		/*
		 * Arrange to not release any strong lock count held by this lock
		 * entry.  We must retain the count until the prepared transaction is
		 * committed or rolled back.
		 */
		locallock->holdsStrongLockCount = FALSE;

		/*
		 * Create a 2PC record.
		 */
		memcpy(&(record.locktag), &(locallock->tag.lock), sizeof(LOCKTAG));
		record.lockmode = locallock->tag.mode;

		RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, 0,
							   &record, sizeof(TwoPhaseLockRecord));
	}
}

/*
 * PostPrepare_Locks
 *		Clean up after successful PREPARE
 *
 * Here, we want to transfer ownership of our locks to a dummy PGPROC
 * that's now associated with the prepared transaction, and we want to
 * clean out the corresponding entries in the LOCALLOCK table.
 *
 * Note: by removing the LOCALLOCK entries, we are leaving dangling
 * pointers in the transaction's resource owner.  This is OK at the
 * moment since resowner.c doesn't try to free locks retail at a toplevel
 * transaction commit or abort.  We could alternatively zero out nLocks
 * and leave the LOCALLOCK entries to be garbage-collected by LockReleaseAll,
 * but that probably costs more cycles.
 */
void
PostPrepare_Locks(TransactionId xid)
{
	PGPROC	   *newproc = TwoPhaseGetDummyProc(xid);
	HASH_SEQ_STATUS status;
	LOCALLOCK  *locallock;
	LOCK	   *lock;
	PROCLOCK   *proclock;
	PROCLOCKTAG proclocktag;
	int			partition;

	/* Can't prepare a lock group follower. */
	Assert(MyProc->lockGroupLeader == NULL ||
		   MyProc->lockGroupLeader == MyProc);

	/* This is a critical section: any error means big trouble */
	START_CRIT_SECTION();

	/*
	 * First we run through the locallock table and get rid of unwanted
	 * entries, then we scan the process's proclocks and transfer them to the
	 * target proc.
	 *
	 * We do this separately because we may have multiple locallock entries
	 * pointing to the same proclock, and we daren't end up with any dangling
	 * pointers.
	 */
	hash_seq_init(&status, LockMethodLocalHash);

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
		bool		haveSessionLock;
		bool		haveXactLock;
		int			i;

		if (locallock->proclock == NULL || locallock->lock == NULL)
		{
			/*
			 * We must've run out of shared memory while trying to set up this
			 * lock.  Just forget the local entry.
			 */
			Assert(locallock->nLocks == 0);
			RemoveLocalLock(locallock);
			continue;
		}

		/* Ignore VXID locks */
		if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
			continue;

		/* Scan to see whether we hold it at session or transaction level */
		haveSessionLock = haveXactLock = false;
		for (i = locallock->numLockOwners - 1; i >= 0; i--)
		{
			if (lockOwners[i].owner == NULL)
				haveSessionLock = true;
			else
				haveXactLock = true;
		}

		/* Ignore it if we have only session lock */
		if (!haveXactLock)
			continue;

		/* This can't happen, because we already checked it */
		if (haveSessionLock)
			ereport(PANIC,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));

		/* Mark the proclock to show we need to release this lockmode */
		if (locallock->nLocks > 0)
			locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);

		/* And remove the locallock hashtable entry */
		RemoveLocalLock(locallock);
	}

	/*
	 * Now, scan each lock partition separately.
	 */
	for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
	{
		LWLock	   *partitionLock;
		SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
		PROCLOCK   *nextplock;

		partitionLock = LockHashPartitionLockByIndex(partition);

		/*
		 * If the proclock list for this partition is empty, we can skip
		 * acquiring the partition lock.  This optimization is safer than the
		 * situation in LockReleaseAll, because we got rid of any fast-path
		 * locks during AtPrepare_Locks, so there cannot be any case where
		 * another backend is adding something to our lists now.  For safety,
		 * though, we code this the same way as in LockReleaseAll.
		 */
		if (SHMQueueNext(procLocks, procLocks,
						 offsetof(PROCLOCK, procLink)) == NULL)
			continue;			/* needn't examine this partition */

		LWLockAcquire(partitionLock, LW_EXCLUSIVE);

		for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
											   offsetof(PROCLOCK, procLink));
			 proclock;
			 proclock = nextplock)
		{
			/* Get link first, since we may unlink/relink this proclock */
			nextplock = (PROCLOCK *)
				SHMQueueNext(procLocks, &proclock->procLink,
							 offsetof(PROCLOCK, procLink));

			Assert(proclock->tag.myProc == MyProc);

			lock = proclock->tag.myLock;

			/* Ignore VXID locks */
			if (lock->tag.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
				continue;

			PROCLOCK_PRINT("PostPrepare_Locks", proclock);
			LOCK_PRINT("PostPrepare_Locks", lock, 0);
			Assert(lock->nRequested >= 0);
			Assert(lock->nGranted >= 0);
			Assert(lock->nGranted <= lock->nRequested);
			Assert((proclock->holdMask & ~lock->grantMask) == 0);

			/* Ignore it if nothing to release (must be a session lock) */
			if (proclock->releaseMask == 0)
				continue;

			/* Else we should be releasing all locks */
			if (proclock->releaseMask != proclock->holdMask)
				elog(PANIC, "we seem to have dropped a bit somewhere");

			/*
			 * We cannot simply modify proclock->tag.myProc to reassign
			 * ownership of the lock, because that's part of the hash key and
			 * the proclock would then be in the wrong hash chain.  Instead
			 * use hash_update_hash_key.  (We used to create a new hash entry,
			 * but that risks out-of-memory failure if other processes are
			 * busy making proclocks too.)	We must unlink the proclock from
			 * our procLink chain and put it into the new proc's chain, too.
			 *
			 * Note: the updated proclock hash key will still belong to the
			 * same hash partition, cf proclock_hash().  So the partition lock
			 * we already hold is sufficient for this.
			 */
			SHMQueueDelete(&proclock->procLink);

			/*
			 * Create the new hash key for the proclock.
			 */
			proclocktag.myLock = lock;
			proclocktag.myProc = newproc;
			/*
			 * Update groupLeader pointer to point to the new proc.  (We'd
			 * better not be a member of somebody else's lock group!)
			 */
			Assert(proclock->groupLeader == proclock->tag.myProc);
			proclock->groupLeader = newproc;

			/*
			 * Update the proclock.  We should not find any existing entry for
			 * the same hash key, since there can be only one entry for any
			 * given lock with my own proc.
			 */
			if (!hash_update_hash_key(LockMethodProcLockHash,
									  (void *) proclock,
									  (void *) &proclocktag))
				elog(PANIC, "duplicate entry found while reassigning a prepared transaction's locks");

			/* Re-link into the new proc's proclock list */
			SHMQueueInsertBefore(&(newproc->myProcLocks[partition]),
								 &proclock->procLink);

			PROCLOCK_PRINT("PostPrepare_Locks: updated", proclock);
		}						/* loop over PROCLOCKs within this partition */

		LWLockRelease(partitionLock);
	}							/* loop over partitions */

	END_CRIT_SECTION();
}


/*
 * Estimate shared-memory space used for lock tables
 */
Size
LockShmemSize(void)
{
	Size		size = 0;
	long		max_table_size;

	/* lock hash table */
	max_table_size = NLOCKENTS();
	size = add_size(size, hash_estimate_size(max_table_size, sizeof(LOCK)));

	/* proclock hash table */
	max_table_size *= 2;
	size = add_size(size, hash_estimate_size(max_table_size, sizeof(PROCLOCK)));

	/*
	 * Since NLOCKENTS is only an estimate, add 10% safety margin.
	 */
	size = add_size(size, size / 10);

	return size;
}

/*
 * GetLockStatusData - Return a summary of the lock manager's internal
 * status, for use in a user-level reporting function.
 *
 * The return data consists of an array of PROCLOCK objects, with the
 * associated PGPROC and LOCK objects for each.  Note that multiple
 * copies of the same PGPROC and/or LOCK objects are likely to appear.
 * It is the caller's responsibility to match up duplicates if wanted.
 *
 * The design goal is to hold the LWLocks for as short a time as possible;
 * thus, this function simply makes a copy of the necessary data and releases
 * the locks, allowing the caller to contemplate and format the data for as
 * long as it pleases.
 */
LockData *
GetLockStatusData(void)
{
	LockData   *data;
	PROCLOCK   *proclock;
	HASH_SEQ_STATUS seqstat;
	int			els;
	int			el;
	int			i;

	data = (LockData *) palloc(sizeof(LockData));

	/* Guess how much space we'll need. */
	els = MaxBackends;
	el = 0;
	data->locks = (LockInstanceData *) palloc(sizeof(LockInstanceData) * els);

	/*
	 * First, we iterate through the per-backend fast-path arrays, locking
	 * them one at a time.  This might produce an inconsistent picture of the
	 * system state, but taking all of those LWLocks at the same time seems
	 * impractical (in particular, note MAX_SIMUL_LWLOCKS).  It shouldn't
	 * matter too much, because none of these locks can be involved in lock
	 * conflicts anyway - anything that might conflict must be present in the
	 * main lock table.
	 */
	for (i = 0; i < ProcGlobal->allProcCount; ++i)
	{
		PGPROC	   *proc = &ProcGlobal->allProcs[i];
		uint32		f;

		LWLockAcquire(&proc->backendLock, LW_SHARED);

		for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; ++f)
		{
			LockInstanceData *instance;
			uint32		lockbits = FAST_PATH_GET_BITS(proc, f);

			/* Skip unallocated slots. */
			if (!lockbits)
				continue;

			if (el >= els)
			{
				els += MaxBackends;
				data->locks = (LockInstanceData *)
					repalloc(data->locks, sizeof(LockInstanceData) * els);
			}

			instance = &data->locks[el];
			SET_LOCKTAG_RELATION(instance->locktag, proc->databaseId,
								 proc->fpRelId[f]);
			instance->holdMask = lockbits << FAST_PATH_LOCKNUMBER_OFFSET;
			instance->waitLockMode = NoLock;
			instance->backend = proc->backendId;
			instance->lxid = proc->lxid;
			instance->pid = proc->pid;
			instance->fastpath = true;

			el++;
		}

		if (proc->fpVXIDLock)
		{
			VirtualTransactionId vxid;
			LockInstanceData *instance;

			if (el >= els)
			{
				els += MaxBackends;
				data->locks = (LockInstanceData *)
					repalloc(data->locks, sizeof(LockInstanceData) * els);
			}

			vxid.backendId = proc->backendId;
			vxid.localTransactionId = proc->fpLocalTransactionId;

			instance = &data->locks[el];
			SET_LOCKTAG_VIRTUALTRANSACTION(instance->locktag, vxid);
			instance->holdMask = LOCKBIT_ON(ExclusiveLock);
			instance->waitLockMode = NoLock;
			instance->backend = proc->backendId;
			instance->lxid = proc->lxid;
			instance->pid = proc->pid;
			instance->fastpath = true;

			el++;
		}

		LWLockRelease(&proc->backendLock);
	}

	/*
	 * Next, acquire lock on the entire shared lock data structure.  We do
	 * this so that, at least for locks in the primary lock table, the state
	 * will be self-consistent.
	 *
	 * Since this is a read-only operation, we take shared instead of
	 * exclusive lock.  There's not a whole lot of point to this, because all
	 * the normal operations require exclusive lock, but it doesn't hurt
	 * anything either. It will at least allow two backends to do
	 * GetLockStatusData in parallel.
	 *
	 * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
	 */
	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
		LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);

	/* Now we can safely count the number of proclocks */
	data->nelements = el + hash_get_num_entries(LockMethodProcLockHash);
	if (data->nelements > els)
	{
		els = data->nelements;
		data->locks = (LockInstanceData *)
			repalloc(data->locks, sizeof(LockInstanceData) * els);
	}

	/* Now scan the tables to copy the data */
	hash_seq_init(&seqstat, LockMethodProcLockHash);

	while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
	{
		PGPROC	   *proc = proclock->tag.myProc;
		LOCK	   *lock = proclock->tag.myLock;
		LockInstanceData *instance = &data->locks[el];

		memcpy(&instance->locktag, &lock->tag, sizeof(LOCKTAG));
		instance->holdMask = proclock->holdMask;
		if (proc->waitLock == proclock->tag.myLock)
			instance->waitLockMode = proc->waitLockMode;
		else
			instance->waitLockMode = NoLock;
		instance->backend = proc->backendId;
		instance->lxid = proc->lxid;
		instance->pid = proc->pid;
		instance->fastpath = false;

		el++;
	}

	/*
	 * And release locks.  We do this in reverse order for two reasons: (1)
	 * Anyone else who needs more than one of the locks will be trying to lock
	 * them in increasing order; we don't want to release the other process
	 * until it can get all the locks it needs. (2) This avoids O(N^2)
	 * behavior inside LWLockRelease.
	 */
	for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
		LWLockRelease(LockHashPartitionLockByIndex(i));

	Assert(el == data->nelements);

	return data;
}
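
/*
 * Illustrative sketch only (not called anywhere in this file): how a
 * user-level reporting function might consume the result of
 * GetLockStatusData().  The loop and the elog() formatting are assumptions
 * for illustration; only the LockData/LockInstanceData fields referenced
 * below come from the code above.
 *
 *	LockData   *lockData = GetLockStatusData();
 *	int			i;
 *
 *	for (i = 0; i < lockData->nelements; i++)
 *	{
 *		LockInstanceData *instance = &lockData->locks[i];
 *
 *		elog(LOG, "pid %d holds mask %x (fastpath: %d), waits for mode %d",
 *			 instance->pid, instance->holdMask,
 *			 (int) instance->fastpath, (int) instance->waitLockMode);
 *	}
 */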

/*
 * Returns a list of currently held AccessExclusiveLocks, for use by
 * LogStandbySnapshot().  The result is a palloc'd array,
 * with the number of elements returned into *nlocks.
 *
 * XXX This currently takes a lock on all partitions of the lock table,
 * but it's possible to do better.  By reference counting locks and storing
 * the value in the ProcArray entry for each backend we could tell if any
 * locks need recording without having to acquire the partition locks and
 * scan the lock table.  Whether that's worth the additional overhead
 * is pretty dubious though.
 */
xl_standby_lock *
GetRunningTransactionLocks(int *nlocks)
{
	xl_standby_lock *accessExclusiveLocks;
	PROCLOCK   *proclock;
	HASH_SEQ_STATUS seqstat;
	int			i;
	int			index;
	int			els;

	/*
	 * Acquire lock on the entire shared lock data structure.
	 *
	 * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
	 */
	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
		LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);

	/* Now we can safely count the number of proclocks */
	els = hash_get_num_entries(LockMethodProcLockHash);

	/*
	 * Allocating enough space for all locks in the lock table is overkill,
	 * but it's more convenient and faster than having to enlarge the array.
	 */
	accessExclusiveLocks = palloc(els * sizeof(xl_standby_lock));

	/* Now scan the tables to copy the data */
	hash_seq_init(&seqstat, LockMethodProcLockHash);

	/*
	 * If lock is a currently granted AccessExclusiveLock then it will have
	 * just one proclock holder, so locks are never accessed twice in this
	 * particular case. Don't copy this code for use elsewhere because in the
	 * general case this will give you duplicate locks when looking at
	 * non-exclusive lock types.
	 */
	index = 0;
	while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
	{
		/* make sure this definition matches the one used in LockAcquire */
		if ((proclock->holdMask & LOCKBIT_ON(AccessExclusiveLock)) &&
			proclock->tag.myLock->tag.locktag_type == LOCKTAG_RELATION)
		{
			PGPROC	   *proc = proclock->tag.myProc;
			PGXACT	   *pgxact = &ProcGlobal->allPgXact[proc->pgprocno];
			LOCK	   *lock = proclock->tag.myLock;
			TransactionId xid = pgxact->xid;

			/*
			 * Don't record locks for transactions if we know they have
			 * already issued their WAL record for commit but not yet released
			 * lock. It is still possible that we see locks held by already
			 * complete transactions, if they haven't yet zeroed their xids.
			 */
			if (!TransactionIdIsValid(xid))
				continue;

			accessExclusiveLocks[index].xid = xid;
			accessExclusiveLocks[index].dbOid = lock->tag.locktag_field1;
			accessExclusiveLocks[index].relOid = lock->tag.locktag_field2;

			index++;
		}
	}

	Assert(index <= els);

	/*
	 * And release locks.  We do this in reverse order for two reasons: (1)
	 * Anyone else who needs more than one of the locks will be trying to lock
	 * them in increasing order; we don't want to release the other process
	 * until it can get all the locks it needs. (2) This avoids O(N^2)
	 * behavior inside LWLockRelease.
	 */
	for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
		LWLockRelease(LockHashPartitionLockByIndex(i));

	*nlocks = index;
	return accessExclusiveLocks;
}
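
/*
 * Illustrative sketch only (not part of this file): the general shape of a
 * caller such as LogStandbySnapshot(), which WAL-logs the returned locks.
 * The logging loop below is an assumption for illustration; the real
 * consumer lives in storage/ipc/standby.c.
 *
 *	int			nlocks;
 *	xl_standby_lock *locks = GetRunningTransactionLocks(&nlocks);
 *	int			i;
 *
 *	for (i = 0; i < nlocks; i++)
 *		elog(DEBUG1, "AccessExclusiveLock held by xid %u on relation %u/%u",
 *			 locks[i].xid, locks[i].dbOid, locks[i].relOid);
 *
 *	pfree(locks);
 */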

/* Provide the textual name of any lock mode */
const char *
GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode)
{
	Assert(lockmethodid > 0 && lockmethodid < lengthof(LockMethods));
	Assert(mode > 0 && mode <= LockMethods[lockmethodid]->numLockModes);
	return LockMethods[lockmethodid]->lockModeNames[mode];
}
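
/*
 * Illustrative sketch only: the typical use of GetLockmodeName() is to
 * format a lock mode for a log or error message, e.g.
 *
 *	elog(LOG, "requested mode is %s",
 *		 GetLockmodeName(DEFAULT_LOCKMETHOD, lockmode));
 *
 * DEFAULT_LOCKMETHOD is assumed here as the lock method id of regular
 * (non-advisory) locks; "lockmode" is a hypothetical LOCKMODE variable.
 */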

#ifdef LOCK_DEBUG
/*
 * Dump all locks in the given proc's myProcLocks lists.
 *
 * Caller is responsible for having acquired appropriate LWLocks.
 */
void
DumpLocks(PGPROC *proc)
{
	SHM_QUEUE  *procLocks;
	PROCLOCK   *proclock;
	LOCK	   *lock;
	int			i;

	if (proc == NULL)
		return;

	if (proc->waitLock)
		LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);

	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
	{
		procLocks = &(proc->myProcLocks[i]);

		proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
											 offsetof(PROCLOCK, procLink));
		while (proclock)
		{
			Assert(proclock->tag.myProc == proc);

			lock = proclock->tag.myLock;

			PROCLOCK_PRINT("DumpLocks", proclock);
			LOCK_PRINT("DumpLocks", lock, 0);

			proclock = (PROCLOCK *)
				SHMQueueNext(procLocks, &proclock->procLink,
							 offsetof(PROCLOCK, procLink));
		}
	}
}

/*
 * Dump all lmgr locks.
 *
 * Caller is responsible for having acquired appropriate LWLocks.
 */
void
DumpAllLocks(void)
{
	PGPROC	   *proc;
	PROCLOCK   *proclock;
	LOCK	   *lock;
	HASH_SEQ_STATUS status;

	proc = MyProc;

	if (proc && proc->waitLock)
		LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);

	hash_seq_init(&status, LockMethodProcLockHash);

	while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
	{
		PROCLOCK_PRINT("DumpAllLocks", proclock);

		lock = proclock->tag.myLock;
		if (lock)
			LOCK_PRINT("DumpAllLocks", lock, 0);
		else
			elog(LOG, "DumpAllLocks: proclock->tag.myLock = NULL");
	}
}
#endif   /* LOCK_DEBUG */

/*
 * LOCK 2PC resource manager's routines
 */

/*
 * Re-acquire a lock belonging to a transaction that was prepared.
 *
 * Because this function is run at db startup, re-acquiring the locks should
 * never conflict with running transactions because there are none.  We
 * assume that the lock state represented by the stored 2PC files is legal.
 *
 * When switching from Hot Standby mode to normal operation, the locks will
 * be already held by the startup process. The locks are acquired for the new
 * procs without checking for conflicts, so we don't get a conflict between the
 * startup process and the dummy procs, even though we will momentarily have
 * a situation where two procs are holding the same AccessExclusiveLock,
 * which isn't normally possible because of the conflict. If we're in standby
 * mode, but a recovery snapshot hasn't been established yet, it's possible
 * that some but not all of the locks are already held by the startup process.
 *
 * This approach is simple, but also a bit dangerous, because if there isn't
 * enough shared memory to acquire the locks, an error will be thrown, which
 * is promoted to FATAL and recovery will abort, bringing down postmaster.
 * A safer approach would be to transfer the locks like we do in
 * AtPrepare_Locks, but then again, in hot standby mode it's possible for
 * read-only backends to use up all the shared lock memory anyway, so that
 * replaying the WAL record that needs to acquire a lock will throw an error
 * and PANIC anyway.
 */
void
lock_twophase_recover(TransactionId xid, uint16 info,
					  void *recdata, uint32 len)
{
	TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
	PGPROC	   *proc = TwoPhaseGetDummyProc(xid);
	LOCKTAG    *locktag;
	LOCKMODE	lockmode;
	LOCKMETHODID lockmethodid;
	LOCK	   *lock;
	PROCLOCK   *proclock;
	PROCLOCKTAG proclocktag;
	bool		found;
	uint32		hashcode;
	uint32		proclock_hashcode;
	int			partition;
	LWLock	   *partitionLock;
	LockMethod	lockMethodTable;

	Assert(len == sizeof(TwoPhaseLockRecord));
	locktag = &rec->locktag;
	lockmode = rec->lockmode;
	lockmethodid = locktag->locktag_lockmethodid;

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];

	hashcode = LockTagHashCode(locktag);
	partition = LockHashPartition(hashcode);
	partitionLock = LockHashPartitionLock(hashcode);

	LWLockAcquire(partitionLock, LW_EXCLUSIVE);

	/*
	 * Find or create a lock with this tag.
	 */
	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
												(void *) locktag,
												hashcode,
												HASH_ENTER_NULL,
												&found);
	if (!lock)
	{
		LWLockRelease(partitionLock);
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of shared memory"),
		  errhint("You might need to increase max_locks_per_transaction.")));
	}

	/*
	 * if it's a new lock object, initialize it
	 */
	if (!found)
	{
		lock->grantMask = 0;
		lock->waitMask = 0;
		SHMQueueInit(&(lock->procLocks));
		ProcQueueInit(&(lock->waitProcs));
		lock->nRequested = 0;
		lock->nGranted = 0;
		MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
		MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
		LOCK_PRINT("lock_twophase_recover: new", lock, lockmode);
	}
	else
	{
		LOCK_PRINT("lock_twophase_recover: found", lock, lockmode);
		Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
		Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
		Assert(lock->nGranted <= lock->nRequested);
	}

	/*
	 * Create the hash key for the proclock table.
	 */
	proclocktag.myLock = lock;
	proclocktag.myProc = proc;

	proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);

	/*
	 * Find or create a proclock entry with this tag
	 */
	proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
														(void *) &proclocktag,
														proclock_hashcode,
														HASH_ENTER_NULL,
														&found);
	if (!proclock)
	{
		/* Ooops, not enough shmem for the proclock */
		if (lock->nRequested == 0)
		{
			/*
			 * There are no other requestors of this lock, so garbage-collect
			 * the lock object.  We *must* do this to avoid a permanent leak
			 * of shared memory, because there won't be anything to cause
			 * anyone to release the lock object later.
			 */
			Assert(SHMQueueEmpty(&(lock->procLocks)));
			if (!hash_search_with_hash_value(LockMethodLockHash,
											 (void *) &(lock->tag),
											 hashcode,
											 HASH_REMOVE,
											 NULL))
				elog(PANIC, "lock table corrupted");
		}
		LWLockRelease(partitionLock);
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of shared memory"),
		  errhint("You might need to increase max_locks_per_transaction.")));
	}

	/*
	 * If new, initialize the new entry
	 */
	if (!found)
	{
		Assert(proc->lockGroupLeader == NULL);
		proclock->groupLeader = proc;
		proclock->holdMask = 0;
		proclock->releaseMask = 0;
		/* Add proclock to appropriate lists */
		SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
		SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
							 &proclock->procLink);
		PROCLOCK_PRINT("lock_twophase_recover: new", proclock);
	}
	else
	{
		PROCLOCK_PRINT("lock_twophase_recover: found", proclock);
		Assert((proclock->holdMask & ~lock->grantMask) == 0);
	}

	/*
	 * lock->nRequested and lock->requested[] count the total number of
	 * requests, whether granted or waiting, so increment those immediately.
	 */
	lock->nRequested++;
	lock->requested[lockmode]++;
	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));

	/*
	 * We shouldn't already hold the desired lock.
	 */
	if (proclock->holdMask & LOCKBIT_ON(lockmode))
		elog(ERROR, "lock %s on object %u/%u/%u is already held",
			 lockMethodTable->lockModeNames[lockmode],
			 lock->tag.locktag_field1, lock->tag.locktag_field2,
			 lock->tag.locktag_field3);

	/*
	 * We ignore any possible conflicts and just grant ourselves the lock. Not
	 * only because we don't bother, but also to avoid deadlocks when
	 * switching from standby to normal mode. See function comment.
	 */
	GrantLock(lock, proclock, lockmode);

	/*
	 * Bump strong lock count, to make sure any fast-path lock requests won't
	 * be granted without consulting the primary lock table.
	 */
	if (ConflictsWithRelationFastPath(&lock->tag, lockmode))
	{
		uint32		fasthashcode = FastPathStrongLockHashPartition(hashcode);

		SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
		FastPathStrongRelationLocks->count[fasthashcode]++;
		SpinLockRelease(&FastPathStrongRelationLocks->mutex);
	}

	LWLockRelease(partitionLock);
}

/*
 * Re-acquire a lock belonging to a transaction that was prepared, when
 * starting up into hot standby mode.
 */
void
lock_twophase_standby_recover(TransactionId xid, uint16 info,
							  void *recdata, uint32 len)
{
	TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
	LOCKTAG    *locktag;
	LOCKMODE	lockmode;
	LOCKMETHODID lockmethodid;

	Assert(len == sizeof(TwoPhaseLockRecord));
	locktag = &rec->locktag;
	lockmode = rec->lockmode;
	lockmethodid = locktag->locktag_lockmethodid;

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);

	if (lockmode == AccessExclusiveLock &&
		locktag->locktag_type == LOCKTAG_RELATION)
	{
		StandbyAcquireAccessExclusiveLock(xid,
										locktag->locktag_field1 /* dboid */ ,
									  locktag->locktag_field2 /* reloid */ );
	}
}


/*
 * 2PC processing routine for COMMIT PREPARED case.
 *
 * Find and release the lock indicated by the 2PC record.
 */
void
lock_twophase_postcommit(TransactionId xid, uint16 info,
						 void *recdata, uint32 len)
{
	TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
	PGPROC	   *proc = TwoPhaseGetDummyProc(xid);
	LOCKTAG    *locktag;
	LOCKMETHODID lockmethodid;
	LockMethod	lockMethodTable;

	Assert(len == sizeof(TwoPhaseLockRecord));
	locktag = &rec->locktag;
	lockmethodid = locktag->locktag_lockmethodid;

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];

	LockRefindAndRelease(lockMethodTable, proc, locktag, rec->lockmode, true);
}

/*
 * 2PC processing routine for ROLLBACK PREPARED case.
 *
 * This is actually just the same as the COMMIT case.
 */
void
lock_twophase_postabort(TransactionId xid, uint16 info,
						void *recdata, uint32 len)
{
	lock_twophase_postcommit(xid, info, recdata, len);
}

/*
 *		VirtualXactLockTableInsert
 *
 *		Take vxid lock via the fast-path.  There can't be any pre-existing
 *		lockers, as we haven't advertised this vxid via the ProcArray yet.
 *
 *		Since MyProc->fpLocalTransactionId will normally contain the same data
 *		as MyProc->lxid, you might wonder if we really need both.  The
 *		difference is that MyProc->lxid is set and cleared unlocked, and
 *		examined by procarray.c, while fpLocalTransactionId is protected by
 *		backendLock and is used only by the locking subsystem.  Doing it this
 *		way makes it easier to verify that there are no funny race conditions.
 *
 *		We don't bother recording this lock in the local lock table, since it's
 *		only ever released at the end of a transaction.  Instead,
 *		LockReleaseAll() calls VirtualXactLockTableCleanup().
 */
void
VirtualXactLockTableInsert(VirtualTransactionId vxid)
{
	Assert(VirtualTransactionIdIsValid(vxid));

	LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);

	Assert(MyProc->backendId == vxid.backendId);
	Assert(MyProc->fpLocalTransactionId == InvalidLocalTransactionId);
	Assert(MyProc->fpVXIDLock == false);

	MyProc->fpVXIDLock = true;
	MyProc->fpLocalTransactionId = vxid.localTransactionId;

	LWLockRelease(&MyProc->backendLock);
}

/*
 *		VirtualXactLockTableCleanup
 *
 *		Check whether a VXID lock has been materialized; if so, release it,
 *		unblocking waiters.
 */
void
VirtualXactLockTableCleanup(void)
{
	bool		fastpath;
	LocalTransactionId lxid;

	Assert(MyProc->backendId != InvalidBackendId);

	/*
	 * Clean up shared memory state.
	 */
	LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);

	fastpath = MyProc->fpVXIDLock;
	lxid = MyProc->fpLocalTransactionId;
	MyProc->fpVXIDLock = false;
	MyProc->fpLocalTransactionId = InvalidLocalTransactionId;

	LWLockRelease(&MyProc->backendLock);

	/*
	 * If fpVXIDLock has been cleared without touching fpLocalTransactionId,
	 * that means someone transferred the lock to the main lock table.
	 */
	if (!fastpath && LocalTransactionIdIsValid(lxid))
	{
		VirtualTransactionId vxid;
		LOCKTAG		locktag;

		vxid.backendId = MyBackendId;
		vxid.localTransactionId = lxid;
		SET_LOCKTAG_VIRTUALTRANSACTION(locktag, vxid);

		LockRefindAndRelease(LockMethods[DEFAULT_LOCKMETHOD], MyProc,
							 &locktag, ExclusiveLock, false);
	}
}

/*
 *		VirtualXactLock
 *
 * If wait = true, wait until the given VXID has been released, and then
 * return true.
 *
 * If wait = false, just check whether the VXID is still running, and return
 * true or false.
 */
bool
VirtualXactLock(VirtualTransactionId vxid, bool wait)
{
	LOCKTAG		tag;
	PGPROC	   *proc;

	Assert(VirtualTransactionIdIsValid(vxid));

	SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);

	/*
	 * If a lock table entry must be made, this is the PGPROC on whose behalf
	 * it must be done.  Note that the transaction might end or the PGPROC
	 * might be reassigned to a new backend before we get around to examining
	 * it, but it doesn't matter.  If we find upon examination that the
	 * relevant lxid is no longer running here, that's enough to prove that
	 * it's no longer running anywhere.
	 */
	proc = BackendIdGetProc(vxid.backendId);
	if (proc == NULL)
		return true;

	/*
	 * We must acquire this lock before checking the backendId and lxid
	 * against the ones we're waiting for.  The target backend will only set
	 * or clear lxid while holding this lock.
	 */
	LWLockAcquire(&proc->backendLock, LW_EXCLUSIVE);

	/* If the transaction has ended, our work here is done. */
	if (proc->backendId != vxid.backendId
		|| proc->fpLocalTransactionId != vxid.localTransactionId)
	{
		LWLockRelease(&proc->backendLock);
		return true;
	}

	/*
	 * If we aren't asked to wait, there's no need to set up a lock table
	 * entry.  The transaction is still in progress, so just return false.
	 */
	if (!wait)
	{
		LWLockRelease(&proc->backendLock);
		return false;
	}

	/*
	 * OK, we're going to need to sleep on the VXID.  But first, we must set
	 * up the primary lock table entry, if needed (ie, convert the proc's
	 * fast-path lock on its VXID to a regular lock).
	 */
	if (proc->fpVXIDLock)
	{
		PROCLOCK   *proclock;
		uint32		hashcode;
		LWLock	   *partitionLock;

		hashcode = LockTagHashCode(&tag);

		partitionLock = LockHashPartitionLock(hashcode);
		LWLockAcquire(partitionLock, LW_EXCLUSIVE);

		proclock = SetupLockInTable(LockMethods[DEFAULT_LOCKMETHOD], proc,
									&tag, hashcode, ExclusiveLock);
		if (!proclock)
		{
			LWLockRelease(partitionLock);
			LWLockRelease(&proc->backendLock);
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of shared memory"),
					 errhint("You might need to increase max_locks_per_transaction.")));
		}
		GrantLock(proclock->tag.myLock, proclock, ExclusiveLock);

		LWLockRelease(partitionLock);

		proc->fpVXIDLock = false;
	}

	/* Done with proc->fpLockBits */
	LWLockRelease(&proc->backendLock);

	/* Time to wait. */
	(void) LockAcquire(&tag, ShareLock, false, false);

	LockRelease(&tag, ShareLock, false);
	return true;
}
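
/*
 * Illustrative sketch only (hypothetical caller): waiting for every
 * transaction in an invalid-terminated vxid array to complete, in the style
 * of the GetLockConflicts()/WaitForLockers pattern used by concurrent index
 * builds.  "heaplocktag" and the surrounding code are assumptions for
 * illustration; passing wait = true makes each call block until that vxid
 * has been released.
 *
 *	VirtualTransactionId *vxids = GetLockConflicts(&heaplocktag, ShareLock);
 *
 *	while (VirtualTransactionIdIsValid(*vxids))
 *	{
 *		(void) VirtualXactLock(*vxids, true);
 *		vxids++;
 *	}
 */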