/*-------------------------------------------------------------------------
 *
 * lock.c
 *	  POSTGRES primary lock mechanism
 *
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.162 2005/12/11 21:02:18 tgl Exp $
 *
 * NOTES
 *	  A lock table is a shared memory hash table.  When
 *	  a process tries to acquire a lock of a type that conflicts
 *	  with existing locks, it is put to sleep using the routines
 *	  in storage/lmgr/proc.c.
 *
 *	  For the most part, this code should be invoked via lmgr.c
 *	  or another lock-management module, not directly.
 *
 *	Interface:
 *
 *	InitLocks(), GetLocksMethodTable(),
 *	LockAcquire(), LockRelease(), LockReleaseAll(),
 *	LockCheckConflicts(), GrantLock()
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <signal.h>
#include <unistd.h>

#include "access/twophase.h"
#include "access/twophase_rmgr.h"
#include "access/xact.h"
#include "miscadmin.h"
#include "storage/proc.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"


/* This configuration variable is used to set the lock table size */
int			max_locks_per_xact; /* set by guc.c */

/* Max number of lock-table entries: one slot per lock per backend/prepared xact */
#define NLOCKENTS() \
	mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))

/*
53 54 55
 * Data structures defining the semantics of the standard lock methods.
 *
 * The conflict table defines the semantics of the various lock modes.
56
 */
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
static const LOCKMASK LockConflicts[] = {
	0,

	/* AccessShareLock */
	(1 << AccessExclusiveLock),

	/* RowShareLock */
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* RowExclusiveLock */
	(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* ShareUpdateExclusiveLock */
	(1 << ShareUpdateExclusiveLock) |
	(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* ShareLock */
	(1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
	(1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* ShareRowExclusiveLock */
	(1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
	(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* ExclusiveLock */
	(1 << RowShareLock) |
	(1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
	(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock),

	/* AccessExclusiveLock */
	(1 << AccessShareLock) | (1 << RowShareLock) |
	(1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
	(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
	(1 << ExclusiveLock) | (1 << AccessExclusiveLock)
96

97
};
98

99
/* Names of lock modes, for debug printouts; indexed by LOCKMODE number */
static const char *const lock_mode_names[] =
{
	"INVALID",
	"AccessShareLock",
	"RowShareLock",
	"RowExclusiveLock",
	"ShareUpdateExclusiveLock",
	"ShareLock",
	"ShareRowExclusiveLock",
	"ExclusiveLock",
	"AccessExclusiveLock"
};

/* When LOCK_DEBUG is compiled out, trace_flag points at this constant false */
#ifndef LOCK_DEBUG
static bool		Dummy_trace = false;
#endif

/* The standard (transactional) lock method, used for ordinary object locks */
static const LockMethodData default_lockmethod = {
	AccessExclusiveLock,		/* highest valid lock mode number */
	true,						/* transactional */
	LockConflicts,
	lock_mode_names,
#ifdef LOCK_DEBUG
	&Trace_locks
#else
	&Dummy_trace
#endif
};

#ifdef USER_LOCKS

/* The user-lock method: same modes/conflicts, but locks are not transactional */
static const LockMethodData user_lockmethod = {
	AccessExclusiveLock,		/* highest valid lock mode number */
	false,						/* not transactional */
	LockConflicts,
	lock_mode_names,
#ifdef LOCK_DEBUG
	&Trace_userlocks
#else
	&Dummy_trace
#endif
};

#endif /* USER_LOCKS */

/*
 * map from lock method id to the lock table data structures
 */
static const LockMethod LockMethods[] = {
	NULL,						/* lock method ids start at 1 */
	&default_lockmethod,
#ifdef USER_LOCKS
	&user_lockmethod
#endif
};


/* Record that's written to 2PC state file when a lock is persisted */
typedef struct TwoPhaseLockRecord
{
	LOCKTAG		locktag;
	LOCKMODE	lockmode;
} TwoPhaseLockRecord;


/*
 * Pointers to hash tables containing lock state
 *
 * The LockMethodLockHash and LockMethodProcLockHash hash tables are in
 * shared memory; LockMethodLocalHash is local to each backend.
 */
static HTAB *LockMethodLockHash[NUM_LOCK_PARTITIONS];
static HTAB *LockMethodProcLockHash[NUM_LOCK_PARTITIONS];
static HTAB *LockMethodLocalHash;


/* private state for GrantAwaitedLock */
static LOCALLOCK *awaitedLock;
static ResourceOwner awaitedOwner;

#ifdef LOCK_DEBUG

/*------
 * The following configuration options are available for lock debugging:
 *
 *	   TRACE_LOCKS		-- give a bunch of output what's going on in this file
 *	   TRACE_USERLOCKS	-- same but for user locks
 *	   TRACE_LOCK_OIDMIN-- do not trace locks for tables below this oid
 *						   (use to avoid output on system tables)
 *	   TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
 *	   DEBUG_DEADLOCKS	-- currently dumps locks at untimely occasions ;)
 *
 * Furthermore, but in storage/lmgr/lwlock.c:
 *	   TRACE_LWLOCKS	-- trace lightweight locks (pretty useless)
 *
 * Define LOCK_DEBUG at compile time to get all these enabled.
 * --------
 */

int			Trace_lock_oidmin = FirstNormalObjectId;
bool		Trace_locks = false;
bool		Trace_userlocks = false;
int			Trace_lock_table = 0;
bool		Debug_deadlocks = false;


/* Decide whether tracing is wanted for a particular lock tag */
inline static bool
LOCK_DEBUG_ENABLED(const LOCKTAG *tag)
{
	return
		(*(LockMethods[tag->locktag_lockmethodid]->trace_flag) &&
		 ((Oid) tag->locktag_field2 >= (Oid) Trace_lock_oidmin))
		|| (Trace_lock_table &&
			(tag->locktag_field2 == Trace_lock_table));
}


/* Dump a LOCK's state (tag, grant mask, per-mode request/grant counts) */
inline static void
LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
{
	if (LOCK_DEBUG_ENABLED(&lock->tag))
		elog(LOG,
			 "%s: lock(%lx) id(%u,%u,%u,%u,%u,%u) grantMask(%x) "
			 "req(%d,%d,%d,%d,%d,%d,%d)=%d "
			 "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
			 where, MAKE_OFFSET(lock),
			 lock->tag.locktag_field1, lock->tag.locktag_field2,
			 lock->tag.locktag_field3, lock->tag.locktag_field4,
			 lock->tag.locktag_type, lock->tag.locktag_lockmethodid,
			 lock->grantMask,
			 lock->requested[1], lock->requested[2], lock->requested[3],
			 lock->requested[4], lock->requested[5], lock->requested[6],
			 lock->requested[7], lock->nRequested,
			 lock->granted[1], lock->granted[2], lock->granted[3],
			 lock->granted[4], lock->granted[5], lock->granted[6],
			 lock->granted[7], lock->nGranted,
			 lock->waitProcs.size,
			 LockMethods[LOCK_LOCKMETHOD(*lock)]->lockModeNames[type]);
}


/* Dump a PROCLOCK's state (owning lock/proc offsets and hold mask) */
inline static void
PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
{
	if (LOCK_DEBUG_ENABLED(&((LOCK *) MAKE_PTR(proclockP->tag.lock))->tag))
		elog(LOG,
			 "%s: proclock(%lx) lock(%lx) method(%u) proc(%lx) hold(%x)",
			 where, MAKE_OFFSET(proclockP), proclockP->tag.lock,
			 PROCLOCK_LOCKMETHOD(*(proclockP)),
			 proclockP->tag.proc, (int) proclockP->holdMask);
}
#else							/* not LOCK_DEBUG */

#define LOCK_PRINT(where, lock, type)
#define PROCLOCK_PRINT(where, proclockP)
#endif   /* not LOCK_DEBUG */

static void RemoveLocalLock(LOCALLOCK *locallock);
static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
261
static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
262
static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
B
Bruce Momjian 已提交
263
			PROCLOCK *proclock, LockMethod lockMethodTable);
264 265 266
static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
			LockMethod lockMethodTable, int partition,
			bool wakeupNeeded);
267

268

B
Bruce Momjian 已提交
269
/*
270 271 272 273 274 275 276 277 278 279
 * InitLocks -- Initialize the lock manager's data structures.
 *
 * This is called from CreateSharedMemoryAndSemaphores(), which see for
 * more comments.  In the normal postmaster case, the shared hash tables
 * are created here, as well as a locallock hash table that will remain
 * unused and empty in the postmaster itself.  Backends inherit the pointers
 * to the shared tables via fork(), and also inherit an image of the locallock
 * hash table, which they proceed to use.  In the EXEC_BACKEND case, each
 * backend re-executes this code to obtain pointers to the already existing
 * shared hash tables and to create its locallock hash table.
280 281
 */
void
282
InitLocks(void)
283
{
284
	char		shmemName[64];
285 286
	HASHCTL		info;
	int			hash_flags;
287 288
	long		init_table_size,
				max_table_size;
289
	int			i;
290

291 292 293 294
	/*
	 * Compute init/max size to request for lock hashtables.  Note these
	 * calculations must agree with LockShmemSize!
	 */
295
	max_table_size = NLOCKENTS();
296
	max_table_size = (max_table_size - 1) / NUM_LOCK_PARTITIONS + 1;
297
	init_table_size = max_table_size / 2;
298

B
Bruce Momjian 已提交
299
	/*
300
	 * Allocate hash tables for LOCK structs.  These are used to store
B
Bruce Momjian 已提交
301
	 * per-locked-object information.
302
	 */
303
	MemSet(&info, 0, sizeof(info));
304 305
	info.keysize = sizeof(LOCKTAG);
	info.entrysize = sizeof(LOCK);
306 307 308
	info.hash = tag_hash;
	hash_flags = (HASH_ELEM | HASH_FUNCTION);

309 310 311 312 313 314 315 316 317 318 319
	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
	{
		sprintf(shmemName, "LOCK hash %d", i);
		LockMethodLockHash[i] = ShmemInitHash(shmemName,
											  init_table_size,
											  max_table_size,
											  &info,
											  hash_flags);
		if (!LockMethodLockHash[i])
			elog(FATAL, "could not initialize lock table \"%s\"", shmemName);
	}
320

321 322 323
	/* Assume an average of 2 holders per lock */
	max_table_size *= 2;
	init_table_size *= 2;
324

B
Bruce Momjian 已提交
325
	/*
326 327
	 * Allocate hash tables for PROCLOCK structs.  These are used to store
	 * per-lock-per-holder information.
328
	 */
B
Bruce Momjian 已提交
329 330
	info.keysize = sizeof(PROCLOCKTAG);
	info.entrysize = sizeof(PROCLOCK);
331 332 333
	info.hash = tag_hash;
	hash_flags = (HASH_ELEM | HASH_FUNCTION);

334 335 336 337 338 339 340 341 342 343 344
	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
	{
		sprintf(shmemName, "PROCLOCK hash %d", i);
		LockMethodProcLockHash[i] = ShmemInitHash(shmemName,
												  init_table_size,
												  max_table_size,
												  &info,
												  hash_flags);
		if (!LockMethodProcLockHash[i])
			elog(FATAL, "could not initialize lock table \"%s\"", shmemName);
	}
345 346

	/*
347
	 * Allocate one non-shared hash table for LOCALLOCK structs.  This is used
B
Bruce Momjian 已提交
348
	 * to store lock counts and resource owner information.
349
	 *
350 351 352 353
	 * The non-shared table could already exist in this process (this occurs
	 * when the postmaster is recreating shared memory after a backend crash).
	 * If so, delete and recreate it.  (We could simply leave it, since it
	 * ought to be empty in the postmaster, but for safety let's zap it.)
354
	 */
355 356
	if (LockMethodLocalHash)
		hash_destroy(LockMethodLocalHash);
357 358 359 360 361 362

	info.keysize = sizeof(LOCALLOCKTAG);
	info.entrysize = sizeof(LOCALLOCK);
	info.hash = tag_hash;
	hash_flags = (HASH_ELEM | HASH_FUNCTION);

363 364 365 366
	LockMethodLocalHash = hash_create("LOCALLOCK hash",
									  128,
									  &info,
									  hash_flags);
367 368
}

369

370
/*
371
 * Fetch the lock method table associated with a given lock
372
 */
373 374
LockMethod
GetLocksMethodTable(const LOCK *lock)
375
{
376
	LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock);
377

378 379
	Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
	return LockMethods[lockmethodid];
380 381
}

382

383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415
/*
 * Given a LOCKTAG, determine which partition the lock belongs in.
 *
 * Basically what we want to do here is hash the locktag.  However, it
 * seems unwise to use hash_any() because that is the same function that
 * will be used to distribute the locks within each partition's hash table;
 * if we use it, we run a big risk of having uneven distribution of hash
 * codes within each hash table.  Instead, we use a simple linear XOR of the
 * bits of the locktag.
 */
int
LockTagToPartition(const LOCKTAG *locktag)
{
	const uint8 *ptr = (const uint8 *) locktag;
	int			result = 0;
	int			i;

	for (i = 0; i < sizeof(LOCKTAG); i++)
		result ^= *ptr++;
#if NUM_LOCK_PARTITIONS == 16
	result ^= result >> 4;
	result &= 0x0F;
#elif NUM_LOCK_PARTITIONS == 4
	result ^= result >> 4;
	result ^= result >> 2;
	result &= 0x03;
#else
#error unsupported NUM_LOCK_PARTITIONS
#endif
	return result;
}


416 417
/*
 * LockAcquire -- Check for lock conflicts, sleep if conflict found,
418
 *		set lock if/when no conflicts.
419
 *
420 421 422 423 424 425 426 427
 * Inputs:
 *	locktag: unique identifier for the lockable object
 *	isTempObject: is the lockable object a temporary object?  (Under 2PC,
 *		such locks cannot be persisted)
 *	lockmode: lock mode to acquire
 *	sessionLock: if true, acquire lock for session not current transaction
 *	dontWait: if true, don't wait to acquire lock
 *
428 429 430 431 432 433 434 435
 * Returns one of:
 *		LOCKACQUIRE_NOT_AVAIL		lock not available, and dontWait=true
 *		LOCKACQUIRE_OK				lock successfully acquired
 *		LOCKACQUIRE_ALREADY_HELD	incremented count for lock already held
 *
 * In the normal case where dontWait=false and the caller doesn't need to
 * distinguish a freshly acquired lock from one already taken earlier in
 * this same transaction, there is no need to examine the return value.
436
 *
437 438 439 440
 * Side Effects: The lock is acquired and recorded in lock tables.
 *
 * NOTE: if we wait for the lock, there is no way to abort the wait
 * short of aborting the transaction.
441
 */
442
LockAcquireResult
443
LockAcquire(const LOCKTAG *locktag,
444 445 446 447
			bool isTempObject,
			LOCKMODE lockmode,
			bool sessionLock,
			bool dontWait)
448
{
449 450
	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
	LockMethod	lockMethodTable;
451 452 453
	LOCALLOCKTAG localtag;
	LOCALLOCK  *locallock;
	LOCK	   *lock;
454 455
	PROCLOCK   *proclock;
	PROCLOCKTAG proclocktag;
456
	bool		found;
457
	ResourceOwner owner;
458 459
	int			partition;
	LWLockId	partitionLock;
460
	int			status;
461

462 463 464 465 466 467
	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];
	if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
		elog(ERROR, "unrecognized lock mode: %d", lockmode);

468
#ifdef LOCK_DEBUG
469 470
	if (LOCK_DEBUG_ENABLED(locktag))
		elog(LOG, "LockAcquire: lock [%u,%u] %s",
471
			 locktag->locktag_field1, locktag->locktag_field2,
472
			 lockMethodTable->lockModeNames[lockmode]);
473 474
#endif

475 476
	/* Session locks are never transactional, else check table */
	if (!sessionLock && lockMethodTable->transactional)
477 478 479 480 481 482 483
		owner = CurrentResourceOwner;
	else
		owner = NULL;

	/*
	 * Find or create a LOCALLOCK entry for this lock and lockmode
	 */
B
Bruce Momjian 已提交
484
	MemSet(&localtag, 0, sizeof(localtag));		/* must clear padding */
485 486 487
	localtag.lock = *locktag;
	localtag.mode = lockmode;

488
	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
489 490 491 492 493 494 495 496 497 498
										  (void *) &localtag,
										  HASH_ENTER, &found);

	/*
	 * if it's a new locallock object, initialize it
	 */
	if (!found)
	{
		locallock->lock = NULL;
		locallock->proclock = NULL;
499
		locallock->isTempObject = isTempObject;
500
		locallock->partition = LockTagToPartition(&(localtag.lock));
501 502 503 504 505 506
		locallock->nLocks = 0;
		locallock->numLockOwners = 0;
		locallock->maxLockOwners = 8;
		locallock->lockOwners = NULL;
		locallock->lockOwners = (LOCALLOCKOWNER *)
			MemoryContextAlloc(TopMemoryContext,
B
Bruce Momjian 已提交
507
						  locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
508 509 510
	}
	else
	{
511 512
		Assert(locallock->isTempObject == isTempObject);

513 514 515
		/* Make sure there will be room to remember the lock */
		if (locallock->numLockOwners >= locallock->maxLockOwners)
		{
B
Bruce Momjian 已提交
516
			int			newsize = locallock->maxLockOwners * 2;
517 518 519 520 521 522 523 524 525

			locallock->lockOwners = (LOCALLOCKOWNER *)
				repalloc(locallock->lockOwners,
						 newsize * sizeof(LOCALLOCKOWNER));
			locallock->maxLockOwners = newsize;
		}
	}

	/*
B
Bruce Momjian 已提交
526
	 * If we already hold the lock, we can just increase the count locally.
527 528 529 530
	 */
	if (locallock->nLocks > 0)
	{
		GrantLockLocal(locallock, owner);
531
		return LOCKACQUIRE_ALREADY_HELD;
532 533 534 535 536
	}

	/*
	 * Otherwise we've got to mess with the shared lock table.
	 */
537 538
	partition = locallock->partition;
	partitionLock = FirstLockMgrLock + partition;
539

540
	LWLockAcquire(partitionLock, LW_EXCLUSIVE);
541

M
 
Marc G. Fournier 已提交
542
	/*
543 544
	 * Find or create a lock with this tag.
	 *
545 546
	 * Note: if the locallock object already existed, it might have a pointer
	 * to the lock already ... but we probably should not assume that that
547 548
	 * pointer is valid, since a lock object with no locks can go away
	 * anytime.
M
 
Marc G. Fournier 已提交
549
	 */
550
	lock = (LOCK *) hash_search(LockMethodLockHash[partition],
551
								(void *) locktag,
552
								HASH_ENTER_NULL, &found);
553
	if (!lock)
554
	{
555
		LWLockRelease(partitionLock);
556 557
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
558
				 errmsg("out of shared memory"),
B
Bruce Momjian 已提交
559
			errhint("You may need to increase max_locks_per_transaction.")));
560
	}
561
	locallock->lock = lock;
562

B
Bruce Momjian 已提交
563
	/*
564
	 * if it's a new lock object, initialize it
565 566
	 */
	if (!found)
567
	{
568 569
		lock->grantMask = 0;
		lock->waitMask = 0;
570
		SHMQueueInit(&(lock->procLocks));
571
		ProcQueueInit(&(lock->waitProcs));
572 573
		lock->nRequested = 0;
		lock->nGranted = 0;
574 575
		MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
		MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
M
 
Marc G. Fournier 已提交
576
		LOCK_PRINT("LockAcquire: new", lock, lockmode);
577 578 579
	}
	else
	{
M
 
Marc G. Fournier 已提交
580
		LOCK_PRINT("LockAcquire: found", lock, lockmode);
581 582 583
		Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
		Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
		Assert(lock->nGranted <= lock->nRequested);
584
	}
585

B
Bruce Momjian 已提交
586
	/*
587
	 * Create the hash key for the proclock table.
588
	 */
B
Bruce Momjian 已提交
589
	MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG));		/* must clear padding */
590 591
	proclocktag.lock = MAKE_OFFSET(lock);
	proclocktag.proc = MAKE_OFFSET(MyProc);
592

M
 
Marc G. Fournier 已提交
593
	/*
594
	 * Find or create a proclock entry with this tag
M
 
Marc G. Fournier 已提交
595
	 */
596
	proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[partition],
B
Bruce Momjian 已提交
597
										(void *) &proclocktag,
598
										HASH_ENTER_NULL, &found);
599
	if (!proclock)
600
	{
601 602 603 604 605 606 607 608 609 610
		/* Ooops, not enough shmem for the proclock */
		if (lock->nRequested == 0)
		{
			/*
			 * There are no other requestors of this lock, so garbage-collect
			 * the lock object.  We *must* do this to avoid a permanent leak
			 * of shared memory, because there won't be anything to cause
			 * anyone to release the lock object later.
			 */
			Assert(SHMQueueEmpty(&(lock->procLocks)));
611
			if (!hash_search(LockMethodLockHash[partition],
612 613 614
							 (void *) &(lock->tag),
							 HASH_REMOVE, NULL))
				elog(PANIC, "lock table corrupted");
615
		}
616
		LWLockRelease(partitionLock);
617 618
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
619
				 errmsg("out of shared memory"),
B
Bruce Momjian 已提交
620
			errhint("You may need to increase max_locks_per_transaction.")));
621
	}
622
	locallock->proclock = proclock;
M
 
Marc G. Fournier 已提交
623 624

	/*
625
	 * If new, initialize the new entry
M
 
Marc G. Fournier 已提交
626
	 */
627
	if (!found)
628
	{
629
		proclock->holdMask = 0;
630
		proclock->releaseMask = 0;
631
		/* Add proclock to appropriate lists */
632
		SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
633 634
		SHMQueueInsertBefore(&(MyProc->myProcLocks[partition]),
							 &proclock->procLink);
635
		PROCLOCK_PRINT("LockAcquire: new", proclock);
636 637 638
	}
	else
	{
639
		PROCLOCK_PRINT("LockAcquire: found", proclock);
640
		Assert((proclock->holdMask & ~lock->grantMask) == 0);
641

642
#ifdef CHECK_DEADLOCK_RISK
B
Bruce Momjian 已提交
643

644
		/*
B
Bruce Momjian 已提交
645 646 647 648 649
		 * Issue warning if we already hold a lower-level lock on this object
		 * and do not hold a lock of the requested level or higher. This
		 * indicates a deadlock-prone coding practice (eg, we'd have a
		 * deadlock if another backend were following the same code path at
		 * about the same time).
650
		 *
B
Bruce Momjian 已提交
651 652 653
		 * This is not enabled by default, because it may generate log entries
		 * about user-level coding practices that are in fact safe in context.
		 * It can be enabled to help find system-level problems.
654
		 *
B
Bruce Momjian 已提交
655 656
		 * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
		 * better to use a table.  For now, though, this works.
657 658
		 */
		{
B
Bruce Momjian 已提交
659
			int			i;
660 661

			for (i = lockMethodTable->numLockModes; i > 0; i--)
662
			{
663 664 665
				if (proclock->holdMask & LOCKBIT_ON(i))
				{
					if (i >= (int) lockmode)
B
Bruce Momjian 已提交
666
						break;	/* safe: we have a lock >= req level */
667 668
					elog(LOG, "deadlock risk: raising lock level"
						 " from %s to %s on object %u/%u/%u",
669 670
						 lockMethodTable->lockModeNames[i],
						 lockMethodTable->lockModeNames[lockmode],
671 672 673 674
						 lock->tag.locktag_field1, lock->tag.locktag_field2,
						 lock->tag.locktag_field3);
					break;
				}
675 676
			}
		}
677
#endif   /* CHECK_DEADLOCK_RISK */
678
	}
679

B
Bruce Momjian 已提交
680
	/*
681
	 * lock->nRequested and lock->requested[] count the total number of
B
Bruce Momjian 已提交
682 683
	 * requests, whether granted or waiting, so increment those immediately.
	 * The other counts don't increment till we get the lock.
684
	 */
685 686 687
	lock->nRequested++;
	lock->requested[lockmode]++;
	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
688

B
Bruce Momjian 已提交
689
	/*
B
Bruce Momjian 已提交
690 691
	 * We shouldn't already hold the desired lock; else locallock table is
	 * broken.
692
	 */
693 694
	if (proclock->holdMask & LOCKBIT_ON(lockmode))
		elog(ERROR, "lock %s on object %u/%u/%u is already held",
695
			 lockMethodTable->lockModeNames[lockmode],
696 697
			 lock->tag.locktag_field1, lock->tag.locktag_field2,
			 lock->tag.locktag_field3);
698

B
Bruce Momjian 已提交
699
	/*
B
Bruce Momjian 已提交
700 701 702
	 * If lock requested conflicts with locks requested by waiters, must join
	 * wait queue.	Otherwise, check for conflict with already-held locks.
	 * (That's last because most complex check.)
V
Vadim B. Mikheev 已提交
703
	 */
B
Bruce Momjian 已提交
704
	if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
705
		status = STATUS_FOUND;
V
Vadim B. Mikheev 已提交
706
	else
707
		status = LockCheckConflicts(lockMethodTable, lockmode,
708
									lock, proclock, MyProc);
V
Vadim B. Mikheev 已提交
709

710
	if (status == STATUS_OK)
711 712
	{
		/* No conflict with held or previously requested locks */
713
		GrantLock(lock, proclock, lockmode);
714
		GrantLockLocal(locallock, owner);
715 716
	}
	else
717
	{
718
		Assert(status == STATUS_FOUND);
719

720
		/*
721
		 * We can't acquire the lock immediately.  If caller specified no
B
Bruce Momjian 已提交
722 723
		 * blocking, remove useless table entries and return NOT_AVAIL without
		 * waiting.
724
		 */
725
		if (dontWait)
726
		{
727
			if (proclock->holdMask == 0)
728
			{
729 730
				SHMQueueDelete(&proclock->lockLink);
				SHMQueueDelete(&proclock->procLink);
731
				if (!hash_search(LockMethodProcLockHash[partition],
732 733 734
								 (void *) &(proclock->tag),
								 HASH_REMOVE, NULL))
					elog(PANIC, "proclock table corrupted");
735
			}
736
			else
737
				PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
738 739
			lock->nRequested--;
			lock->requested[lockmode]--;
740
			LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
741 742
			Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
			Assert(lock->nGranted <= lock->nRequested);
743
			LWLockRelease(partitionLock);
744 745
			if (locallock->nLocks == 0)
				RemoveLocalLock(locallock);
746
			return LOCKACQUIRE_NOT_AVAIL;
747
		}
B
Bruce Momjian 已提交
748

V
Vadim B. Mikheev 已提交
749
		/*
750
		 * Set bitmask of locks this process already holds on this object.
V
Vadim B. Mikheev 已提交
751
		 */
752
		MyProc->heldLocks = proclock->holdMask;
V
Vadim B. Mikheev 已提交
753

754 755 756
		/*
		 * Sleep till someone wakes me up.
		 */
757
		WaitOnLock(locallock, owner);
758

759 760
		/*
		 * NOTE: do not do any material change of state between here and
B
Bruce Momjian 已提交
761
		 * return.	All required changes in locktable state must have been
B
Bruce Momjian 已提交
762
		 * done when the lock was granted to us --- see notes in WaitOnLock.
763 764
		 */

M
 
Marc G. Fournier 已提交
765
		/*
766
		 * Check the proclock entry status, in case something in the ipc
767
		 * communication doesn't work correctly.
M
 
Marc G. Fournier 已提交
768
		 */
769
		if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
770
		{
771
			PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
772
			LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
M
 
Marc G. Fournier 已提交
773
			/* Should we retry ? */
774
			LWLockRelease(partitionLock);
775
			elog(ERROR, "LockAcquire failed");
M
 
Marc G. Fournier 已提交
776
		}
777
		PROCLOCK_PRINT("LockAcquire: granted", proclock);
M
 
Marc G. Fournier 已提交
778
		LOCK_PRINT("LockAcquire: granted", lock, lockmode);
779
	}
780

781
	LWLockRelease(partitionLock);
782

783
	return LOCKACQUIRE_OK;
784 785
}

786 787 788 789 790 791 792 793
/*
 * Subroutine to free a locallock entry
 */
static void
RemoveLocalLock(LOCALLOCK *locallock)
{
	pfree(locallock->lockOwners);
	locallock->lockOwners = NULL;
	if (!hash_search(LockMethodLocalHash,
					 (void *) &(locallock->tag),
					 HASH_REMOVE, NULL))
		elog(WARNING, "locallock table corrupted");
}

/*
801 802 803 804
 * LockCheckConflicts -- test whether requested lock conflicts
 *		with those already granted
 *
 * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
805 806
 *
 * NOTES:
807
 *		Here's what makes this complicated: one process's locks don't
808 809
 * conflict with one another, no matter what purpose they are held for
 * (eg, session and transaction locks do not conflict).
810 811
 * So, we must subtract off our own locks when determining whether the
 * requested new lock conflicts with those already held.
812 813
 */
int
814
LockCheckConflicts(LockMethod lockMethodTable,
815 816
				   LOCKMODE lockmode,
				   LOCK *lock,
817
				   PROCLOCK *proclock,
818
				   PGPROC *proc)
819
{
B
Bruce Momjian 已提交
820
	int			numLockModes = lockMethodTable->numLockModes;
821 822
	LOCKMASK	myLocks;
	LOCKMASK	otherLocks;
823
	int			i;
824

B
Bruce Momjian 已提交
825
	/*
B
Bruce Momjian 已提交
826 827
	 * first check for global conflicts: If no locks conflict with my request,
	 * then I get the lock.
828
	 *
829 830 831 832
	 * Checking for conflict: lock->grantMask represents the types of
	 * currently held locks.  conflictTable[lockmode] has a bit set for each
	 * type of lock that conflicts with request.   Bitwise compare tells if
	 * there is a conflict.
833
	 */
B
Bruce Momjian 已提交
834
	if (!(lockMethodTable->conflictTab[lockmode] & lock->grantMask))
835
	{
836
		PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock);
837
		return STATUS_OK;
838
	}
839

B
Bruce Momjian 已提交
840
	/*
B
Bruce Momjian 已提交
841 842 843
	 * Rats.  Something conflicts.	But it could still be my own lock. We have
	 * to construct a conflict mask that does not reflect our own locks, but
	 * only lock types held by other processes.
844
	 */
845 846
	myLocks = proclock->holdMask;
	otherLocks = 0;
847
	for (i = 1; i <= numLockModes; i++)
848
	{
B
Bruce Momjian 已提交
849
		int			myHolding = (myLocks & LOCKBIT_ON(i)) ? 1 : 0;
850 851 852

		if (lock->granted[i] > myHolding)
			otherLocks |= LOCKBIT_ON(i);
853
	}
854

B
Bruce Momjian 已提交
855
	/*
856
	 * now check again for conflicts.  'otherLocks' describes the types of
B
Bruce Momjian 已提交
857 858
	 * locks held by other processes.  If one of these conflicts with the kind
	 * of lock that I want, there is a conflict and I have to sleep.
859
	 */
860
	if (!(lockMethodTable->conflictTab[lockmode] & otherLocks))
861
	{
862
		/* no conflict. OK to get the lock */
863
		PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock);
864
		return STATUS_OK;
865
	}
866

867
	PROCLOCK_PRINT("LockCheckConflicts: conflicting", proclock);
868
	return STATUS_FOUND;
869 870
}

871
/*
872
 * GrantLock -- update the lock and proclock data structures to show
873
 *		the lock request has been granted.
874 875
 *
 * NOTE: if proc was blocked, it also needs to be removed from the wait list
876
 * and have its waitLock/waitProcLock fields cleared.  That's not done here.
877
 *
878 879 880
 * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK
 * table entry; but since we may be awaking some other process, we can't do
 * that here; it's done by GrantLockLocal, instead.
M
 
Marc G. Fournier 已提交
881 882
 */
void
883
GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
M
 
Marc G. Fournier 已提交
884
{
885 886
	lock->nGranted++;
	lock->granted[lockmode]++;
887
	lock->grantMask |= LOCKBIT_ON(lockmode);
888
	if (lock->granted[lockmode] == lock->requested[lockmode])
889
		lock->waitMask &= LOCKBIT_OFF(lockmode);
890
	proclock->holdMask |= LOCKBIT_ON(lockmode);
M
 
Marc G. Fournier 已提交
891
	LOCK_PRINT("GrantLock", lock, lockmode);
892 893
	Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
	Assert(lock->nGranted <= lock->nRequested);
894 895
}

896
/*
 * UnGrantLock -- opposite of GrantLock.
 *
 * Updates the lock and proclock data structures to show that the lock
 * is no longer held nor requested by the current holder.
 *
 * Returns true if there were any waiters waiting on the lock that
 * should now be woken up with ProcLockWakeup.
 */
static bool
UnGrantLock(LOCK *lock, LOCKMODE lockmode,
			PROCLOCK *proclock, LockMethod lockMethodTable)
{
	bool		wakeupNeeded = false;

	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
	Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
	Assert(lock->nGranted <= lock->nRequested);

	/*
	 * fix the general lock stats
	 */
	lock->nRequested--;
	lock->requested[lockmode]--;
	lock->nGranted--;
	lock->granted[lockmode]--;

	if (lock->granted[lockmode] == 0)
	{
		/* change the conflict mask.  No more of this lock type. */
		lock->grantMask &= LOCKBIT_OFF(lockmode);
	}

	LOCK_PRINT("UnGrantLock: updated", lock, lockmode);

	/*
	 * We need only run ProcLockWakeup if the released lock conflicts with at
	 * least one of the lock types requested by waiter(s).	Otherwise whatever
	 * conflict made them wait must still exist.  NOTE: before MVCC, we could
	 * skip wakeup if lock->granted[lockmode] was still positive. But that's
	 * not true anymore, because the remaining granted locks might belong to
	 * some waiter, who could now be awakened because he doesn't conflict with
	 * his own locks.
	 */
	if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
		wakeupNeeded = true;

	/*
	 * Now fix the per-proclock state.
	 */
	proclock->holdMask &= LOCKBIT_OFF(lockmode);
	PROCLOCK_PRINT("UnGrantLock: updated", proclock);

	return wakeupNeeded;
}

/*
 * CleanUpLock -- clean up after releasing a lock.	We garbage-collect the
 * proclock and lock objects if possible, and call ProcLockWakeup if there
 * are remaining requests and the caller says it's OK.  (Normally, this
 * should be called after UnGrantLock, and wakeupNeeded is the result from
 * UnGrantLock.)
 *
 * The lock table's partition lock must be held at entry, and will be
 * held at exit.
 */
static void
CleanUpLock(LOCK *lock, PROCLOCK *proclock,
			LockMethod lockMethodTable, int partition,
			bool wakeupNeeded)
{
	/*
	 * If this was my last hold on this lock, delete my entry in the proclock
	 * table.
	 */
	if (proclock->holdMask == 0)
	{
		PROCLOCK_PRINT("CleanUpLock: deleting", proclock);
		/* Unlink from both the lock's and the proc's proclock lists. */
		SHMQueueDelete(&proclock->lockLink);
		SHMQueueDelete(&proclock->procLink);
		/* A failed HASH_REMOVE here means shared state is inconsistent. */
		if (!hash_search(LockMethodProcLockHash[partition],
						 (void *) &(proclock->tag),
						 HASH_REMOVE, NULL))
			elog(PANIC, "proclock table corrupted");
	}

	if (lock->nRequested == 0)
	{
		/*
		 * The caller just released the last lock, so garbage-collect the lock
		 * object.
		 */
		LOCK_PRINT("CleanUpLock: deleting", lock, 0);
		Assert(SHMQueueEmpty(&(lock->procLocks)));
		if (!hash_search(LockMethodLockHash[partition],
						 (void *) &(lock->tag),
						 HASH_REMOVE, NULL))
			elog(PANIC, "lock table corrupted");
	}
	else if (wakeupNeeded)
	{
		/* There are waiters on this lock, so wake them up. */
		ProcLockWakeup(lockMethodTable, lock);
	}
}

1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012
/*
 * GrantLockLocal -- update the locallock data structures to show
 *		the lock request has been granted.
 *
 * We expect that LockAcquire made sure there is room to add a new
 * ResourceOwner entry.
 */
static void
GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
{
	LOCALLOCKOWNER *owners = locallock->lockOwners;
	int			nowners = locallock->numLockOwners;
	int			slot;

	Assert(nowners < locallock->maxLockOwners);

	/* Bump the total count of locally-held locks of this kind. */
	locallock->nLocks++;

	/* If this owner already has a slot, just bump its per-owner count. */
	for (slot = 0; slot < nowners; slot++)
	{
		if (owners[slot].owner == owner)
		{
			owners[slot].nLocks++;
			return;
		}
	}

	/* No slot yet for this owner: append one (caller guaranteed room). */
	owners[nowners].owner = owner;
	owners[nowners].nLocks = 1;
	locallock->numLockOwners = nowners + 1;
}

/*
 * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
 *		WaitOnLock on.
 *
 * proc.c needs this for the case where we are booted off the lock by
 * timeout, but discover that someone granted us the lock anyway.
 *
 * We could just export GrantLockLocal, but that would require including
 * resowner.h in lock.h, which creates circularity.
 *
 * NOTE(review): relies on the file-global awaitedLock/awaitedOwner set up
 * by WaitOnLock before sleeping; only meaningful while a wait is in
 * progress.
 */
void
GrantAwaitedLock(void)
{
	GrantLockLocal(awaitedLock, awaitedOwner);
}

1048 1049 1050
/*
 * WaitOnLock -- wait to acquire a lock
 *
 * Caller must have set MyProc->heldLocks to reflect locks already held
 * on the lockable object by this process.
 *
 * The appropriate partition lock must be held at entry.
 */
static void
WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
{
	LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
	LockMethod	lockMethodTable = LockMethods[lockmethodid];
	const char *old_status;
	char	   *new_status;
	int			len;

	LOCK_PRINT("WaitOnLock: sleeping on lock",
			   locallock->lock, locallock->tag.mode);

	/*
	 * Append " waiting" to the process title shown by ps.  The 8 below is
	 * strlen(" waiting"); +1 for the terminating NUL.
	 */
	old_status = get_ps_display(&len);
	new_status = (char *) palloc(len + 8 + 1);
	memcpy(new_status, old_status, len);
	strcpy(new_status + len, " waiting");
	set_ps_display(new_status);
	new_status[len] = '\0';		/* truncate off " waiting" */

	/* Let GrantAwaitedLock (called from proc.c) know what we're waiting on. */
	awaitedLock = locallock;
	awaitedOwner = owner;

	/*
	 * NOTE: Think not to put any shared-state cleanup after the call to
	 * ProcSleep, in either the normal or failure path.  The lock state must
	 * be fully set by the lock grantor, or by CheckDeadLock if we give up
	 * waiting for the lock.  This is necessary because of the possibility
	 * that a cancel/die interrupt will interrupt ProcSleep after someone else
	 * grants us the lock, but before we've noticed it. Hence, after granting,
	 * the locktable state must fully reflect the fact that we own the lock;
	 * we can't do additional work on return. Contrariwise, if we fail, any
	 * cleanup must happen in xact abort processing, not here, to ensure it
	 * will also happen in the cancel/die case.
	 */

	if (ProcSleep(locallock, lockMethodTable) != STATUS_OK)
	{
		/*
		 * We failed as a result of a deadlock, see CheckDeadLock(). Quit now.
		 */
		awaitedLock = NULL;
		LOCK_PRINT("WaitOnLock: aborting on lock",
				   locallock->lock, locallock->tag.mode);
		LWLockRelease(FirstLockMgrLock + locallock->partition);

		/*
		 * Now that we aren't holding the partition lock, we can give an error
		 * report including details about the detected deadlock.
		 */
		DeadLockReport();
		/* not reached */
	}

	awaitedLock = NULL;

	/* Restore the original ps display (new_status was truncated above). */
	set_ps_display(new_status);
	pfree(new_status);

	LOCK_PRINT("WaitOnLock: wakeup on lock",
			   locallock->lock, locallock->tag.mode);
}

1118
/*
 * Remove a proc from the wait-queue it is on
 * (caller must know it is on one).
 *
 * Appropriate partition lock must be held by caller.
 *
 * NB: this does not clean up any locallock object that may exist for the lock.
 */
void
RemoveFromWaitQueue(PGPROC *proc, int partition)
{
	LOCK	   *waitLock = proc->waitLock;
	PROCLOCK   *proclock = proc->waitProcLock;
	LOCKMODE	lockmode = proc->waitLockMode;
	LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*waitLock);

	/* Make sure proc is waiting */
	Assert(proc->links.next != INVALID_OFFSET);
	Assert(waitLock);
	Assert(waitLock->waitProcs.size > 0);
	Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));

	/* Remove proc from lock's wait queue */
	SHMQueueDelete(&(proc->links));
	waitLock->waitProcs.size--;

	/* Undo increments of request counts by waiting process */
	Assert(waitLock->nRequested > 0);
	Assert(waitLock->nRequested > proc->waitLock->nGranted);
	waitLock->nRequested--;
	Assert(waitLock->requested[lockmode] > 0);
	waitLock->requested[lockmode]--;
	/* don't forget to clear waitMask bit if appropriate */
	if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
		waitLock->waitMask &= LOCKBIT_OFF(lockmode);

	/* Clean up the proc's own state */
	proc->waitLock = NULL;
	proc->waitProcLock = NULL;

	/*
	 * Delete the proclock immediately if it represents no already-held locks.
	 * (This must happen now because if the owner of the lock decides to
	 * release it, and the requested/granted counts then go to zero,
	 * LockRelease expects there to be no remaining proclocks.) Then see if
	 * any other waiters for the lock can be woken up now.
	 */
	CleanUpLock(waitLock, proclock,
				LockMethods[lockmethodid], partition,
				true);
}

1170
/*
 * LockRelease -- look up 'locktag' and release one 'lockmode' lock on it.
 *		Release a session lock if 'sessionLock' is true, else release a
 *		regular transaction lock.
 *
 * Returns TRUE on success, FALSE (after a WARNING) if the lock was not
 * actually held by this process/owner.
 *
 * Side Effects: find any waiting processes that are now wakable,
 *		grant them their requested locks and awaken them.
 *		(We have to grant the lock here to avoid a race between
 *		the waking process and any new process to
 *		come along and request the lock.)
 */
bool
LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
{
	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
	LockMethod	lockMethodTable;
	LOCALLOCKTAG localtag;
	LOCALLOCK  *locallock;
	LOCK	   *lock;
	PROCLOCK   *proclock;
	int			partition;
	LWLockId	partitionLock;
	bool		wakeupNeeded;

	/* Validate the lock method and mode before touching any tables. */
	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];
	if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
		elog(ERROR, "unrecognized lock mode: %d", lockmode);

#ifdef LOCK_DEBUG
	if (LOCK_DEBUG_ENABLED(locktag))
		elog(LOG, "LockRelease: lock [%u,%u] %s",
			 locktag->locktag_field1, locktag->locktag_field2,
			 lockMethodTable->lockModeNames[lockmode]);
#endif

	/*
	 * Find the LOCALLOCK entry for this lock and lockmode
	 */
	MemSet(&localtag, 0, sizeof(localtag));		/* must clear padding */
	localtag.lock = *locktag;
	localtag.mode = lockmode;

	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
										  (void *) &localtag,
										  HASH_FIND, NULL);

	/*
	 * let the caller print its own error message, too. Do not ereport(ERROR).
	 */
	if (!locallock || locallock->nLocks <= 0)
	{
		elog(WARNING, "you don't own a lock of type %s",
			 lockMethodTable->lockModeNames[lockmode]);
		return FALSE;
	}

	/*
	 * Decrease the count for the resource owner.
	 */
	{
		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
		ResourceOwner owner;
		int			i;

		/* Session locks are never transactional, else check table */
		if (!sessionLock && lockMethodTable->transactional)
			owner = CurrentResourceOwner;
		else
			owner = NULL;		/* NULL owner marks a session lock */

		for (i = locallock->numLockOwners - 1; i >= 0; i--)
		{
			if (lockOwners[i].owner == owner)
			{
				Assert(lockOwners[i].nLocks > 0);
				if (--lockOwners[i].nLocks == 0)
				{
					/* compact out unused slot */
					locallock->numLockOwners--;
					if (i < locallock->numLockOwners)
						lockOwners[i] = lockOwners[locallock->numLockOwners];
				}
				break;
			}
		}
		if (i < 0)
		{
			/* don't release a lock belonging to another owner */
			elog(WARNING, "you don't own a lock of type %s",
				 lockMethodTable->lockModeNames[lockmode]);
			return FALSE;
		}
	}

	/*
	 * Decrease the total local count.	If we're still holding the lock, we're
	 * done.
	 */
	locallock->nLocks--;

	if (locallock->nLocks > 0)
		return TRUE;

	/*
	 * Otherwise we've got to mess with the shared lock table.
	 */
	partition = locallock->partition;
	partitionLock = FirstLockMgrLock + partition;

	LWLockAcquire(partitionLock, LW_EXCLUSIVE);

	/*
	 * We don't need to re-find the lock or proclock, since we kept their
	 * addresses in the locallock table, and they couldn't have been removed
	 * while we were holding a lock on them.
	 */
	lock = locallock->lock;
	LOCK_PRINT("LockRelease: found", lock, lockmode);
	proclock = locallock->proclock;
	PROCLOCK_PRINT("LockRelease: found", proclock);

	/*
	 * Double-check that we are actually holding a lock of the type we want to
	 * release.
	 */
	if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
	{
		PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
		LWLockRelease(partitionLock);
		elog(WARNING, "you don't own a lock of type %s",
			 lockMethodTable->lockModeNames[lockmode]);
		RemoveLocalLock(locallock);
		return FALSE;
	}

	/*
	 * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
	 */
	wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);

	CleanUpLock(lock, proclock,
				lockMethodTable, partition,
				wakeupNeeded);

	LWLockRelease(partitionLock);

	RemoveLocalLock(locallock);
	return TRUE;
}

1322
/*
 * LockReleaseAll -- Release all locks of the specified lock method that
 *		are held by the current process.
 *
 * Well, not necessarily *all* locks.  The available behaviors are:
 *		allLocks == true: release all locks including session locks.
 *		allLocks == false: release all non-session locks.
 */
void
LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
{
	HASH_SEQ_STATUS status;
	LockMethod	lockMethodTable;
	int			i,
				numLockModes;
	LOCALLOCK  *locallock;
	LOCK	   *lock;
	PROCLOCK   *proclock;
	int			partition;

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];

#ifdef LOCK_DEBUG
	if (*(lockMethodTable->trace_flag))
		elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
#endif

	numLockModes = lockMethodTable->numLockModes;

	/*
	 * First we run through the locallock table and get rid of unwanted
	 * entries, then we scan the process's proclocks and get rid of those. We
	 * do this separately because we may have multiple locallock entries
	 * pointing to the same proclock, and we daren't end up with any dangling
	 * pointers.
	 */
	hash_seq_init(&status, LockMethodLocalHash);

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		if (locallock->proclock == NULL || locallock->lock == NULL)
		{
			/*
			 * We must've run out of shared memory while trying to set up this
			 * lock.  Just forget the local entry.
			 */
			Assert(locallock->nLocks == 0);
			RemoveLocalLock(locallock);
			continue;
		}

		/* Ignore items that are not of the lockmethod to be removed */
		if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
			continue;

		/*
		 * If we are asked to release all locks, we can just zap the entry.
		 * Otherwise, must scan to see if there are session locks. We assume
		 * there is at most one lockOwners entry for session locks.
		 */
		if (!allLocks)
		{
			LOCALLOCKOWNER *lockOwners = locallock->lockOwners;

			/* If it's above array position 0, move it down to 0 */
			for (i = locallock->numLockOwners - 1; i > 0; i--)
			{
				/* a NULL owner marks the session-lock entry */
				if (lockOwners[i].owner == NULL)
				{
					lockOwners[0] = lockOwners[i];
					break;
				}
			}

			if (locallock->numLockOwners > 0 &&
				lockOwners[0].owner == NULL &&
				lockOwners[0].nLocks > 0)
			{
				/* Fix the locallock to show just the session locks */
				locallock->nLocks = lockOwners[0].nLocks;
				locallock->numLockOwners = 1;
				/* We aren't deleting this locallock, so done */
				continue;
			}
		}

		/* Mark the proclock to show we need to release this lockmode */
		if (locallock->nLocks > 0)
			locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);

		/* And remove the locallock hashtable entry */
		RemoveLocalLock(locallock);
	}

	/*
	 * Now, scan each lock partition separately.
	 */
	for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
	{
		LWLockId	partitionLock = FirstLockMgrLock + partition;
		SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);

		/* Peek before taking the LWLock so empty partitions cost nothing. */
		proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
											 offsetof(PROCLOCK, procLink));

		if (!proclock)
			continue;			/* needn't examine this partition */

		LWLockAcquire(partitionLock, LW_EXCLUSIVE);

		while (proclock)
		{
			bool		wakeupNeeded = false;
			PROCLOCK   *nextplock;

			/* Get link first, since we may unlink/delete this proclock */
			nextplock = (PROCLOCK *)
				SHMQueueNext(procLocks, &proclock->procLink,
							 offsetof(PROCLOCK, procLink));

			Assert(proclock->tag.proc == MAKE_OFFSET(MyProc));

			lock = (LOCK *) MAKE_PTR(proclock->tag.lock);

			/* Ignore items that are not of the lockmethod to be removed */
			if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
				goto next_item;

			/*
			 * In allLocks mode, force release of all locks even if locallock
			 * table had problems
			 */
			if (allLocks)
				proclock->releaseMask = proclock->holdMask;
			else
				Assert((proclock->releaseMask & ~proclock->holdMask) == 0);

			/*
			 * Ignore items that have nothing to be released, unless they have
			 * holdMask == 0 and are therefore recyclable
			 */
			if (proclock->releaseMask == 0 && proclock->holdMask != 0)
				goto next_item;

			PROCLOCK_PRINT("LockReleaseAll", proclock);
			LOCK_PRINT("LockReleaseAll", lock, 0);
			Assert(lock->nRequested >= 0);
			Assert(lock->nGranted >= 0);
			Assert(lock->nGranted <= lock->nRequested);
			Assert((proclock->holdMask & ~lock->grantMask) == 0);

			/*
			 * Release the previously-marked lock modes
			 */
			for (i = 1; i <= numLockModes; i++)
			{
				if (proclock->releaseMask & LOCKBIT_ON(i))
					wakeupNeeded |= UnGrantLock(lock, i, proclock,
												lockMethodTable);
			}
			Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
			Assert(lock->nGranted <= lock->nRequested);
			LOCK_PRINT("LockReleaseAll: updated", lock, 0);

			proclock->releaseMask = 0;

			/* CleanUpLock will wake up waiters if needed. */
			CleanUpLock(lock, proclock,
						lockMethodTable, partition,
						wakeupNeeded);

		next_item:
			proclock = nextplock;
		} /* loop over PROCLOCKs within this partition */

		LWLockRelease(partitionLock);
	} /* loop over partitions */

#ifdef LOCK_DEBUG
	if (*(lockMethodTable->trace_flag))
		elog(LOG, "LockReleaseAll done");
#endif
}

1508 1509 1510 1511 1512 1513 1514 1515
/*
 * LockReleaseCurrentOwner
 *		Release all locks belonging to CurrentResourceOwner
 */
void
LockReleaseCurrentOwner(void)
{
	HASH_SEQ_STATUS status;
	LOCALLOCK  *locallock;
	LOCALLOCKOWNER *lockOwners;
	int			i;

	hash_seq_init(&status, LockMethodLocalHash);

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		/* Ignore items that must be nontransactional */
		if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
			continue;

		/* Scan to see if there are any locks belonging to current owner */
		lockOwners = locallock->lockOwners;
		for (i = locallock->numLockOwners - 1; i >= 0; i--)
		{
			if (lockOwners[i].owner == CurrentResourceOwner)
			{
				Assert(lockOwners[i].nLocks > 0);
				if (lockOwners[i].nLocks < locallock->nLocks)
				{
					/*
					 * We will still hold this lock after forgetting this
					 * ResourceOwner.
					 */
					locallock->nLocks -= lockOwners[i].nLocks;
					/* compact out unused slot */
					locallock->numLockOwners--;
					if (i < locallock->numLockOwners)
						lockOwners[i] = lockOwners[locallock->numLockOwners];
				}
				else
				{
					Assert(lockOwners[i].nLocks == locallock->nLocks);
					/*
					 * We want to call LockRelease just once, so fudge the
					 * counts down to 1 before delegating the real release.
					 */
					lockOwners[i].nLocks = 1;
					locallock->nLocks = 1;
					if (!LockRelease(&locallock->tag.lock,
									 locallock->tag.mode,
									 false))
						elog(WARNING, "LockReleaseCurrentOwner: failed??");
				}
				break;
			}
		}
	}
}

/*
 * LockReassignCurrentOwner
 *		Reassign all locks belonging to CurrentResourceOwner to belong
 *		to its parent resource owner
 */
void
LockReassignCurrentOwner(void)
{
	ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
	HASH_SEQ_STATUS status;
B
Bruce Momjian 已提交
1574
	LOCALLOCK  *locallock;
1575 1576 1577 1578
	LOCALLOCKOWNER *lockOwners;

	Assert(parent != NULL);

1579
	hash_seq_init(&status, LockMethodLocalHash);
1580 1581 1582 1583 1584 1585 1586 1587

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		int			i;
		int			ic = -1;
		int			ip = -1;

		/* Ignore items that must be nontransactional */
1588
		if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
1589 1590 1591
			continue;

		/*
B
Bruce Momjian 已提交
1592 1593
		 * Scan to see if there are any locks belonging to current owner or
		 * its parent
1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624
		 */
		lockOwners = locallock->lockOwners;
		for (i = locallock->numLockOwners - 1; i >= 0; i--)
		{
			if (lockOwners[i].owner == CurrentResourceOwner)
				ic = i;
			else if (lockOwners[i].owner == parent)
				ip = i;
		}

		if (ic < 0)
			continue;			/* no current locks */

		if (ip < 0)
		{
			/* Parent has no slot, so just give it child's slot */
			lockOwners[ic].owner = parent;
		}
		else
		{
			/* Merge child's count with parent's */
			lockOwners[ip].nLocks += lockOwners[ic].nLocks;
			/* compact out unused slot */
			locallock->numLockOwners--;
			if (ic < locallock->numLockOwners)
				lockOwners[ic] = lockOwners[locallock->numLockOwners];
		}
	}
}


1625 1626 1627 1628 1629
/*
 * AtPrepare_Locks
 *		Do the preparatory work for a PREPARE: make 2PC state file records
 *		for all locks currently held.
 *
 * Non-transactional locks are ignored.
 *
 * There are some special cases that we error out on: we can't be holding
 * any session locks (should be OK since only VACUUM uses those) and we
 * can't be holding any locks on temporary objects (since that would mess
 * up the current backend if it tries to exit before the prepared xact is
 * committed).
 */
void
AtPrepare_Locks(void)
{
	HASH_SEQ_STATUS status;
	LOCALLOCK  *locallock;

	/*
	 * We don't need to touch shared memory for this --- all the necessary
	 * state information is in the locallock table.
	 */
	hash_seq_init(&status, LockMethodLocalHash);

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		TwoPhaseLockRecord record;
		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
		int			i;

		/* Ignore nontransactional locks */
		if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
			continue;

		/* Ignore it if we don't actually hold the lock */
		if (locallock->nLocks <= 0)
			continue;

		/* Scan to verify there are no session locks (NULL owner) */
		for (i = locallock->numLockOwners - 1; i >= 0; i--)
		{
			/* elog not ereport since this should not happen */
			if (lockOwners[i].owner == NULL)
				elog(ERROR, "cannot PREPARE when session locks exist");
		}

		/* Can't handle it if the lock is on a temporary object */
		if (locallock->isTempObject)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("cannot PREPARE a transaction that has operated on temporary tables")));

		/*
		 * Create a 2PC record.
		 */
		memcpy(&(record.locktag), &(locallock->tag.lock), sizeof(LOCKTAG));
		record.lockmode = locallock->tag.mode;

		RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, 0,
							   &record, sizeof(TwoPhaseLockRecord));
	}
}

/*
 * PostPrepare_Locks
 *		Clean up after successful PREPARE
 *
 * Here, we want to transfer ownership of our locks to a dummy PGPROC
 * that's now associated with the prepared transaction, and we want to
 * clean out the corresponding entries in the LOCALLOCK table.
 *
 * Note: by removing the LOCALLOCK entries, we are leaving dangling
 * pointers in the transaction's resource owner.  This is OK at the
 * moment since resowner.c doesn't try to free locks retail at a toplevel
 * transaction commit or abort.  We could alternatively zero out nLocks
 * and leave the LOCALLOCK entries to be garbage-collected by LockReleaseAll,
 * but that probably costs more cycles.
 */
void
PostPrepare_Locks(TransactionId xid)
{
	PGPROC	   *newproc = TwoPhaseGetDummyProc(xid);
	HASH_SEQ_STATUS status;
	LOCALLOCK  *locallock;
	LOCK	   *lock;
	PROCLOCK   *proclock;
	PROCLOCKTAG proclocktag;
	bool		found;
	int			partition;

	/* This is a critical section: any error means big trouble */
	START_CRIT_SECTION();

	/*
	 * First we run through the locallock table and get rid of unwanted
	 * entries, then we scan the process's proclocks and transfer them to the
	 * target proc.
	 *
	 * We do this separately because we may have multiple locallock entries
	 * pointing to the same proclock, and we daren't end up with any dangling
	 * pointers.
	 */
	hash_seq_init(&status, LockMethodLocalHash);

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		if (locallock->proclock == NULL || locallock->lock == NULL)
		{
			/*
			 * We must've run out of shared memory while trying to set up this
			 * lock.  Just forget the local entry.
			 */
			Assert(locallock->nLocks == 0);
			RemoveLocalLock(locallock);
			continue;
		}

		/* Ignore nontransactional locks */
		if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
			continue;

		/* We already checked there are no session locks (in AtPrepare_Locks) */

		/* Mark the proclock to show we need to release this lockmode */
		if (locallock->nLocks > 0)
			locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);

		/* And remove the locallock hashtable entry */
		RemoveLocalLock(locallock);
	}

	/*
	 * Now, scan each lock partition separately.
	 */
	for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
	{
		LWLockId	partitionLock = FirstLockMgrLock + partition;
		SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);

		/* Peek before taking the LWLock so empty partitions cost nothing. */
		proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
											 offsetof(PROCLOCK, procLink));

		if (!proclock)
			continue;			/* needn't examine this partition */

		LWLockAcquire(partitionLock, LW_EXCLUSIVE);

		while (proclock)
		{
			PROCLOCK   *nextplock;
			LOCKMASK	holdMask;
			PROCLOCK   *newproclock;

			/* Get link first, since we may unlink/delete this proclock */
			nextplock = (PROCLOCK *)
				SHMQueueNext(procLocks, &proclock->procLink,
							 offsetof(PROCLOCK, procLink));

			Assert(proclock->tag.proc == MAKE_OFFSET(MyProc));

			lock = (LOCK *) MAKE_PTR(proclock->tag.lock);

			/* Ignore nontransactional locks */
			if (!LockMethods[LOCK_LOCKMETHOD(*lock)]->transactional)
				goto next_item;

			PROCLOCK_PRINT("PostPrepare_Locks", proclock);
			LOCK_PRINT("PostPrepare_Locks", lock, 0);
			Assert(lock->nRequested >= 0);
			Assert(lock->nGranted >= 0);
			Assert(lock->nGranted <= lock->nRequested);
			Assert((proclock->holdMask & ~lock->grantMask) == 0);

			/*
			 * Since there were no session locks, we should be releasing all
			 * locks
			 */
			if (proclock->releaseMask != proclock->holdMask)
				elog(PANIC, "we seem to have dropped a bit somewhere");

			holdMask = proclock->holdMask;

			/*
			 * We cannot simply modify proclock->tag.proc to reassign
			 * ownership of the lock, because that's part of the hash key and
			 * the proclock would then be in the wrong hash chain.  So, unlink
			 * and delete the old proclock; create a new one with the right
			 * contents; and link it into place.  We do it in this order to be
			 * certain we won't run out of shared memory (the way dynahash.c
			 * works, the deleted object is certain to be available for
			 * reallocation).
			 */
			SHMQueueDelete(&proclock->lockLink);
			SHMQueueDelete(&proclock->procLink);
			if (!hash_search(LockMethodProcLockHash[partition],
							 (void *) &(proclock->tag),
							 HASH_REMOVE, NULL))
				elog(PANIC, "proclock table corrupted");

			/*
			 * Create the hash key for the new proclock table.
			 */
			MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG));
			proclocktag.lock = MAKE_OFFSET(lock);
			proclocktag.proc = MAKE_OFFSET(newproc);

			newproclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[partition],
												   (void *) &proclocktag,
												   HASH_ENTER_NULL, &found);
			if (!newproclock)
				ereport(PANIC,		/* should not happen */
						(errcode(ERRCODE_OUT_OF_MEMORY),
						 errmsg("out of shared memory"),
						 errdetail("Not enough memory for reassigning the prepared transaction's locks.")));

			/*
			 * If new, initialize the new entry
			 */
			if (!found)
			{
				newproclock->holdMask = 0;
				newproclock->releaseMask = 0;
				/* Add new proclock to appropriate lists */
				SHMQueueInsertBefore(&lock->procLocks, &newproclock->lockLink);
				SHMQueueInsertBefore(&(newproc->myProcLocks[partition]),
									 &newproclock->procLink);
				PROCLOCK_PRINT("PostPrepare_Locks: new", newproclock);
			}
			else
			{
				PROCLOCK_PRINT("PostPrepare_Locks: found", newproclock);
				Assert((newproclock->holdMask & ~lock->grantMask) == 0);
			}

			/*
			 * Pass over the identified lock ownership.
			 */
			Assert((newproclock->holdMask & holdMask) == 0);
			newproclock->holdMask |= holdMask;

		next_item:
			proclock = nextplock;
		} /* loop over PROCLOCKs within this partition */

		LWLockRelease(partitionLock);
	} /* loop over partitions */

	END_CRIT_SECTION();
}


1877 1878 1879
/*
 * Estimate shared-memory space used for lock tables
 *
 * Returns a Size suitable for feeding to shared-memory sizing; all
 * arithmetic goes through add_size/mul_size so that overflow is caught
 * rather than silently wrapped.
 */
Size
LockShmemSize(void)
{
	Size		size = 0;
	Size		tabsize;
	long		max_table_size;

	/*
	 * lock hash tables: the total NLOCKENTS() budget is divided evenly
	 * across the partitions (rounding up so no capacity is lost).
	 */
	max_table_size = NLOCKENTS();
	max_table_size = (max_table_size - 1) / NUM_LOCK_PARTITIONS + 1;
	tabsize = hash_estimate_size(max_table_size, sizeof(LOCK));
	size = add_size(size, mul_size(tabsize, NUM_LOCK_PARTITIONS));

	/*
	 * proclock hash tables: sized at twice the per-partition LOCK count,
	 * allowing for multiple holders per lock.
	 */
	max_table_size *= 2;
	tabsize = hash_estimate_size(max_table_size, sizeof(PROCLOCK));
	size = add_size(size, mul_size(tabsize, NUM_LOCK_PARTITIONS));

	/*
	 * Since there is likely to be some space wastage due to uneven use
	 * of the partitions, add 10% safety margin.
	 */
	size = add_size(size, size / 10);

	return size;
}

1907 1908
/*
 * GetLockStatusData - Return a summary of the lock manager's internal
 * status, for use in a user-level reporting function.
 *
 * The return data consists of an array of PROCLOCK objects, with the
 * associated PGPROC and LOCK objects for each.  Note that multiple
 * copies of the same PGPROC and/or LOCK objects are likely to appear.
 * It is the caller's responsibility to match up duplicates if wanted.
 *
 * The design goal is to hold the LWLocks for as short a time as possible;
 * thus, this function simply makes a copy of the necessary data and releases
 * the locks, allowing the caller to contemplate and format the data for as
 * long as it pleases.
 *
 * Returns palloc'd storage; the caller owns the result and frees it (or
 * lets the memory context clean it up).
 */
LockData *
GetLockStatusData(void)
{
	LockData   *data;
	HTAB	   *proclockTable;
	PROCLOCK   *proclock;
	HASH_SEQ_STATUS seqstat;
	int			els;			/* total PROCLOCK entries across partitions */
	int			el;				/* next output slot to fill */
	int			i;

	data = (LockData *) palloc(sizeof(LockData));

	/*
	 * Acquire lock on the entire shared lock data structures.  We can't
	 * operate one partition at a time if we want to deliver a self-consistent
	 * view of the state.
	 *
	 * Since this is a read-only operation, we take shared instead of exclusive
	 * lock.  There's not a whole lot of point to this, because all the normal
	 * operations require exclusive lock, but it doesn't hurt anything either.
	 * It will at least allow two backends to do GetLockStatusData in parallel.
	 *
	 * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
	 *
	 * Use same loop to count up the total number of PROCLOCK objects.
	 */
	els = 0;
	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
	{
		LWLockAcquire(FirstLockMgrLock + i, LW_SHARED);
		proclockTable = LockMethodProcLockHash[i];
		els += proclockTable->hctl->nentries;
	}

	/*
	 * Allocate the output arrays now that the element count is pinned
	 * (no entries can change while we hold all the partition locks).
	 */
	data->nelements = els;
	data->proclockaddrs = (SHMEM_OFFSET *) palloc(sizeof(SHMEM_OFFSET) * els);
	data->proclocks = (PROCLOCK *) palloc(sizeof(PROCLOCK) * els);
	data->procs = (PGPROC *) palloc(sizeof(PGPROC) * els);
	data->locks = (LOCK *) palloc(sizeof(LOCK) * els);

	el = 0;

	/* Now scan the tables to copy the data */
	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
	{
		proclockTable = LockMethodProcLockHash[i];
		hash_seq_init(&seqstat, proclockTable);

		while ((proclock = hash_seq_search(&seqstat)))
		{
			/* chase the shmem offsets back to the referenced objects */
			PGPROC	   *proc = (PGPROC *) MAKE_PTR(proclock->tag.proc);
			LOCK	   *lock = (LOCK *) MAKE_PTR(proclock->tag.lock);

			data->proclockaddrs[el] = MAKE_OFFSET(proclock);
			memcpy(&(data->proclocks[el]), proclock, sizeof(PROCLOCK));
			memcpy(&(data->procs[el]), proc, sizeof(PGPROC));
			memcpy(&(data->locks[el]), lock, sizeof(LOCK));

			el++;
		}
	}

	/* And release locks */
	for (i = NUM_LOCK_PARTITIONS; --i >= 0; )
		LWLockRelease(FirstLockMgrLock + i);

	Assert(el == data->nelements);

	return data;
}

1993 1994
/* Provide the textual name of any lock mode */
const char *
1995
GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode)
1996
{
1997 1998 1999
	Assert(lockmethodid > 0 && lockmethodid < lengthof(LockMethods));
	Assert(mode > 0 && mode <= LockMethods[lockmethodid]->numLockModes);
	return LockMethods[lockmethodid]->lockModeNames[mode];
2000
}
B
Bruce Momjian 已提交
2001

2002
#ifdef LOCK_DEBUG
2003
/*
2004
 * Dump all locks in the given proc's myProcLocks lists.
2005
 *
2006
 * Caller is responsible for having acquired appropriate LWLocks.
2007 2008
 */
void
2009
DumpLocks(PGPROC *proc)
2010
{
2011
	SHM_QUEUE  *procLocks;
2012
	PROCLOCK   *proclock;
2013
	LOCK	   *lock;
2014
	int			i;
2015

2016
	if (proc == NULL)
2017
		return;
2018

2019 2020 2021
	if (proc->waitLock)
		LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);

2022
	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
2023
	{
2024
		procLocks = &(proc->myProcLocks[i]);
2025

2026 2027
		proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
											 offsetof(PROCLOCK, procLink));
M
 
Marc G. Fournier 已提交
2028

2029 2030 2031
		while (proclock)
		{
			Assert(proclock->tag.proc == MAKE_OFFSET(proc));
2032

2033 2034 2035 2036 2037 2038 2039 2040 2041
			lock = (LOCK *) MAKE_PTR(proclock->tag.lock);

			PROCLOCK_PRINT("DumpLocks", proclock);
			LOCK_PRINT("DumpLocks", lock, 0);

			proclock = (PROCLOCK *)
				SHMQueueNext(procLocks, &proclock->procLink,
							 offsetof(PROCLOCK, procLink));
		}
2042
	}
2043
}
2044

M
 
Marc G. Fournier 已提交
2045
/*
 * Dump all lmgr locks.
 *
 * Caller is responsible for having acquired appropriate LWLocks.
 *
 * Unlike DumpLocks, this scans the proclock hash tables themselves, so it
 * reports every backend's locks, not just the current proc's.
 */
void
DumpAllLocks(void)
{
	PGPROC	   *proc;
	PROCLOCK   *proclock;
	LOCK	   *lock;
	HTAB	   *proclockTable;
	HASH_SEQ_STATUS status;
	int			i;

	proc = MyProc;

	if (proc && proc->waitLock)
		LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);

	/* Sequentially scan the proclock hash table of every partition */
	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
	{
		proclockTable = LockMethodProcLockHash[i];
		hash_seq_init(&status, proclockTable);

		while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
		{
			PROCLOCK_PRINT("DumpAllLocks", proclock);

			/* a zero lock offset would indicate a corrupt entry */
			if (proclock->tag.lock)
			{
				lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
				LOCK_PRINT("DumpAllLocks", lock, 0);
			}
			else
				elog(LOG, "DumpAllLocks: proclock->tag.lock = NULL");
		}
	}
}
2084
#endif   /* LOCK_DEBUG */
2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102

/*
 * LOCK 2PC resource manager's routines
 */

/*
 * Re-acquire a lock belonging to a transaction that was prepared.
 *
 * Because this function is run at db startup, re-acquiring the locks should
 * never conflict with running transactions because there are none.  We
 * assume that the lock state represented by the stored 2PC files is legal.
 *
 * The lock is granted to the dummy PGPROC standing in for the prepared
 * transaction, not to the running backend.  Errors are reported with
 * ereport(ERROR) for recoverable conditions (out of shared memory) and
 * elog(PANIC) for corruption.
 */
void
lock_twophase_recover(TransactionId xid, uint16 info,
					  void *recdata, uint32 len)
{
	TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
	PGPROC	   *proc = TwoPhaseGetDummyProc(xid);
	LOCKTAG    *locktag;
	LOCKMODE	lockmode;
	LOCKMETHODID lockmethodid;
	LOCK	   *lock;
	PROCLOCK   *proclock;
	PROCLOCKTAG proclocktag;
	bool		found;
	int			partition;
	LWLockId	partitionLock;
	LockMethod	lockMethodTable;

	Assert(len == sizeof(TwoPhaseLockRecord));
	locktag = &rec->locktag;
	lockmode = rec->lockmode;
	lockmethodid = locktag->locktag_lockmethodid;

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];

	/* The lock tables are partitioned by lock tag; lock just that partition */
	partition = LockTagToPartition(locktag);
	partitionLock = FirstLockMgrLock + partition;

	LWLockAcquire(partitionLock, LW_EXCLUSIVE);

	/*
	 * Find or create a lock with this tag.
	 */
	lock = (LOCK *) hash_search(LockMethodLockHash[partition],
								(void *) locktag,
								HASH_ENTER_NULL, &found);
	if (!lock)
	{
		LWLockRelease(partitionLock);
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of shared memory"),
			errhint("You may need to increase max_locks_per_transaction.")));
	}

	/*
	 * if it's a new lock object, initialize it
	 */
	if (!found)
	{
		lock->grantMask = 0;
		lock->waitMask = 0;
		SHMQueueInit(&(lock->procLocks));
		ProcQueueInit(&(lock->waitProcs));
		lock->nRequested = 0;
		lock->nGranted = 0;
		MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
		MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
		LOCK_PRINT("lock_twophase_recover: new", lock, lockmode);
	}
	else
	{
		LOCK_PRINT("lock_twophase_recover: found", lock, lockmode);
		Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
		Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
		Assert(lock->nGranted <= lock->nRequested);
	}

	/*
	 * Create the hash key for the proclock table.
	 */
	MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG));		/* must clear padding */
	proclocktag.lock = MAKE_OFFSET(lock);
	proclocktag.proc = MAKE_OFFSET(proc);

	/*
	 * Find or create a proclock entry with this tag
	 */
	proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[partition],
										(void *) &proclocktag,
										HASH_ENTER_NULL, &found);
	if (!proclock)
	{
		/* Ooops, not enough shmem for the proclock */
		if (lock->nRequested == 0)
		{
			/*
			 * There are no other requestors of this lock, so garbage-collect
			 * the lock object.  We *must* do this to avoid a permanent leak
			 * of shared memory, because there won't be anything to cause
			 * anyone to release the lock object later.
			 */
			Assert(SHMQueueEmpty(&(lock->procLocks)));
			if (!hash_search(LockMethodLockHash[partition],
							 (void *) &(lock->tag),
							 HASH_REMOVE, NULL))
				elog(PANIC, "lock table corrupted");
		}
		LWLockRelease(partitionLock);
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of shared memory"),
			errhint("You may need to increase max_locks_per_transaction.")));
	}

	/*
	 * If new, initialize the new entry
	 */
	if (!found)
	{
		proclock->holdMask = 0;
		proclock->releaseMask = 0;
		/* Add proclock to appropriate lists */
		SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
		SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
							 &proclock->procLink);
		PROCLOCK_PRINT("lock_twophase_recover: new", proclock);
	}
	else
	{
		PROCLOCK_PRINT("lock_twophase_recover: found", proclock);
		Assert((proclock->holdMask & ~lock->grantMask) == 0);
	}

	/*
	 * lock->nRequested and lock->requested[] count the total number of
	 * requests, whether granted or waiting, so increment those immediately.
	 */
	lock->nRequested++;
	lock->requested[lockmode]++;
	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));

	/*
	 * We shouldn't already hold the desired lock.
	 */
	if (proclock->holdMask & LOCKBIT_ON(lockmode))
		elog(ERROR, "lock %s on object %u/%u/%u is already held",
			 lockMethodTable->lockModeNames[lockmode],
			 lock->tag.locktag_field1, lock->tag.locktag_field2,
			 lock->tag.locktag_field3);

	/*
	 * We ignore any possible conflicts and just grant ourselves the lock.
	 */
	GrantLock(lock, proclock, lockmode);

	LWLockRelease(partitionLock);
}

/*
 * 2PC processing routine for COMMIT PREPARED case.
 *
 * Find and release the lock indicated by the 2PC record.
 *
 * The lock and proclock entries must already exist (created either by
 * normal acquisition before PREPARE, or by lock_twophase_recover after a
 * restart); failing to find them indicates corruption and is a PANIC.
 */
void
lock_twophase_postcommit(TransactionId xid, uint16 info,
						 void *recdata, uint32 len)
{
	TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
	PGPROC	   *proc = TwoPhaseGetDummyProc(xid);
	LOCKTAG    *locktag;
	LOCKMODE	lockmode;
	LOCKMETHODID lockmethodid;
	LOCK	   *lock;
	PROCLOCK   *proclock;
	PROCLOCKTAG proclocktag;
	int			partition;
	LWLockId	partitionLock;
	LockMethod	lockMethodTable;
	bool		wakeupNeeded;

	Assert(len == sizeof(TwoPhaseLockRecord));
	locktag = &rec->locktag;
	lockmode = rec->lockmode;
	lockmethodid = locktag->locktag_lockmethodid;

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];

	/* Lock only the partition this lock tag hashes to */
	partition = LockTagToPartition(locktag);
	partitionLock = FirstLockMgrLock + partition;

	LWLockAcquire(partitionLock, LW_EXCLUSIVE);

	/*
	 * Re-find the lock object (it had better be there).
	 */
	lock = (LOCK *) hash_search(LockMethodLockHash[partition],
								(void *) locktag,
								HASH_FIND, NULL);
	if (!lock)
		elog(PANIC, "failed to re-find shared lock object");

	/*
	 * Re-find the proclock object (ditto).
	 */
	MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG));		/* must clear padding */
	proclocktag.lock = MAKE_OFFSET(lock);
	proclocktag.proc = MAKE_OFFSET(proc);
	proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[partition],
										(void *) &proclocktag,
										HASH_FIND, NULL);
	if (!proclock)
		elog(PANIC, "failed to re-find shared proclock object");

	/*
	 * Double-check that we are actually holding a lock of the type we want to
	 * release.
	 */
	if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
	{
		PROCLOCK_PRINT("lock_twophase_postcommit: WRONGTYPE", proclock);
		LWLockRelease(partitionLock);
		elog(WARNING, "you don't own a lock of type %s",
			 lockMethodTable->lockModeNames[lockmode]);
		return;
	}

	/*
	 * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
	 */
	wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);

	CleanUpLock(lock, proclock,
				lockMethodTable, partition,
				wakeupNeeded);

	LWLockRelease(partitionLock);
}

/*
 * 2PC processing routine for ROLLBACK PREPARED case.
 *
 * This is actually just the same as the COMMIT case: either way, the
 * prepared transaction's locks simply get released.
 */
void
lock_twophase_postabort(TransactionId xid, uint16 info,
						void *recdata, uint32 len)
{
	lock_twophase_postcommit(xid, info, recdata, len);
}