/*-------------------------------------------------------------------------
 *
 * lock.c
 *	  POSTGRES primary lock mechanism
 *
 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.183 2008/03/17 19:44:41 petere Exp $
 *
 * NOTES
 *	  A lock table is a shared memory hash table.  When
 *	  a process tries to acquire a lock of a type that conflicts
 *	  with existing locks, it is put to sleep using the routines
 *	  in storage/lmgr/proc.c.
 *
 *	  For the most part, this code should be invoked via lmgr.c
 *	  or another lock-management module, not directly.
 *
 *	Interface:
 *
 *	InitLocks(), GetLocksMethodTable(),
 *	LockAcquire(), LockRelease(), LockReleaseAll(),
 *	LockCheckConflicts(), GrantLock()
 *
 *-------------------------------------------------------------------------
 */
30 31
#include "postgres.h"

#include <signal.h>
#include <unistd.h>

#include "access/transam.h"
#include "access/twophase.h"
#include "access/twophase_rmgr.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "pg_trace.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"



/* This configuration variable is used to set the lock table size */
int			max_locks_per_xact; /* set by guc.c */

/*
 * Upper bound on the number of lock-table entries to size the shared
 * hash tables for: max_locks_per_xact slots for every backend and every
 * prepared (two-phase) transaction.
 */
#define NLOCKENTS() \
	mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))


53
/*
54 55 56
 * Data structures defining the semantics of the standard lock methods.
 *
 * The conflict table defines the semantics of the various lock modes.
57
 */
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
static const LOCKMASK LockConflicts[] = {
	0,

	/* AccessShareLock */
	(1 << AccessExclusiveLock),

	/* RowShareLock */
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* RowExclusiveLock */
	(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* ShareUpdateExclusiveLock */
	(1 << ShareUpdateExclusiveLock) |
	(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* ShareLock */
	(1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
	(1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* ShareRowExclusiveLock */
	(1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
	(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* ExclusiveLock */
	(1 << RowShareLock) |
	(1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
	(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* AccessExclusiveLock */
	(1 << AccessShareLock) | (1 << RowShareLock) |
	(1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
	(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock)
97

98
};
99

/* Names of lock modes, for debug printouts; indexed by lock mode number */
static const char *const lock_mode_names[] =
{
	"INVALID",
	"AccessShareLock",
	"RowShareLock",
	"RowExclusiveLock",
	"ShareUpdateExclusiveLock",
	"ShareLock",
	"ShareRowExclusiveLock",
	"ExclusiveLock",
	"AccessExclusiveLock"
};

114
#ifndef LOCK_DEBUG
B
Bruce Momjian 已提交
115
static bool Dummy_trace = false;
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
#endif

static const LockMethodData default_lockmethod = {
	AccessExclusiveLock,		/* highest valid lock mode number */
	true,
	LockConflicts,
	lock_mode_names,
#ifdef LOCK_DEBUG
	&Trace_locks
#else
	&Dummy_trace
#endif
};

static const LockMethodData user_lockmethod = {
	AccessExclusiveLock,		/* highest valid lock mode number */
	false,
	LockConflicts,
	lock_mode_names,
#ifdef LOCK_DEBUG
	&Trace_userlocks
#else
	&Dummy_trace
#endif
};

/*
 * map from lock method id to the lock table data structures
 */
static const LockMethod LockMethods[] = {
	NULL,
	&default_lockmethod,
	&user_lockmethod
};


/* Record that's written to 2PC state file when a lock is persisted */
typedef struct TwoPhaseLockRecord
{
	LOCKTAG		locktag;
	LOCKMODE	lockmode;
} TwoPhaseLockRecord;


/*
 * Pointers to hash tables containing lock state
 *
 * The LockMethodLockHash and LockMethodProcLockHash hash tables are in
 * shared memory; LockMethodLocalHash is local to each backend.
 */
static HTAB *LockMethodLockHash;
static HTAB *LockMethodProcLockHash;
static HTAB *LockMethodLocalHash;


/* private state for GrantAwaitedLock */
static LOCALLOCK *awaitedLock;
static ResourceOwner awaitedOwner;


#ifdef LOCK_DEBUG

/*------
 * The following configuration options are available for lock debugging:
 *
 *	   TRACE_LOCKS		-- give a bunch of output what's going on in this file
 *	   TRACE_USERLOCKS	-- same but for user locks
 *	   TRACE_LOCK_OIDMIN-- do not trace locks for tables below this oid
 *						   (use to avoid output on system tables)
 *	   TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
 *	   DEBUG_DEADLOCKS	-- currently dumps locks at untimely occasions ;)
 *
 * Furthermore, but in storage/lmgr/lwlock.c:
 *	   TRACE_LWLOCKS	-- trace lightweight locks (pretty useless)
 *
 * Define LOCK_DEBUG at compile time to get all these enabled.
 * --------
 */

int			Trace_lock_oidmin = FirstNormalObjectId;
bool		Trace_locks = false;
bool		Trace_userlocks = false;
int			Trace_lock_table = 0;
bool		Debug_deadlocks = false;


/*
 * Whether tracing is wanted for a lock with the given tag: either the
 * method's trace flag is on and the object is above the oid threshold,
 * or the specific table is being traced unconditionally.
 */
inline static bool
LOCK_DEBUG_ENABLED(const LOCKTAG *tag)
{
	return
		(*(LockMethods[tag->locktag_lockmethodid]->trace_flag) &&
		 ((Oid) tag->locktag_field2 >= (Oid) Trace_lock_oidmin))
		|| (Trace_lock_table &&
			(tag->locktag_field2 == Trace_lock_table));
}


/* Dump a LOCK's full state (masks, per-mode counts, wait queue) to the log */
inline static void
LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
{
	if (LOCK_DEBUG_ENABLED(&lock->tag))
		elog(LOG,
			 "%s: lock(%p) id(%u,%u,%u,%u,%u,%u) grantMask(%x) "
			 "req(%d,%d,%d,%d,%d,%d,%d)=%d "
			 "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
			 where, lock,
			 lock->tag.locktag_field1, lock->tag.locktag_field2,
			 lock->tag.locktag_field3, lock->tag.locktag_field4,
			 lock->tag.locktag_type, lock->tag.locktag_lockmethodid,
			 lock->grantMask,
			 lock->requested[1], lock->requested[2], lock->requested[3],
			 lock->requested[4], lock->requested[5], lock->requested[6],
			 lock->requested[7], lock->nRequested,
			 lock->granted[1], lock->granted[2], lock->granted[3],
			 lock->granted[4], lock->granted[5], lock->granted[6],
			 lock->granted[7], lock->nGranted,
			 lock->waitProcs.size,
			 LockMethods[LOCK_LOCKMETHOD(*lock)]->lockModeNames[type]);
}


/* Dump a PROCLOCK's state (owning proc and hold mask) to the log */
inline static void
PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
{
	if (LOCK_DEBUG_ENABLED(&proclockP->tag.myLock->tag))
		elog(LOG,
			 "%s: proclock(%p) lock(%p) method(%u) proc(%p) hold(%x)",
			 where, proclockP, proclockP->tag.myLock,
			 PROCLOCK_LOCKMETHOD(*(proclockP)),
			 proclockP->tag.myProc, (int) proclockP->holdMask);
}
#else							/* not LOCK_DEBUG */

#define LOCK_PRINT(where, lock, type)
#define PROCLOCK_PRINT(where, proclockP)
#endif   /* not LOCK_DEBUG */

253

254
static uint32 proclock_hash(const void *key, Size keysize);
255 256
static void RemoveLocalLock(LOCALLOCK *locallock);
static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
257
static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
258
static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
B
Bruce Momjian 已提交
259
			PROCLOCK *proclock, LockMethod lockMethodTable);
260
static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
261
			LockMethod lockMethodTable, uint32 hashcode,
262
			bool wakeupNeeded);
263

264

B
Bruce Momjian 已提交
265
/*
266 267 268 269 270 271 272 273 274 275
 * InitLocks -- Initialize the lock manager's data structures.
 *
 * This is called from CreateSharedMemoryAndSemaphores(), which see for
 * more comments.  In the normal postmaster case, the shared hash tables
 * are created here, as well as a locallock hash table that will remain
 * unused and empty in the postmaster itself.  Backends inherit the pointers
 * to the shared tables via fork(), and also inherit an image of the locallock
 * hash table, which they proceed to use.  In the EXEC_BACKEND case, each
 * backend re-executes this code to obtain pointers to the already existing
 * shared hash tables and to create its locallock hash table.
276 277
 */
void
278
InitLocks(void)
279
{
280 281
	HASHCTL		info;
	int			hash_flags;
282 283
	long		init_table_size,
				max_table_size;
284

285 286 287 288
	/*
	 * Compute init/max size to request for lock hashtables.  Note these
	 * calculations must agree with LockShmemSize!
	 */
289
	max_table_size = NLOCKENTS();
290
	init_table_size = max_table_size / 2;
291

B
Bruce Momjian 已提交
292
	/*
B
Bruce Momjian 已提交
293 294
	 * Allocate hash table for LOCK structs.  This stores per-locked-object
	 * information.
295
	 */
296
	MemSet(&info, 0, sizeof(info));
297 298
	info.keysize = sizeof(LOCKTAG);
	info.entrysize = sizeof(LOCK);
299
	info.hash = tag_hash;
300 301
	info.num_partitions = NUM_LOCK_PARTITIONS;
	hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);
302

303 304 305 306 307 308 309
	LockMethodLockHash = ShmemInitHash("LOCK hash",
									   init_table_size,
									   max_table_size,
									   &info,
									   hash_flags);
	if (!LockMethodLockHash)
		elog(FATAL, "could not initialize lock hash table");
310

311 312 313
	/* Assume an average of 2 holders per lock */
	max_table_size *= 2;
	init_table_size *= 2;
314

B
Bruce Momjian 已提交
315
	/*
316
	 * Allocate hash table for PROCLOCK structs.  This stores
317
	 * per-lock-per-holder information.
318
	 */
B
Bruce Momjian 已提交
319 320
	info.keysize = sizeof(PROCLOCKTAG);
	info.entrysize = sizeof(PROCLOCK);
321 322 323 324 325 326 327 328 329 330 331
	info.hash = proclock_hash;
	info.num_partitions = NUM_LOCK_PARTITIONS;
	hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);

	LockMethodProcLockHash = ShmemInitHash("PROCLOCK hash",
										   init_table_size,
										   max_table_size,
										   &info,
										   hash_flags);
	if (!LockMethodProcLockHash)
		elog(FATAL, "could not initialize proclock hash table");
332 333

	/*
B
Bruce Momjian 已提交
334 335
	 * Allocate non-shared hash table for LOCALLOCK structs.  This stores lock
	 * counts and resource owner information.
336
	 *
337 338 339 340
	 * The non-shared table could already exist in this process (this occurs
	 * when the postmaster is recreating shared memory after a backend crash).
	 * If so, delete and recreate it.  (We could simply leave it, since it
	 * ought to be empty in the postmaster, but for safety let's zap it.)
341
	 */
342 343
	if (LockMethodLocalHash)
		hash_destroy(LockMethodLocalHash);
344 345 346 347 348 349

	info.keysize = sizeof(LOCALLOCKTAG);
	info.entrysize = sizeof(LOCALLOCK);
	info.hash = tag_hash;
	hash_flags = (HASH_ELEM | HASH_FUNCTION);

350 351 352 353
	LockMethodLocalHash = hash_create("LOCALLOCK hash",
									  128,
									  &info,
									  hash_flags);
354 355
}

356

357
/*
358
 * Fetch the lock method table associated with a given lock
359
 */
360 361
LockMethod
GetLocksMethodTable(const LOCK *lock)
362
{
363
	LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock);
364

365 366
	Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
	return LockMethods[lockmethodid];
367 368
}

369

370
/*
371
 * Compute the hash code associated with a LOCKTAG.
372
 *
373 374 375 376
 * To avoid unnecessary recomputations of the hash code, we try to do this
 * just once per function, and then pass it around as needed.  Aside from
 * passing the hashcode to hash_search_with_hash_value(), we can extract
 * the lock partition number from the hashcode.
377
 */
378 379
uint32
LockTagHashCode(const LOCKTAG *locktag)
380
{
381 382
	return get_hash_value(LockMethodLockHash, (const void *) locktag);
}
383

384 385 386 387 388 389 390 391 392 393 394 395 396 397 398
/*
 * Compute the hash code associated with a PROCLOCKTAG.
 *
 * Because we want to use just one set of partition locks for both the
 * LOCK and PROCLOCK hash tables, we have to make sure that PROCLOCKs
 * fall into the same partition number as their associated LOCKs.
 * dynahash.c expects the partition number to be the low-order bits of
 * the hash code, and therefore a PROCLOCKTAG's hash code must have the
 * same low-order bits as the associated LOCKTAG's hash code.  We achieve
 * this with this specialized hash function.
 */
static uint32
proclock_hash(const void *key, Size keysize)
{
	const PROCLOCKTAG *proclocktag = (const PROCLOCKTAG *) key;
B
Bruce Momjian 已提交
399 400
	uint32		lockhash;
	Datum		procptr;
401 402 403 404 405 406 407 408 409

	Assert(keysize == sizeof(PROCLOCKTAG));

	/* Look into the associated LOCK object, and compute its hash code */
	lockhash = LockTagHashCode(&proclocktag->myLock->tag);

	/*
	 * To make the hash code also depend on the PGPROC, we xor the proc
	 * struct's address into the hash code, left-shifted so that the
B
Bruce Momjian 已提交
410 411 412
	 * partition-number bits don't change.  Since this is only a hash, we
	 * don't care if we lose high-order bits of the address; use an
	 * intermediate variable to suppress cast-pointer-to-int warnings.
413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428
	 */
	procptr = PointerGetDatum(proclocktag->myProc);
	lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS;

	return lockhash;
}

/*
 * Compute the hash code associated with a PROCLOCKTAG, given the hashcode
 * for its underlying LOCK.
 *
 * We use this just to avoid redundant calls of LockTagHashCode().
 */
static inline uint32
ProcLockHashCode(const PROCLOCKTAG *proclocktag, uint32 hashcode)
{
	uint32		lockhash = hashcode;
	Datum		procptr;

	/*
	 * This must match proclock_hash()!
	 */
	procptr = PointerGetDatum(proclocktag->myProc);
	lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS;

	return lockhash;
}


442 443
/*
 * LockAcquire -- Check for lock conflicts, sleep if conflict found,
444
 *		set lock if/when no conflicts.
445
 *
446 447 448 449 450 451
 * Inputs:
 *	locktag: unique identifier for the lockable object
 *	lockmode: lock mode to acquire
 *	sessionLock: if true, acquire lock for session not current transaction
 *	dontWait: if true, don't wait to acquire lock
 *
452 453 454 455 456 457 458 459
 * Returns one of:
 *		LOCKACQUIRE_NOT_AVAIL		lock not available, and dontWait=true
 *		LOCKACQUIRE_OK				lock successfully acquired
 *		LOCKACQUIRE_ALREADY_HELD	incremented count for lock already held
 *
 * In the normal case where dontWait=false and the caller doesn't need to
 * distinguish a freshly acquired lock from one already taken earlier in
 * this same transaction, there is no need to examine the return value.
460
 *
461 462 463 464
 * Side Effects: The lock is acquired and recorded in lock tables.
 *
 * NOTE: if we wait for the lock, there is no way to abort the wait
 * short of aborting the transaction.
465
 */
466
LockAcquireResult
467
LockAcquire(const LOCKTAG *locktag,
468 469 470
			LOCKMODE lockmode,
			bool sessionLock,
			bool dontWait)
471
{
472 473
	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
	LockMethod	lockMethodTable;
474 475 476
	LOCALLOCKTAG localtag;
	LOCALLOCK  *locallock;
	LOCK	   *lock;
477 478
	PROCLOCK   *proclock;
	PROCLOCKTAG proclocktag;
479
	bool		found;
480
	ResourceOwner owner;
481 482
	uint32		hashcode;
	uint32		proclock_hashcode;
483 484
	int			partition;
	LWLockId	partitionLock;
485
	int			status;
486

487 488 489 490 491 492
	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];
	if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
		elog(ERROR, "unrecognized lock mode: %d", lockmode);

493
#ifdef LOCK_DEBUG
494 495
	if (LOCK_DEBUG_ENABLED(locktag))
		elog(LOG, "LockAcquire: lock [%u,%u] %s",
496
			 locktag->locktag_field1, locktag->locktag_field2,
497
			 lockMethodTable->lockModeNames[lockmode]);
498 499
#endif

500 501
	/* Session locks are never transactional, else check table */
	if (!sessionLock && lockMethodTable->transactional)
502 503 504 505 506 507 508
		owner = CurrentResourceOwner;
	else
		owner = NULL;

	/*
	 * Find or create a LOCALLOCK entry for this lock and lockmode
	 */
B
Bruce Momjian 已提交
509
	MemSet(&localtag, 0, sizeof(localtag));		/* must clear padding */
510 511 512
	localtag.lock = *locktag;
	localtag.mode = lockmode;

513
	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
514 515 516 517 518 519 520 521 522 523
										  (void *) &localtag,
										  HASH_ENTER, &found);

	/*
	 * if it's a new locallock object, initialize it
	 */
	if (!found)
	{
		locallock->lock = NULL;
		locallock->proclock = NULL;
524
		locallock->hashcode = LockTagHashCode(&(localtag.lock));
525 526 527 528 529 530
		locallock->nLocks = 0;
		locallock->numLockOwners = 0;
		locallock->maxLockOwners = 8;
		locallock->lockOwners = NULL;
		locallock->lockOwners = (LOCALLOCKOWNER *)
			MemoryContextAlloc(TopMemoryContext,
B
Bruce Momjian 已提交
531
						  locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
532 533 534 535 536 537
	}
	else
	{
		/* Make sure there will be room to remember the lock */
		if (locallock->numLockOwners >= locallock->maxLockOwners)
		{
B
Bruce Momjian 已提交
538
			int			newsize = locallock->maxLockOwners * 2;
539 540 541 542 543 544 545 546 547

			locallock->lockOwners = (LOCALLOCKOWNER *)
				repalloc(locallock->lockOwners,
						 newsize * sizeof(LOCALLOCKOWNER));
			locallock->maxLockOwners = newsize;
		}
	}

	/*
B
Bruce Momjian 已提交
548
	 * If we already hold the lock, we can just increase the count locally.
549 550 551 552
	 */
	if (locallock->nLocks > 0)
	{
		GrantLockLocal(locallock, owner);
553
		return LOCKACQUIRE_ALREADY_HELD;
554 555 556 557 558
	}

	/*
	 * Otherwise we've got to mess with the shared lock table.
	 */
559 560 561
	hashcode = locallock->hashcode;
	partition = LockHashPartition(hashcode);
	partitionLock = LockHashPartitionLock(hashcode);
562

563
	LWLockAcquire(partitionLock, LW_EXCLUSIVE);
564

M
 
Marc G. Fournier 已提交
565
	/*
566 567
	 * Find or create a lock with this tag.
	 *
568 569
	 * Note: if the locallock object already existed, it might have a pointer
	 * to the lock already ... but we probably should not assume that that
570 571
	 * pointer is valid, since a lock object with no locks can go away
	 * anytime.
M
 
Marc G. Fournier 已提交
572
	 */
573 574 575 576 577
	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
												(void *) locktag,
												hashcode,
												HASH_ENTER_NULL,
												&found);
578
	if (!lock)
579
	{
580
		LWLockRelease(partitionLock);
581 582
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
583
				 errmsg("out of shared memory"),
B
Bruce Momjian 已提交
584
		  errhint("You might need to increase max_locks_per_transaction.")));
585
	}
586
	locallock->lock = lock;
587

B
Bruce Momjian 已提交
588
	/*
589
	 * if it's a new lock object, initialize it
590 591
	 */
	if (!found)
592
	{
593 594
		lock->grantMask = 0;
		lock->waitMask = 0;
595
		SHMQueueInit(&(lock->procLocks));
596
		ProcQueueInit(&(lock->waitProcs));
597 598
		lock->nRequested = 0;
		lock->nGranted = 0;
599 600
		MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
		MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
M
 
Marc G. Fournier 已提交
601
		LOCK_PRINT("LockAcquire: new", lock, lockmode);
602 603 604
	}
	else
	{
M
 
Marc G. Fournier 已提交
605
		LOCK_PRINT("LockAcquire: found", lock, lockmode);
606 607 608
		Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
		Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
		Assert(lock->nGranted <= lock->nRequested);
609
	}
610

B
Bruce Momjian 已提交
611
	/*
612
	 * Create the hash key for the proclock table.
613
	 */
614 615 616 617
	proclocktag.myLock = lock;
	proclocktag.myProc = MyProc;

	proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
618

M
 
Marc G. Fournier 已提交
619
	/*
620
	 * Find or create a proclock entry with this tag
M
 
Marc G. Fournier 已提交
621
	 */
622 623 624 625 626
	proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
														(void *) &proclocktag,
														proclock_hashcode,
														HASH_ENTER_NULL,
														&found);
627
	if (!proclock)
628
	{
629 630 631 632 633 634 635 636 637 638
		/* Ooops, not enough shmem for the proclock */
		if (lock->nRequested == 0)
		{
			/*
			 * There are no other requestors of this lock, so garbage-collect
			 * the lock object.  We *must* do this to avoid a permanent leak
			 * of shared memory, because there won't be anything to cause
			 * anyone to release the lock object later.
			 */
			Assert(SHMQueueEmpty(&(lock->procLocks)));
639 640 641 642 643
			if (!hash_search_with_hash_value(LockMethodLockHash,
											 (void *) &(lock->tag),
											 hashcode,
											 HASH_REMOVE,
											 NULL))
644
				elog(PANIC, "lock table corrupted");
645
		}
646
		LWLockRelease(partitionLock);
647 648
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
649
				 errmsg("out of shared memory"),
B
Bruce Momjian 已提交
650
		  errhint("You might need to increase max_locks_per_transaction.")));
651
	}
652
	locallock->proclock = proclock;
M
 
Marc G. Fournier 已提交
653 654

	/*
655
	 * If new, initialize the new entry
M
 
Marc G. Fournier 已提交
656
	 */
657
	if (!found)
658
	{
659
		proclock->holdMask = 0;
660
		proclock->releaseMask = 0;
661
		/* Add proclock to appropriate lists */
662
		SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
663 664
		SHMQueueInsertBefore(&(MyProc->myProcLocks[partition]),
							 &proclock->procLink);
665
		PROCLOCK_PRINT("LockAcquire: new", proclock);
666 667 668
	}
	else
	{
669
		PROCLOCK_PRINT("LockAcquire: found", proclock);
670
		Assert((proclock->holdMask & ~lock->grantMask) == 0);
671

672
#ifdef CHECK_DEADLOCK_RISK
B
Bruce Momjian 已提交
673

674
		/*
B
Bruce Momjian 已提交
675 676 677 678 679
		 * Issue warning if we already hold a lower-level lock on this object
		 * and do not hold a lock of the requested level or higher. This
		 * indicates a deadlock-prone coding practice (eg, we'd have a
		 * deadlock if another backend were following the same code path at
		 * about the same time).
680
		 *
B
Bruce Momjian 已提交
681 682 683
		 * This is not enabled by default, because it may generate log entries
		 * about user-level coding practices that are in fact safe in context.
		 * It can be enabled to help find system-level problems.
684
		 *
B
Bruce Momjian 已提交
685 686
		 * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
		 * better to use a table.  For now, though, this works.
687 688
		 */
		{
B
Bruce Momjian 已提交
689
			int			i;
690 691

			for (i = lockMethodTable->numLockModes; i > 0; i--)
692
			{
693 694 695
				if (proclock->holdMask & LOCKBIT_ON(i))
				{
					if (i >= (int) lockmode)
B
Bruce Momjian 已提交
696
						break;	/* safe: we have a lock >= req level */
697 698
					elog(LOG, "deadlock risk: raising lock level"
						 " from %s to %s on object %u/%u/%u",
699 700
						 lockMethodTable->lockModeNames[i],
						 lockMethodTable->lockModeNames[lockmode],
701 702 703 704
						 lock->tag.locktag_field1, lock->tag.locktag_field2,
						 lock->tag.locktag_field3);
					break;
				}
705 706
			}
		}
707
#endif   /* CHECK_DEADLOCK_RISK */
708
	}
709

B
Bruce Momjian 已提交
710
	/*
711
	 * lock->nRequested and lock->requested[] count the total number of
B
Bruce Momjian 已提交
712 713
	 * requests, whether granted or waiting, so increment those immediately.
	 * The other counts don't increment till we get the lock.
714
	 */
715 716 717
	lock->nRequested++;
	lock->requested[lockmode]++;
	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
718

B
Bruce Momjian 已提交
719
	/*
B
Bruce Momjian 已提交
720 721
	 * We shouldn't already hold the desired lock; else locallock table is
	 * broken.
722
	 */
723 724
	if (proclock->holdMask & LOCKBIT_ON(lockmode))
		elog(ERROR, "lock %s on object %u/%u/%u is already held",
725
			 lockMethodTable->lockModeNames[lockmode],
726 727
			 lock->tag.locktag_field1, lock->tag.locktag_field2,
			 lock->tag.locktag_field3);
728

B
Bruce Momjian 已提交
729
	/*
B
Bruce Momjian 已提交
730 731 732
	 * If lock requested conflicts with locks requested by waiters, must join
	 * wait queue.	Otherwise, check for conflict with already-held locks.
	 * (That's last because most complex check.)
V
Vadim B. Mikheev 已提交
733
	 */
B
Bruce Momjian 已提交
734
	if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
735
		status = STATUS_FOUND;
V
Vadim B. Mikheev 已提交
736
	else
737
		status = LockCheckConflicts(lockMethodTable, lockmode,
738
									lock, proclock, MyProc);
V
Vadim B. Mikheev 已提交
739

740
	if (status == STATUS_OK)
741 742
	{
		/* No conflict with held or previously requested locks */
743
		GrantLock(lock, proclock, lockmode);
744
		GrantLockLocal(locallock, owner);
745 746
	}
	else
747
	{
748
		Assert(status == STATUS_FOUND);
749

750
		/*
751
		 * We can't acquire the lock immediately.  If caller specified no
B
Bruce Momjian 已提交
752 753
		 * blocking, remove useless table entries and return NOT_AVAIL without
		 * waiting.
754
		 */
755
		if (dontWait)
756
		{
757
			if (proclock->holdMask == 0)
758
			{
759 760
				SHMQueueDelete(&proclock->lockLink);
				SHMQueueDelete(&proclock->procLink);
761 762 763 764 765
				if (!hash_search_with_hash_value(LockMethodProcLockHash,
												 (void *) &(proclock->tag),
												 proclock_hashcode,
												 HASH_REMOVE,
												 NULL))
766
					elog(PANIC, "proclock table corrupted");
767
			}
768
			else
769
				PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
770 771
			lock->nRequested--;
			lock->requested[lockmode]--;
772
			LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
773 774
			Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
			Assert(lock->nGranted <= lock->nRequested);
775
			LWLockRelease(partitionLock);
776 777
			if (locallock->nLocks == 0)
				RemoveLocalLock(locallock);
778
			return LOCKACQUIRE_NOT_AVAIL;
779
		}
B
Bruce Momjian 已提交
780

V
Vadim B. Mikheev 已提交
781
		/*
782
		 * Set bitmask of locks this process already holds on this object.
V
Vadim B. Mikheev 已提交
783
		 */
784
		MyProc->heldLocks = proclock->holdMask;
V
Vadim B. Mikheev 已提交
785

786 787 788
		/*
		 * Sleep till someone wakes me up.
		 */
789

790
		TRACE_POSTGRESQL_LOCK_STARTWAIT(locktag->locktag_field2, lockmode);
791

792
		WaitOnLock(locallock, owner);
793

794
		TRACE_POSTGRESQL_LOCK_ENDWAIT(locktag->locktag_field2, lockmode);
795

796 797
		/*
		 * NOTE: do not do any material change of state between here and
B
Bruce Momjian 已提交
798
		 * return.	All required changes in locktable state must have been
B
Bruce Momjian 已提交
799
		 * done when the lock was granted to us --- see notes in WaitOnLock.
800 801
		 */

M
 
Marc G. Fournier 已提交
802
		/*
803
		 * Check the proclock entry status, in case something in the ipc
804
		 * communication doesn't work correctly.
M
 
Marc G. Fournier 已提交
805
		 */
806
		if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
807
		{
808
			PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
809
			LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
M
 
Marc G. Fournier 已提交
810
			/* Should we retry ? */
811
			LWLockRelease(partitionLock);
812
			elog(ERROR, "LockAcquire failed");
M
 
Marc G. Fournier 已提交
813
		}
814
		PROCLOCK_PRINT("LockAcquire: granted", proclock);
M
 
Marc G. Fournier 已提交
815
		LOCK_PRINT("LockAcquire: granted", lock, lockmode);
816
	}
817

818
	LWLockRelease(partitionLock);
819

820
	return LOCKACQUIRE_OK;
821 822
}

823 824 825 826 827 828 829 830
/*
 * Subroutine to free a locallock entry
 */
static void
RemoveLocalLock(LOCALLOCK *locallock)
{
	pfree(locallock->lockOwners);
	locallock->lockOwners = NULL;
831
	if (!hash_search(LockMethodLocalHash,
832 833
					 (void *) &(locallock->tag),
					 HASH_REMOVE, NULL))
834 835 836
		elog(WARNING, "locallock table corrupted");
}

B
Bruce Momjian 已提交
837
/*
838 839 840 841
 * LockCheckConflicts -- test whether requested lock conflicts
 *		with those already granted
 *
 * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
842 843
 *
 * NOTES:
844
 *		Here's what makes this complicated: one process's locks don't
845 846
 * conflict with one another, no matter what purpose they are held for
 * (eg, session and transaction locks do not conflict).
847 848
 * So, we must subtract off our own locks when determining whether the
 * requested new lock conflicts with those already held.
849 850
 */
int
851
LockCheckConflicts(LockMethod lockMethodTable,
852 853
				   LOCKMODE lockmode,
				   LOCK *lock,
854
				   PROCLOCK *proclock,
855
				   PGPROC *proc)
856
{
B
Bruce Momjian 已提交
857
	int			numLockModes = lockMethodTable->numLockModes;
858 859
	LOCKMASK	myLocks;
	LOCKMASK	otherLocks;
860
	int			i;
861

B
Bruce Momjian 已提交
862
	/*
B
Bruce Momjian 已提交
863 864
	 * first check for global conflicts: If no locks conflict with my request,
	 * then I get the lock.
865
	 *
866 867 868 869
	 * Checking for conflict: lock->grantMask represents the types of
	 * currently held locks.  conflictTable[lockmode] has a bit set for each
	 * type of lock that conflicts with request.   Bitwise compare tells if
	 * there is a conflict.
870
	 */
B
Bruce Momjian 已提交
871
	if (!(lockMethodTable->conflictTab[lockmode] & lock->grantMask))
872
	{
873
		PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock);
874
		return STATUS_OK;
875
	}
876

B
Bruce Momjian 已提交
877
	/*
B
Bruce Momjian 已提交
878 879 880
	 * Rats.  Something conflicts.	But it could still be my own lock. We have
	 * to construct a conflict mask that does not reflect our own locks, but
	 * only lock types held by other processes.
881
	 */
882 883
	myLocks = proclock->holdMask;
	otherLocks = 0;
884
	for (i = 1; i <= numLockModes; i++)
885
	{
B
Bruce Momjian 已提交
886
		int			myHolding = (myLocks & LOCKBIT_ON(i)) ? 1 : 0;
887 888 889

		if (lock->granted[i] > myHolding)
			otherLocks |= LOCKBIT_ON(i);
890
	}
891

B
Bruce Momjian 已提交
892
	/*
893
	 * now check again for conflicts.  'otherLocks' describes the types of
B
Bruce Momjian 已提交
894 895
	 * locks held by other processes.  If one of these conflicts with the kind
	 * of lock that I want, there is a conflict and I have to sleep.
896
	 */
897
	if (!(lockMethodTable->conflictTab[lockmode] & otherLocks))
898
	{
899
		/* no conflict. OK to get the lock */
900
		PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock);
901
		return STATUS_OK;
902
	}
903

904
	PROCLOCK_PRINT("LockCheckConflicts: conflicting", proclock);
905
	return STATUS_FOUND;
906 907
}

908
/*
909
 * GrantLock -- update the lock and proclock data structures to show
910
 *		the lock request has been granted.
911 912
 *
 * NOTE: if proc was blocked, it also needs to be removed from the wait list
913
 * and have its waitLock/waitProcLock fields cleared.  That's not done here.
914
 *
915 916 917
 * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK
 * table entry; but since we may be awaking some other process, we can't do
 * that here; it's done by GrantLockLocal, instead.
M
 
Marc G. Fournier 已提交
918 919
 */
void
920
GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
M
 
Marc G. Fournier 已提交
921
{
922 923
	lock->nGranted++;
	lock->granted[lockmode]++;
924
	lock->grantMask |= LOCKBIT_ON(lockmode);
925
	if (lock->granted[lockmode] == lock->requested[lockmode])
926
		lock->waitMask &= LOCKBIT_OFF(lockmode);
927
	proclock->holdMask |= LOCKBIT_ON(lockmode);
M
 
Marc G. Fournier 已提交
928
	LOCK_PRINT("GrantLock", lock, lockmode);
929 930
	Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
	Assert(lock->nGranted <= lock->nRequested);
931 932
}

933
/*
 * UnGrantLock -- opposite of GrantLock.
 *
 * Updates the shared lock and proclock structures to show that the lock
 * is no longer held nor requested by the current holder.
 *
 * Returns true if some waiter on this lock could now be wakable, i.e.
 * the caller should run ProcLockWakeup.
 */
static bool
UnGrantLock(LOCK *lock, LOCKMODE lockmode,
			PROCLOCK *proclock, LockMethod lockMethodTable)
{
	bool		wakeupNeeded;

	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
	Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
	Assert(lock->nGranted <= lock->nRequested);

	/*
	 * Adjust the aggregate lock statistics.
	 */
	lock->nRequested--;
	lock->requested[lockmode]--;
	lock->nGranted--;
	lock->granted[lockmode]--;

	/* nobody holds this mode anymore: clear it from the grant mask */
	if (lock->granted[lockmode] == 0)
		lock->grantMask &= LOCKBIT_OFF(lockmode);

	LOCK_PRINT("UnGrantLock: updated", lock, lockmode);

	/*
	 * ProcLockWakeup is needed only if the released mode conflicts with at
	 * least one mode some waiter asked for; otherwise whatever conflict
	 * made the waiters sleep must still exist.  NOTE: before MVCC we could
	 * skip wakeup while lock->granted[lockmode] stayed positive, but now
	 * the remaining granted locks might belong to a waiter, who could be
	 * wakable because he doesn't conflict with his own locks.
	 */
	wakeupNeeded = (lockMethodTable->conflictTab[lockmode] & lock->waitMask) != 0;

	/*
	 * Finally fix the per-holder proclock state.
	 */
	proclock->holdMask &= LOCKBIT_OFF(lockmode);
	PROCLOCK_PRINT("UnGrantLock: updated", proclock);

	return wakeupNeeded;
}

989
/*
B
Bruce Momjian 已提交
990
 * CleanUpLock -- clean up after releasing a lock.	We garbage-collect the
991 992 993 994 995
 * proclock and lock objects if possible, and call ProcLockWakeup if there
 * are remaining requests and the caller says it's OK.  (Normally, this
 * should be called after UnGrantLock, and wakeupNeeded is the result from
 * UnGrantLock.)
 *
996
 * The appropriate partition lock must be held at entry, and will be
997 998 999
 * held at exit.
 */
static void
1000
CleanUpLock(LOCK *lock, PROCLOCK *proclock,
1001
			LockMethod lockMethodTable, uint32 hashcode,
1002 1003 1004
			bool wakeupNeeded)
{
	/*
B
Bruce Momjian 已提交
1005 1006
	 * If this was my last hold on this lock, delete my entry in the proclock
	 * table.
1007 1008 1009
	 */
	if (proclock->holdMask == 0)
	{
1010 1011
		uint32		proclock_hashcode;

1012 1013 1014
		PROCLOCK_PRINT("CleanUpLock: deleting", proclock);
		SHMQueueDelete(&proclock->lockLink);
		SHMQueueDelete(&proclock->procLink);
1015 1016 1017 1018 1019 1020
		proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
		if (!hash_search_with_hash_value(LockMethodProcLockHash,
										 (void *) &(proclock->tag),
										 proclock_hashcode,
										 HASH_REMOVE,
										 NULL))
1021 1022 1023 1024 1025 1026
			elog(PANIC, "proclock table corrupted");
	}

	if (lock->nRequested == 0)
	{
		/*
B
Bruce Momjian 已提交
1027 1028
		 * The caller just released the last lock, so garbage-collect the lock
		 * object.
1029 1030 1031
		 */
		LOCK_PRINT("CleanUpLock: deleting", lock, 0);
		Assert(SHMQueueEmpty(&(lock->procLocks)));
1032 1033 1034 1035 1036
		if (!hash_search_with_hash_value(LockMethodLockHash,
										 (void *) &(lock->tag),
										 hashcode,
										 HASH_REMOVE,
										 NULL))
1037 1038 1039 1040 1041
			elog(PANIC, "lock table corrupted");
	}
	else if (wakeupNeeded)
	{
		/* There are waiters on this lock, so wake them up. */
1042
		ProcLockWakeup(lockMethodTable, lock);
1043 1044 1045
	}
}

1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056
/*
 * GrantLockLocal -- update the locallock data structures to show
 *		the lock request has been granted.
 *
 * We expect that LockAcquire made sure there is room to add a new
 * ResourceOwner entry.
 */
static void
GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
{
	LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
	int			slot;

	Assert(locallock->numLockOwners < locallock->maxLockOwners);

	/* bump the total hold count */
	locallock->nLocks++;

	/* if this owner already has an entry, just increment it */
	for (slot = 0; slot < locallock->numLockOwners; slot++)
	{
		if (lockOwners[slot].owner == owner)
		{
			lockOwners[slot].nLocks++;
			return;
		}
	}

	/* otherwise append a fresh entry for this owner */
	lockOwners[slot].owner = owner;
	lockOwners[slot].nLocks = 1;
	locallock->numLockOwners++;
}

/*
 * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
 *		WaitOnLock on.
 *
 * proc.c needs this for the case where we are booted off the lock by
 * timeout, but discover that someone granted us the lock anyway.
 *
 * We could just export GrantLockLocal, but that would require including
 * resowner.h in lock.h, which creates circularity.
 */
void
GrantAwaitedLock(void)
{
	GrantLockLocal(awaitedLock, awaitedOwner);
}

1092 1093 1094
/*
 * WaitOnLock -- wait to acquire a lock
 *
1095
 * Caller must have set MyProc->heldLocks to reflect locks already held
1096
 * on the lockable object by this process.
1097
 *
1098
 * The appropriate partition lock must be held at entry.
1099
 */
1100
static void
1101
WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
1102
{
1103
	LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
1104
	LockMethod	lockMethodTable = LockMethods[lockmethodid];
1105
	char	   * volatile new_status = NULL;
1106

1107 1108
	LOCK_PRINT("WaitOnLock: sleeping on lock",
			   locallock->lock, locallock->tag.mode);
1109

1110
	/* Report change to waiting status */
1111 1112
	if (update_process_title)
	{
1113 1114 1115
		const char *old_status;
		int			len;

1116 1117 1118 1119 1120
		old_status = get_ps_display(&len);
		new_status = (char *) palloc(len + 8 + 1);
		memcpy(new_status, old_status, len);
		strcpy(new_status + len, " waiting");
		set_ps_display(new_status, false);
B
Bruce Momjian 已提交
1121
		new_status[len] = '\0'; /* truncate off " waiting" */
1122
	}
1123 1124
	pgstat_report_waiting(true);

1125 1126 1127
	awaitedLock = locallock;
	awaitedOwner = owner;

B
Bruce Momjian 已提交
1128
	/*
1129
	 * NOTE: Think not to put any shared-state cleanup after the call to
B
Bruce Momjian 已提交
1130 1131 1132 1133 1134 1135
	 * ProcSleep, in either the normal or failure path.  The lock state must
	 * be fully set by the lock grantor, or by CheckDeadLock if we give up
	 * waiting for the lock.  This is necessary because of the possibility
	 * that a cancel/die interrupt will interrupt ProcSleep after someone else
	 * grants us the lock, but before we've noticed it. Hence, after granting,
	 * the locktable state must fully reflect the fact that we own the lock;
1136 1137 1138 1139 1140 1141 1142 1143
	 * we can't do additional work on return.
	 *
	 * We can and do use a PG_TRY block to try to clean up after failure,
	 * but this still has a major limitation: elog(FATAL) can occur while
	 * waiting (eg, a "die" interrupt), and then control won't come back here.
	 * So all cleanup of essential state should happen in LockWaitCancel,
	 * not here.  We can use PG_TRY to clear the "waiting" status flags,
	 * since doing that is unimportant if the process exits.
1144
	 */
1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156
	PG_TRY();
	{
		if (ProcSleep(locallock, lockMethodTable) != STATUS_OK)
		{
			/*
			 * We failed as a result of a deadlock, see CheckDeadLock().
			 * Quit now.
			 */
			awaitedLock = NULL;
			LOCK_PRINT("WaitOnLock: aborting on lock",
					   locallock->lock, locallock->tag.mode);
			LWLockRelease(LockHashPartitionLock(locallock->hashcode));
1157

1158 1159 1160 1161 1162 1163 1164 1165 1166
			/*
			 * Now that we aren't holding the partition lock, we can give an
			 * error report including details about the detected deadlock.
			 */
			DeadLockReport();
			/* not reached */
		}
	}
	PG_CATCH();
1167
	{
1168
		/* In this path, awaitedLock remains set until LockWaitCancel */
B
Bruce Momjian 已提交
1169

1170 1171 1172 1173 1174 1175 1176 1177 1178 1179
		/* Report change to non-waiting status */
		pgstat_report_waiting(false);
		if (update_process_title)
		{
			set_ps_display(new_status, false);
			pfree(new_status);
		}

		/* and propagate the error */
		PG_RE_THROW();
1180
	}
1181
	PG_END_TRY();
1182

1183 1184
	awaitedLock = NULL;

1185
	/* Report change to non-waiting status */
1186
	pgstat_report_waiting(false);
1187 1188 1189 1190 1191
	if (update_process_title)
	{
		set_ps_display(new_status, false);
		pfree(new_status);
	}
1192

1193 1194
	LOCK_PRINT("WaitOnLock: wakeup on lock",
			   locallock->lock, locallock->tag.mode);
1195 1196
}

1197
/*
1198 1199 1200
 * Remove a proc from the wait-queue it is on (caller must know it is on one).
 * This is only used when the proc has failed to get the lock, so we set its
 * waitStatus to STATUS_ERROR.
1201
 *
1202 1203
 * Appropriate partition lock must be held by caller.  Also, caller is
 * responsible for signaling the proc if needed.
1204
 *
1205
 * NB: this does not clean up any locallock object that may exist for the lock.
1206 1207
 */
void
1208
RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode)
1209
{
B
Bruce Momjian 已提交
1210
	LOCK	   *waitLock = proc->waitLock;
1211
	PROCLOCK   *proclock = proc->waitProcLock;
B
Bruce Momjian 已提交
1212
	LOCKMODE	lockmode = proc->waitLockMode;
1213
	LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*waitLock);
1214 1215

	/* Make sure proc is waiting */
1216
	Assert(proc->waitStatus == STATUS_WAITING);
1217 1218 1219
	Assert(proc->links.next != INVALID_OFFSET);
	Assert(waitLock);
	Assert(waitLock->waitProcs.size > 0);
1220
	Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233

	/* Remove proc from lock's wait queue */
	SHMQueueDelete(&(proc->links));
	waitLock->waitProcs.size--;

	/* Undo increments of request counts by waiting process */
	Assert(waitLock->nRequested > 0);
	Assert(waitLock->nRequested > proc->waitLock->nGranted);
	waitLock->nRequested--;
	Assert(waitLock->requested[lockmode] > 0);
	waitLock->requested[lockmode]--;
	/* don't forget to clear waitMask bit if appropriate */
	if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
1234
		waitLock->waitMask &= LOCKBIT_OFF(lockmode);
1235

1236
	/* Clean up the proc's own state, and pass it the ok/fail signal */
1237
	proc->waitLock = NULL;
1238
	proc->waitProcLock = NULL;
1239
	proc->waitStatus = STATUS_ERROR;
1240

1241 1242
	/*
	 * Delete the proclock immediately if it represents no already-held locks.
1243 1244
	 * (This must happen now because if the owner of the lock decides to
	 * release it, and the requested/granted counts then go to zero,
B
Bruce Momjian 已提交
1245 1246
	 * LockRelease expects there to be no remaining proclocks.) Then see if
	 * any other waiters for the lock can be woken up now.
1247
	 */
1248
	CleanUpLock(waitLock, proclock,
1249
				LockMethods[lockmethodid], hashcode,
1250
				true);
1251 1252
}

1253
/*
1254 1255 1256
 * LockRelease -- look up 'locktag' and release one 'lockmode' lock on it.
 *		Release a session lock if 'sessionLock' is true, else release a
 *		regular transaction lock.
1257
 *
1258 1259 1260 1261
 * Side Effects: find any waiting processes that are now wakable,
 *		grant them their requested locks and awaken them.
 *		(We have to grant the lock here to avoid a race between
 *		the waking process and any new process to
1262
 *		come along and request the lock.)
1263 1264
 */
bool
1265
LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
1266
{
1267 1268
	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
	LockMethod	lockMethodTable;
1269 1270
	LOCALLOCKTAG localtag;
	LOCALLOCK  *locallock;
1271
	LOCK	   *lock;
1272
	PROCLOCK   *proclock;
1273
	LWLockId	partitionLock;
1274
	bool		wakeupNeeded;
1275

1276 1277 1278 1279 1280 1281
	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];
	if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
		elog(ERROR, "unrecognized lock mode: %d", lockmode);

1282
#ifdef LOCK_DEBUG
1283 1284
	if (LOCK_DEBUG_ENABLED(locktag))
		elog(LOG, "LockRelease: lock [%u,%u] %s",
1285
			 locktag->locktag_field1, locktag->locktag_field2,
1286
			 lockMethodTable->lockModeNames[lockmode]);
1287 1288
#endif

1289
	/*
1290
	 * Find the LOCALLOCK entry for this lock and lockmode
1291
	 */
B
Bruce Momjian 已提交
1292
	MemSet(&localtag, 0, sizeof(localtag));		/* must clear padding */
1293 1294 1295
	localtag.lock = *locktag;
	localtag.mode = lockmode;

1296
	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
1297 1298
										  (void *) &localtag,
										  HASH_FIND, NULL);
1299

1300
	/*
B
Bruce Momjian 已提交
1301
	 * let the caller print its own error message, too. Do not ereport(ERROR).
1302
	 */
1303
	if (!locallock || locallock->nLocks <= 0)
1304
	{
1305
		elog(WARNING, "you don't own a lock of type %s",
1306
			 lockMethodTable->lockModeNames[lockmode]);
1307
		return FALSE;
1308
	}
1309

M
 
Marc G. Fournier 已提交
1310
	/*
1311
	 * Decrease the count for the resource owner.
M
 
Marc G. Fournier 已提交
1312
	 */
1313
	{
1314 1315
		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
		ResourceOwner owner;
B
Bruce Momjian 已提交
1316
		int			i;
1317

1318 1319
		/* Session locks are never transactional, else check table */
		if (!sessionLock && lockMethodTable->transactional)
1320
			owner = CurrentResourceOwner;
1321
		else
1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342
			owner = NULL;

		for (i = locallock->numLockOwners - 1; i >= 0; i--)
		{
			if (lockOwners[i].owner == owner)
			{
				Assert(lockOwners[i].nLocks > 0);
				if (--lockOwners[i].nLocks == 0)
				{
					/* compact out unused slot */
					locallock->numLockOwners--;
					if (i < locallock->numLockOwners)
						lockOwners[i] = lockOwners[locallock->numLockOwners];
				}
				break;
			}
		}
		if (i < 0)
		{
			/* don't release a lock belonging to another owner */
			elog(WARNING, "you don't own a lock of type %s",
1343
				 lockMethodTable->lockModeNames[lockmode]);
1344 1345
			return FALSE;
		}
1346
	}
1347 1348

	/*
B
Bruce Momjian 已提交
1349 1350
	 * Decrease the total local count.	If we're still holding the lock, we're
	 * done.
1351 1352 1353 1354 1355 1356 1357 1358 1359
	 */
	locallock->nLocks--;

	if (locallock->nLocks > 0)
		return TRUE;

	/*
	 * Otherwise we've got to mess with the shared lock table.
	 */
1360
	partitionLock = LockHashPartitionLock(locallock->hashcode);
1361

1362
	LWLockAcquire(partitionLock, LW_EXCLUSIVE);
1363 1364 1365

	/*
	 * We don't need to re-find the lock or proclock, since we kept their
B
Bruce Momjian 已提交
1366 1367
	 * addresses in the locallock table, and they couldn't have been removed
	 * while we were holding a lock on them.
1368 1369 1370 1371
	 */
	lock = locallock->lock;
	LOCK_PRINT("LockRelease: found", lock, lockmode);
	proclock = locallock->proclock;
1372
	PROCLOCK_PRINT("LockRelease: found", proclock);
M
 
Marc G. Fournier 已提交
1373 1374

	/*
B
Bruce Momjian 已提交
1375 1376
	 * Double-check that we are actually holding a lock of the type we want to
	 * release.
M
 
Marc G. Fournier 已提交
1377
	 */
1378
	if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
1379
	{
1380
		PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
1381
		LWLockRelease(partitionLock);
1382
		elog(WARNING, "you don't own a lock of type %s",
1383
			 lockMethodTable->lockModeNames[lockmode]);
1384
		RemoveLocalLock(locallock);
1385
		return FALSE;
M
 
Marc G. Fournier 已提交
1386 1387
	}

1388
	/*
1389
	 * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
1390
	 */
1391
	wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
1392

1393
	CleanUpLock(lock, proclock,
1394
				lockMethodTable, locallock->hashcode,
1395
				wakeupNeeded);
1396

1397
	LWLockRelease(partitionLock);
1398 1399

	RemoveLocalLock(locallock);
1400
	return TRUE;
1401 1402
}

1403
/*
1404
 * LockReleaseAll -- Release all locks of the specified lock method that
1405
 *		are held by the current process.
1406
 *
1407
 * Well, not necessarily *all* locks.  The available behaviors are:
1408 1409
 *		allLocks == true: release all locks including session locks.
 *		allLocks == false: release all non-session locks.
1410
 */
1411
void
1412
LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
1413
{
1414
	HASH_SEQ_STATUS status;
1415
	LockMethod	lockMethodTable;
1416
	int			i,
1417
				numLockModes;
B
Bruce Momjian 已提交
1418
	LOCALLOCK  *locallock;
1419
	LOCK	   *lock;
1420 1421
	PROCLOCK   *proclock;
	int			partition;
1422

1423 1424 1425 1426
	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];

1427
#ifdef LOCK_DEBUG
1428
	if (*(lockMethodTable->trace_flag))
1429
		elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
1430 1431
#endif

B
Bruce Momjian 已提交
1432
	numLockModes = lockMethodTable->numLockModes;
M
 
Marc G. Fournier 已提交
1433

1434 1435
	/*
	 * First we run through the locallock table and get rid of unwanted
B
Bruce Momjian 已提交
1436 1437 1438 1439
	 * entries, then we scan the process's proclocks and get rid of those. We
	 * do this separately because we may have multiple locallock entries
	 * pointing to the same proclock, and we daren't end up with any dangling
	 * pointers.
1440
	 */
1441
	hash_seq_init(&status, LockMethodLocalHash);
1442 1443 1444 1445 1446 1447

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		if (locallock->proclock == NULL || locallock->lock == NULL)
		{
			/*
B
Bruce Momjian 已提交
1448 1449
			 * We must've run out of shared memory while trying to set up this
			 * lock.  Just forget the local entry.
1450 1451 1452 1453 1454 1455 1456 1457 1458 1459
			 */
			Assert(locallock->nLocks == 0);
			RemoveLocalLock(locallock);
			continue;
		}

		/* Ignore items that are not of the lockmethod to be removed */
		if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
			continue;

B
Bruce Momjian 已提交
1460
		/*
B
Bruce Momjian 已提交
1461 1462 1463
		 * If we are asked to release all locks, we can just zap the entry.
		 * Otherwise, must scan to see if there are session locks. We assume
		 * there is at most one lockOwners entry for session locks.
B
Bruce Momjian 已提交
1464
		 */
1465 1466 1467
		if (!allLocks)
		{
			LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
1468

1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495
			/* If it's above array position 0, move it down to 0 */
			for (i = locallock->numLockOwners - 1; i > 0; i--)
			{
				if (lockOwners[i].owner == NULL)
				{
					lockOwners[0] = lockOwners[i];
					break;
				}
			}

			if (locallock->numLockOwners > 0 &&
				lockOwners[0].owner == NULL &&
				lockOwners[0].nLocks > 0)
			{
				/* Fix the locallock to show just the session locks */
				locallock->nLocks = lockOwners[0].nLocks;
				locallock->numLockOwners = 1;
				/* We aren't deleting this locallock, so done */
				continue;
			}
		}

		/* Mark the proclock to show we need to release this lockmode */
		if (locallock->nLocks > 0)
			locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);

		/* And remove the locallock hashtable entry */
1496 1497 1498
		RemoveLocalLock(locallock);
	}

1499 1500 1501 1502 1503 1504 1505
	/*
	 * Now, scan each lock partition separately.
	 */
	for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
	{
		LWLockId	partitionLock = FirstLockMgrLock + partition;
		SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
1506

1507 1508
		proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
											 offsetof(PROCLOCK, procLink));
1509

1510 1511
		if (!proclock)
			continue;			/* needn't examine this partition */
1512

1513
		LWLockAcquire(partitionLock, LW_EXCLUSIVE);
1514

1515 1516 1517 1518
		while (proclock)
		{
			bool		wakeupNeeded = false;
			PROCLOCK   *nextplock;
1519

1520 1521 1522 1523
			/* Get link first, since we may unlink/delete this proclock */
			nextplock = (PROCLOCK *)
				SHMQueueNext(procLocks, &proclock->procLink,
							 offsetof(PROCLOCK, procLink));
1524

1525
			Assert(proclock->tag.myProc == MyProc);
1526

1527
			lock = proclock->tag.myLock;
1528

1529 1530 1531
			/* Ignore items that are not of the lockmethod to be removed */
			if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
				goto next_item;
M
 
Marc G. Fournier 已提交
1532

1533 1534 1535 1536 1537 1538 1539 1540
			/*
			 * In allLocks mode, force release of all locks even if locallock
			 * table had problems
			 */
			if (allLocks)
				proclock->releaseMask = proclock->holdMask;
			else
				Assert((proclock->releaseMask & ~proclock->holdMask) == 0);
1541

1542 1543 1544 1545 1546 1547
			/*
			 * Ignore items that have nothing to be released, unless they have
			 * holdMask == 0 and are therefore recyclable
			 */
			if (proclock->releaseMask == 0 && proclock->holdMask != 0)
				goto next_item;
M
 
Marc G. Fournier 已提交
1548

1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567
			PROCLOCK_PRINT("LockReleaseAll", proclock);
			LOCK_PRINT("LockReleaseAll", lock, 0);
			Assert(lock->nRequested >= 0);
			Assert(lock->nGranted >= 0);
			Assert(lock->nGranted <= lock->nRequested);
			Assert((proclock->holdMask & ~lock->grantMask) == 0);

			/*
			 * Release the previously-marked lock modes
			 */
			for (i = 1; i <= numLockModes; i++)
			{
				if (proclock->releaseMask & LOCKBIT_ON(i))
					wakeupNeeded |= UnGrantLock(lock, i, proclock,
												lockMethodTable);
			}
			Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
			Assert(lock->nGranted <= lock->nRequested);
			LOCK_PRINT("LockReleaseAll: updated", lock, 0);
1568

1569
			proclock->releaseMask = 0;
1570

1571 1572
			/* CleanUpLock will wake up waiters if needed. */
			CleanUpLock(lock, proclock,
1573 1574
						lockMethodTable,
						LockTagHashCode(&lock->tag),
1575
						wakeupNeeded);
M
 
Marc G. Fournier 已提交
1576

B
Bruce Momjian 已提交
1577
	next_item:
1578
			proclock = nextplock;
B
Bruce Momjian 已提交
1579
		}						/* loop over PROCLOCKs within this partition */
1580 1581

		LWLockRelease(partitionLock);
B
Bruce Momjian 已提交
1582
	}							/* loop over partitions */
1583

1584
#ifdef LOCK_DEBUG
1585
	if (*(lockMethodTable->trace_flag))
1586
		elog(LOG, "LockReleaseAll done");
1587
#endif
1588 1589
}

1590 1591 1592 1593 1594 1595 1596 1597
/*
 * LockReleaseCurrentOwner
 *		Release all locks belonging to CurrentResourceOwner
 */
void
LockReleaseCurrentOwner(void)
{
	HASH_SEQ_STATUS status;
B
Bruce Momjian 已提交
1598
	LOCALLOCK  *locallock;
1599 1600 1601
	LOCALLOCKOWNER *lockOwners;
	int			i;

1602
	hash_seq_init(&status, LockMethodLocalHash);
1603 1604 1605 1606

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		/* Ignore items that must be nontransactional */
1607
		if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634
			continue;

		/* Scan to see if there are any locks belonging to current owner */
		lockOwners = locallock->lockOwners;
		for (i = locallock->numLockOwners - 1; i >= 0; i--)
		{
			if (lockOwners[i].owner == CurrentResourceOwner)
			{
				Assert(lockOwners[i].nLocks > 0);
				if (lockOwners[i].nLocks < locallock->nLocks)
				{
					/*
					 * We will still hold this lock after forgetting this
					 * ResourceOwner.
					 */
					locallock->nLocks -= lockOwners[i].nLocks;
					/* compact out unused slot */
					locallock->numLockOwners--;
					if (i < locallock->numLockOwners)
						lockOwners[i] = lockOwners[locallock->numLockOwners];
				}
				else
				{
					Assert(lockOwners[i].nLocks == locallock->nLocks);
					/* We want to call LockRelease just once */
					lockOwners[i].nLocks = 1;
					locallock->nLocks = 1;
1635
					if (!LockRelease(&locallock->tag.lock,
1636 1637
									 locallock->tag.mode,
									 false))
1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655
						elog(WARNING, "LockReleaseCurrentOwner: failed??");
				}
				break;
			}
		}
	}
}

/*
 * LockReassignCurrentOwner
 *		Reassign all locks belonging to CurrentResourceOwner to belong
 *		to its parent resource owner
 */
void
LockReassignCurrentOwner(void)
{
	ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
	HASH_SEQ_STATUS status;
B
Bruce Momjian 已提交
1656
	LOCALLOCK  *locallock;
1657 1658 1659 1660
	LOCALLOCKOWNER *lockOwners;

	Assert(parent != NULL);

1661
	hash_seq_init(&status, LockMethodLocalHash);
1662 1663 1664 1665 1666 1667 1668 1669

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		int			i;
		int			ic = -1;
		int			ip = -1;

		/* Ignore items that must be nontransactional */
1670
		if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
1671 1672 1673
			continue;

		/*
B
Bruce Momjian 已提交
1674 1675
		 * Scan to see if there are any locks belonging to current owner or
		 * its parent
1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706
		 */
		lockOwners = locallock->lockOwners;
		for (i = locallock->numLockOwners - 1; i >= 0; i--)
		{
			if (lockOwners[i].owner == CurrentResourceOwner)
				ic = i;
			else if (lockOwners[i].owner == parent)
				ip = i;
		}

		if (ic < 0)
			continue;			/* no current locks */

		if (ip < 0)
		{
			/* Parent has no slot, so just give it child's slot */
			lockOwners[ic].owner = parent;
		}
		else
		{
			/* Merge child's count with parent's */
			lockOwners[ip].nLocks += lockOwners[ic].nLocks;
			/* compact out unused slot */
			locallock->numLockOwners--;
			if (ic < locallock->numLockOwners)
				lockOwners[ic] = lockOwners[locallock->numLockOwners];
		}
	}
}


1707 1708
/*
 * GetLockConflicts
1709
 *		Get an array of VirtualTransactionIds of xacts currently holding locks
1710 1711 1712
 *		that would conflict with the specified lock/lockmode.
 *		xacts merely awaiting such a lock are NOT reported.
 *
1713 1714
 * The result array is palloc'd and is terminated with an invalid VXID.
 *
1715 1716 1717
 * Of course, the result could be out of date by the time it's returned,
 * so use of this function has to be thought about carefully.
 *
1718 1719 1720 1721
 * Note we never include the current xact's vxid in the result array,
 * since an xact never blocks itself.  Also, prepared transactions are
 * ignored, which is a bit more debatable but is appropriate for current
 * uses of the result.
1722
 */
1723
VirtualTransactionId *
1724 1725
GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
{
1726
	VirtualTransactionId *vxids;
1727 1728 1729 1730 1731 1732 1733 1734
	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
	LockMethod	lockMethodTable;
	LOCK	   *lock;
	LOCKMASK	conflictMask;
	SHM_QUEUE  *procLocks;
	PROCLOCK   *proclock;
	uint32		hashcode;
	LWLockId	partitionLock;
1735
	int			count = 0;
1736 1737 1738 1739 1740 1741 1742

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];
	if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
		elog(ERROR, "unrecognized lock mode: %d", lockmode);

1743
	/*
B
Bruce Momjian 已提交
1744 1745 1746
	 * Allocate memory to store results, and fill with InvalidVXID.  We only
	 * need enough space for MaxBackends + a terminator, since prepared xacts
	 * don't count.
1747 1748 1749 1750
	 */
	vxids = (VirtualTransactionId *)
		palloc0(sizeof(VirtualTransactionId) * (MaxBackends + 1));

1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766
	/*
	 * Look up the lock object matching the tag.
	 */
	hashcode = LockTagHashCode(locktag);
	partitionLock = LockHashPartitionLock(hashcode);

	LWLockAcquire(partitionLock, LW_SHARED);

	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
												(void *) locktag,
												hashcode,
												HASH_FIND,
												NULL);
	if (!lock)
	{
		/*
B
Bruce Momjian 已提交
1767 1768
		 * If the lock object doesn't exist, there is nothing holding a lock
		 * on this lockable object.
1769 1770
		 */
		LWLockRelease(partitionLock);
1771
		return vxids;
1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787
	}

	/*
	 * Examine each existing holder (or awaiter) of the lock.
	 */
	conflictMask = lockMethodTable->conflictTab[lockmode];

	procLocks = &(lock->procLocks);

	proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
										 offsetof(PROCLOCK, lockLink));

	while (proclock)
	{
		if (conflictMask & proclock->holdMask)
		{
B
Bruce Momjian 已提交
1788
			PGPROC	   *proc = proclock->tag.myProc;
1789 1790 1791 1792

			/* A backend never blocks itself */
			if (proc != MyProc)
			{
1793 1794 1795
				VirtualTransactionId vxid;

				GET_VXID_FROM_PGPROC(vxid, *proc);
1796 1797

				/*
1798
				 * If we see an invalid VXID, then either the xact has already
B
Bruce Momjian 已提交
1799 1800
				 * committed (or aborted), or it's a prepared xact.  In either
				 * case we may ignore it.
1801
				 */
1802 1803
				if (VirtualTransactionIdIsValid(vxid))
					vxids[count++] = vxid;
1804 1805 1806 1807 1808 1809 1810 1811 1812
			}
		}

		proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
											 offsetof(PROCLOCK, lockLink));
	}

	LWLockRelease(partitionLock);

1813 1814 1815 1816
	if (count > MaxBackends)	/* should never happen */
		elog(PANIC, "too many conflicting locks found");

	return vxids;
1817 1818 1819
}


1820 1821 1822 1823 1824
/*
 * AtPrepare_Locks
 *		Do the preparatory work for a PREPARE: make 2PC state file records
 *		for all locks currently held.
 *
1825
 * Non-transactional locks are ignored, as are VXID locks.
1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842
 *
 * There are some special cases that we error out on: we can't be holding
 * any session locks (should be OK since only VACUUM uses those) and we
 * can't be holding any locks on temporary objects (since that would mess
 * up the current backend if it tries to exit before the prepared xact is
 * committed).
 */
void
AtPrepare_Locks(void)
{
	HASH_SEQ_STATUS status;
	LOCALLOCK  *locallock;

	/*
	 * We don't need to touch shared memory for this --- all the necessary
	 * state information is in the locallock table.
	 */
1843
	hash_seq_init(&status, LockMethodLocalHash);
1844 1845 1846 1847 1848

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		TwoPhaseLockRecord record;
		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
B
Bruce Momjian 已提交
1849
		int			i;
1850

1851 1852
		/* Ignore nontransactional locks */
		if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
1853 1854
			continue;

1855 1856 1857 1858 1859 1860 1861
		/*
		 * Ignore VXID locks.  We don't want those to be held by prepared
		 * transactions, since they aren't meaningful after a restart.
		 */
		if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
			continue;

1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905
		/* Ignore it if we don't actually hold the lock */
		if (locallock->nLocks <= 0)
			continue;

		/* Scan to verify there are no session locks */
		for (i = locallock->numLockOwners - 1; i >= 0; i--)
		{
			/* elog not ereport since this should not happen */
			if (lockOwners[i].owner == NULL)
				elog(ERROR, "cannot PREPARE when session locks exist");
		}

		/*
		 * Create a 2PC record.
		 */
		memcpy(&(record.locktag), &(locallock->tag.lock), sizeof(LOCKTAG));
		record.lockmode = locallock->tag.mode;

		RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, 0,
							   &record, sizeof(TwoPhaseLockRecord));
	}
}

/*
 * PostPrepare_Locks
 *		Clean up after successful PREPARE
 *
 * Here, we want to transfer ownership of our locks to a dummy PGPROC
 * that's now associated with the prepared transaction, and we want to
 * clean out the corresponding entries in the LOCALLOCK table.
 *
 * Note: by removing the LOCALLOCK entries, we are leaving dangling
 * pointers in the transaction's resource owner.  This is OK at the
 * moment since resowner.c doesn't try to free locks retail at a toplevel
 * transaction commit or abort.  We could alternatively zero out nLocks
 * and leave the LOCALLOCK entries to be garbage-collected by LockReleaseAll,
 * but that probably costs more cycles.
 */
void
PostPrepare_Locks(TransactionId xid)
{
	PGPROC	   *newproc = TwoPhaseGetDummyProc(xid);
	HASH_SEQ_STATUS status;
	LOCALLOCK  *locallock;
	LOCK	   *lock;
	PROCLOCK   *proclock;
	PROCLOCKTAG proclocktag;
	bool		found;
	int			partition;

	/* This is a critical section: any error means big trouble */
	START_CRIT_SECTION();

	/*
	 * First we run through the locallock table and get rid of unwanted
	 * entries, then we scan the process's proclocks and transfer them to the
	 * target proc.
	 *
	 * We do this separately because we may have multiple locallock entries
	 * pointing to the same proclock, and we daren't end up with any dangling
	 * pointers.
	 */
	hash_seq_init(&status, LockMethodLocalHash);

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		if (locallock->proclock == NULL || locallock->lock == NULL)
		{
			/*
			 * We must've run out of shared memory while trying to set up this
			 * lock.  Just forget the local entry.
			 */
			Assert(locallock->nLocks == 0);
			RemoveLocalLock(locallock);
			continue;
		}

		/* Ignore nontransactional locks */
		if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
			continue;

		/* Ignore VXID locks (not meaningful after a restart) */
		if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
			continue;

		/* We already checked there are no session locks (in AtPrepare_Locks) */

		/* Mark the proclock to show we need to release this lockmode */
		if (locallock->nLocks > 0)
			locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);

		/* And remove the locallock hashtable entry */
		RemoveLocalLock(locallock);
	}

	/*
	 * Now, scan each lock partition separately.
	 */
	for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
	{
		LWLockId	partitionLock = FirstLockMgrLock + partition;
		SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);

		proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
											 offsetof(PROCLOCK, procLink));

		if (!proclock)
			continue;			/* needn't examine this partition */

		LWLockAcquire(partitionLock, LW_EXCLUSIVE);

		while (proclock)
		{
			PROCLOCK   *nextplock;
			LOCKMASK	holdMask;
			PROCLOCK   *newproclock;

			/* Get link first, since we may unlink/delete this proclock */
			nextplock = (PROCLOCK *)
				SHMQueueNext(procLocks, &proclock->procLink,
							 offsetof(PROCLOCK, procLink));

			Assert(proclock->tag.myProc == MyProc);

			lock = proclock->tag.myLock;

			/* Ignore nontransactional locks */
			if (!LockMethods[LOCK_LOCKMETHOD(*lock)]->transactional)
				goto next_item;

			/* Ignore VXID locks */
			if (lock->tag.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
				goto next_item;

			PROCLOCK_PRINT("PostPrepare_Locks", proclock);
			LOCK_PRINT("PostPrepare_Locks", lock, 0);
			Assert(lock->nRequested >= 0);
			Assert(lock->nGranted >= 0);
			Assert(lock->nGranted <= lock->nRequested);
			Assert((proclock->holdMask & ~lock->grantMask) == 0);

			/*
			 * Since there were no session locks, we should be releasing all
			 * locks
			 */
			if (proclock->releaseMask != proclock->holdMask)
				elog(PANIC, "we seem to have dropped a bit somewhere");

			holdMask = proclock->holdMask;

			/*
			 * We cannot simply modify proclock->tag.myProc to reassign
			 * ownership of the lock, because that's part of the hash key and
			 * the proclock would then be in the wrong hash chain.	So, unlink
			 * and delete the old proclock; create a new one with the right
			 * contents; and link it into place.  We do it in this order to be
			 * certain we won't run out of shared memory (the way dynahash.c
			 * works, the deleted object is certain to be available for
			 * reallocation).
			 */
			SHMQueueDelete(&proclock->lockLink);
			SHMQueueDelete(&proclock->procLink);
			if (!hash_search(LockMethodProcLockHash,
							 (void *) &(proclock->tag),
							 HASH_REMOVE, NULL))
				elog(PANIC, "proclock table corrupted");

			/*
			 * Create the hash key for the new proclock table.
			 */
			proclocktag.myLock = lock;
			proclocktag.myProc = newproc;

			newproclock = (PROCLOCK *) hash_search(LockMethodProcLockHash,
												   (void *) &proclocktag,
												   HASH_ENTER_NULL, &found);
			if (!newproclock)
				ereport(PANIC,	/* should not happen */
						(errcode(ERRCODE_OUT_OF_MEMORY),
						 errmsg("out of shared memory"),
						 errdetail("Not enough memory for reassigning the prepared transaction's locks.")));

			/*
			 * If new, initialize the new entry
			 */
			if (!found)
			{
				newproclock->holdMask = 0;
				newproclock->releaseMask = 0;
				/* Add new proclock to appropriate lists */
				SHMQueueInsertBefore(&lock->procLocks, &newproclock->lockLink);
				SHMQueueInsertBefore(&(newproc->myProcLocks[partition]),
									 &newproclock->procLink);
				PROCLOCK_PRINT("PostPrepare_Locks: new", newproclock);
			}
			else
			{
				PROCLOCK_PRINT("PostPrepare_Locks: found", newproclock);
				Assert((newproclock->holdMask & ~lock->grantMask) == 0);
			}

			/*
			 * Pass over the identified lock ownership.
			 */
			Assert((newproclock->holdMask & holdMask) == 0);
			newproclock->holdMask |= holdMask;

	next_item:
			proclock = nextplock;
		}						/* loop over PROCLOCKs within this partition */

		LWLockRelease(partitionLock);
	}							/* loop over partitions */

	END_CRIT_SECTION();
}


2080 2081 2082
/*
 * Estimate shared-memory space used for lock tables
 */
2083
Size
2084
LockShmemSize(void)
2085
{
2086
	Size		size = 0;
2087
	long		max_table_size;
2088

2089
	/* lock hash table */
2090
	max_table_size = NLOCKENTS();
2091
	size = add_size(size, hash_estimate_size(max_table_size, sizeof(LOCK)));
2092

2093
	/* proclock hash table */
2094
	max_table_size *= 2;
2095
	size = add_size(size, hash_estimate_size(max_table_size, sizeof(PROCLOCK)));
2096

B
Bruce Momjian 已提交
2097
	/*
2098
	 * Since NLOCKENTS is only an estimate, add 10% safety margin.
2099
	 */
2100
	size = add_size(size, size / 10);
2101 2102

	return size;
2103 2104
}

2105 2106
/*
 * GetLockStatusData - Return a summary of the lock manager's internal
2107
 * status, for use in a user-level reporting function.
2108
 *
2109 2110 2111 2112 2113
 * The return data consists of an array of PROCLOCK objects, with the
 * associated PGPROC and LOCK objects for each.  Note that multiple
 * copies of the same PGPROC and/or LOCK objects are likely to appear.
 * It is the caller's responsibility to match up duplicates if wanted.
 *
2114
 * The design goal is to hold the LWLocks for as short a time as possible;
2115
 * thus, this function simply makes a copy of the necessary data and releases
2116
 * the locks, allowing the caller to contemplate and format the data for as
2117
 * long as it pleases.
2118
 */
2119 2120
LockData *
GetLockStatusData(void)
2121
{
B
Bruce Momjian 已提交
2122
	LockData   *data;
2123
	PROCLOCK   *proclock;
2124
	HASH_SEQ_STATUS seqstat;
2125 2126
	int			els;
	int			el;
B
Bruce Momjian 已提交
2127
	int			i;
2128

2129
	data = (LockData *) palloc(sizeof(LockData));
2130

2131
	/*
2132
	 * Acquire lock on the entire shared lock data structure.  We can't
2133 2134 2135
	 * operate one partition at a time if we want to deliver a self-consistent
	 * view of the state.
	 *
B
Bruce Momjian 已提交
2136 2137 2138 2139 2140
	 * Since this is a read-only operation, we take shared instead of
	 * exclusive lock.	There's not a whole lot of point to this, because all
	 * the normal operations require exclusive lock, but it doesn't hurt
	 * anything either. It will at least allow two backends to do
	 * GetLockStatusData in parallel.
2141 2142 2143 2144 2145
	 *
	 * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
	 */
	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
		LWLockAcquire(FirstLockMgrLock + i, LW_SHARED);
2146 2147 2148

	/* Now we can safely count the number of proclocks */
	els = hash_get_num_entries(LockMethodProcLockHash);
2149

2150 2151 2152 2153
	data->nelements = els;
	data->proclocks = (PROCLOCK *) palloc(sizeof(PROCLOCK) * els);
	data->procs = (PGPROC *) palloc(sizeof(PGPROC) * els);
	data->locks = (LOCK *) palloc(sizeof(LOCK) * els);
2154

2155
	/* Now scan the tables to copy the data */
2156
	hash_seq_init(&seqstat, LockMethodProcLockHash);
2157

2158 2159 2160 2161 2162
	el = 0;
	while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
	{
		PGPROC	   *proc = proclock->tag.myProc;
		LOCK	   *lock = proclock->tag.myLock;
2163

2164 2165 2166
		memcpy(&(data->proclocks[el]), proclock, sizeof(PROCLOCK));
		memcpy(&(data->procs[el]), proc, sizeof(PGPROC));
		memcpy(&(data->locks[el]), lock, sizeof(LOCK));
2167

2168
		el++;
2169 2170
	}

2171
	/*
B
Bruce Momjian 已提交
2172 2173 2174 2175 2176
	 * And release locks.  We do this in reverse order for two reasons: (1)
	 * Anyone else who needs more than one of the locks will be trying to lock
	 * them in increasing order; we don't want to release the other process
	 * until it can get all the locks it needs. (2) This avoids O(N^2)
	 * behavior inside LWLockRelease.
2177
	 */
B
Bruce Momjian 已提交
2178
	for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
2179
		LWLockRelease(FirstLockMgrLock + i);
2180

2181
	Assert(el == data->nelements);
2182

2183
	return data;
2184 2185
}

2186 2187
/* Provide the textual name of any lock mode */
const char *
2188
GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode)
2189
{
2190 2191 2192
	Assert(lockmethodid > 0 && lockmethodid < lengthof(LockMethods));
	Assert(mode > 0 && mode <= LockMethods[lockmethodid]->numLockModes);
	return LockMethods[lockmethodid]->lockModeNames[mode];
2193
}
B
Bruce Momjian 已提交
2194

2195
#ifdef LOCK_DEBUG
2196
/*
2197
 * Dump all locks in the given proc's myProcLocks lists.
2198
 *
2199
 * Caller is responsible for having acquired appropriate LWLocks.
2200 2201
 */
void
2202
DumpLocks(PGPROC *proc)
2203
{
2204
	SHM_QUEUE  *procLocks;
2205
	PROCLOCK   *proclock;
2206
	LOCK	   *lock;
2207
	int			i;
2208

2209
	if (proc == NULL)
2210
		return;
2211

2212 2213 2214
	if (proc->waitLock)
		LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);

2215
	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
2216
	{
2217
		procLocks = &(proc->myProcLocks[i]);
2218

2219 2220
		proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
											 offsetof(PROCLOCK, procLink));
M
 
Marc G. Fournier 已提交
2221

2222 2223
		while (proclock)
		{
2224
			Assert(proclock->tag.myProc == proc);
2225

2226
			lock = proclock->tag.myLock;
2227 2228 2229 2230 2231 2232 2233 2234

			PROCLOCK_PRINT("DumpLocks", proclock);
			LOCK_PRINT("DumpLocks", lock, 0);

			proclock = (PROCLOCK *)
				SHMQueueNext(procLocks, &proclock->procLink,
							 offsetof(PROCLOCK, procLink));
		}
2235
	}
2236
}
2237

M
 
Marc G. Fournier 已提交
2238
/*
2239 2240 2241
 * Dump all lmgr locks.
 *
 * Caller is responsible for having acquired appropriate LWLocks.
M
 
Marc G. Fournier 已提交
2242 2243
 */
void
2244
DumpAllLocks(void)
M
 
Marc G. Fournier 已提交
2245
{
J
Jan Wieck 已提交
2246
	PGPROC	   *proc;
2247
	PROCLOCK   *proclock;
2248
	LOCK	   *lock;
2249
	HASH_SEQ_STATUS status;
M
 
Marc G. Fournier 已提交
2250

2251
	proc = MyProc;
M
 
Marc G. Fournier 已提交
2252

2253
	if (proc && proc->waitLock)
2254
		LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
M
 
Marc G. Fournier 已提交
2255

2256
	hash_seq_init(&status, LockMethodProcLockHash);
M
 
Marc G. Fournier 已提交
2257

2258 2259 2260
	while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
	{
		PROCLOCK_PRINT("DumpAllLocks", proclock);
2261

2262 2263 2264 2265 2266
		lock = proclock->tag.myLock;
		if (lock)
			LOCK_PRINT("DumpAllLocks", lock, 0);
		else
			elog(LOG, "DumpAllLocks: proclock->tag.myLock = NULL");
M
 
Marc G. Fournier 已提交
2267 2268
	}
}
2269
#endif   /* LOCK_DEBUG */
2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287

/*
 * LOCK 2PC resource manager's routines
 */

/*
 * Re-acquire a lock belonging to a transaction that was prepared.
 *
 * Because this function is run at db startup, re-acquiring the locks should
 * never conflict with running transactions because there are none.  We
 * assume that the lock state represented by the stored 2PC files is legal.
 */
void
lock_twophase_recover(TransactionId xid, uint16 info,
					  void *recdata, uint32 len)
{
	TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
	PGPROC	   *proc = TwoPhaseGetDummyProc(xid);
	LOCKTAG    *locktag;
	LOCKMODE	lockmode;
	LOCKMETHODID lockmethodid;
	LOCK	   *lock;
	PROCLOCK   *proclock;
	PROCLOCKTAG proclocktag;
	bool		found;
	uint32		hashcode;
	uint32		proclock_hashcode;
	int			partition;
	LWLockId	partitionLock;
	LockMethod	lockMethodTable;

	Assert(len == sizeof(TwoPhaseLockRecord));
	locktag = &rec->locktag;
	lockmode = rec->lockmode;
	lockmethodid = locktag->locktag_lockmethodid;

	/* Validate the lock method id read from the 2PC state file */
	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];

	hashcode = LockTagHashCode(locktag);
	partition = LockHashPartition(hashcode);
	partitionLock = LockHashPartitionLock(hashcode);

	LWLockAcquire(partitionLock, LW_EXCLUSIVE);

	/*
	 * Find or create a lock with this tag.
	 */
	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
												(void *) locktag,
												hashcode,
												HASH_ENTER_NULL,
												&found);
	if (!lock)
	{
		LWLockRelease(partitionLock);
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of shared memory"),
		  errhint("You might need to increase max_locks_per_transaction.")));
	}

	/*
	 * if it's a new lock object, initialize it
	 */
	if (!found)
	{
		lock->grantMask = 0;
		lock->waitMask = 0;
		SHMQueueInit(&(lock->procLocks));
		ProcQueueInit(&(lock->waitProcs));
		lock->nRequested = 0;
		lock->nGranted = 0;
		MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
		MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
		LOCK_PRINT("lock_twophase_recover: new", lock, lockmode);
	}
	else
	{
		LOCK_PRINT("lock_twophase_recover: found", lock, lockmode);
		Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
		Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
		Assert(lock->nGranted <= lock->nRequested);
	}

	/*
	 * Create the hash key for the proclock table.
	 */
	proclocktag.myLock = lock;
	proclocktag.myProc = proc;

	proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);

	/*
	 * Find or create a proclock entry with this tag
	 */
	proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
														(void *) &proclocktag,
														proclock_hashcode,
														HASH_ENTER_NULL,
														&found);
	if (!proclock)
	{
		/* Ooops, not enough shmem for the proclock */
		if (lock->nRequested == 0)
		{
			/*
			 * There are no other requestors of this lock, so garbage-collect
			 * the lock object.  We *must* do this to avoid a permanent leak
			 * of shared memory, because there won't be anything to cause
			 * anyone to release the lock object later.
			 */
			Assert(SHMQueueEmpty(&(lock->procLocks)));
			if (!hash_search_with_hash_value(LockMethodLockHash,
											 (void *) &(lock->tag),
											 hashcode,
											 HASH_REMOVE,
											 NULL))
				elog(PANIC, "lock table corrupted");
		}
		LWLockRelease(partitionLock);
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of shared memory"),
		  errhint("You might need to increase max_locks_per_transaction.")));
	}

	/*
	 * If new, initialize the new entry
	 */
	if (!found)
	{
		proclock->holdMask = 0;
		proclock->releaseMask = 0;
		/* Add proclock to appropriate lists */
		SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
		SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
							 &proclock->procLink);
		PROCLOCK_PRINT("lock_twophase_recover: new", proclock);
	}
	else
	{
		PROCLOCK_PRINT("lock_twophase_recover: found", proclock);
		Assert((proclock->holdMask & ~lock->grantMask) == 0);
	}

	/*
	 * lock->nRequested and lock->requested[] count the total number of
	 * requests, whether granted or waiting, so increment those immediately.
	 */
	lock->nRequested++;
	lock->requested[lockmode]++;
	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));

	/*
	 * We shouldn't already hold the desired lock.
	 */
	if (proclock->holdMask & LOCKBIT_ON(lockmode))
		elog(ERROR, "lock %s on object %u/%u/%u is already held",
			 lockMethodTable->lockModeNames[lockmode],
			 lock->tag.locktag_field1, lock->tag.locktag_field2,
			 lock->tag.locktag_field3);

	/*
	 * We ignore any possible conflicts and just grant ourselves the lock.
	 */
	GrantLock(lock, proclock, lockmode);

	LWLockRelease(partitionLock);
}

/*
 * 2PC processing routine for COMMIT PREPARED case.
 *
 * Find and release the lock indicated by the 2PC record.
 */
void
lock_twophase_postcommit(TransactionId xid, uint16 info,
						 void *recdata, uint32 len)
{
	TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
	PGPROC	   *proc = TwoPhaseGetDummyProc(xid);
	LOCKTAG    *locktag;
	LOCKMODE	lockmode;
	LOCKMETHODID lockmethodid;
	LOCK	   *lock;
	PROCLOCK   *proclock;
	PROCLOCKTAG proclocktag;
	uint32		hashcode;
	uint32		proclock_hashcode;
	LWLockId	partitionLock;
	LockMethod	lockMethodTable;
	bool		wakeupNeeded;

	Assert(len == sizeof(TwoPhaseLockRecord));
	locktag = &rec->locktag;
	lockmode = rec->lockmode;
	lockmethodid = locktag->locktag_lockmethodid;

	/* Validate the lock method id read from the 2PC state file */
	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];

	hashcode = LockTagHashCode(locktag);
	partitionLock = LockHashPartitionLock(hashcode);

	LWLockAcquire(partitionLock, LW_EXCLUSIVE);

	/*
	 * Re-find the lock object (it had better be there).
	 */
	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
												(void *) locktag,
												hashcode,
												HASH_FIND,
												NULL);
	if (!lock)
		elog(PANIC, "failed to re-find shared lock object");

	/*
	 * Re-find the proclock object (ditto).
	 */
	proclocktag.myLock = lock;
	proclocktag.myProc = proc;

	proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);

	proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
														(void *) &proclocktag,
														proclock_hashcode,
														HASH_FIND,
														NULL);
	if (!proclock)
		elog(PANIC, "failed to re-find shared proclock object");

	/*
	 * Double-check that we are actually holding a lock of the type we want to
	 * release.
	 */
	if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
	{
		PROCLOCK_PRINT("lock_twophase_postcommit: WRONGTYPE", proclock);
		LWLockRelease(partitionLock);
		elog(WARNING, "you don't own a lock of type %s",
			 lockMethodTable->lockModeNames[lockmode]);
		return;
	}

	/*
	 * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
	 */
	wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);

	CleanUpLock(lock, proclock,
				lockMethodTable, hashcode,
				wakeupNeeded);

	LWLockRelease(partitionLock);
}

/*
 * 2PC processing routine for ROLLBACK PREPARED case.
 *
 * This is actually just the same as the COMMIT case: the lock held by the
 * prepared transaction must be released either way, so delegate.
 */
void
lock_twophase_postabort(TransactionId xid, uint16 info,
						void *recdata, uint32 len)
{
	lock_twophase_postcommit(xid, info, recdata, len);
}