/*-------------------------------------------------------------------------
 *
 * lock.c
 *	  POSTGRES low-level lock mechanism
 *
 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.90 2001/06/27 23:31:39 tgl Exp $
 *
 * NOTES
 *	  Outside modules can create a lock table and acquire/release
 *	  locks.  A lock table is a shared memory hash table.  When
 *	  a process tries to acquire a lock of a type that conflicts
 *	  with existing locks, it is put to sleep using the routines
 *	  in storage/lmgr/proc.c.
 *
 *	  For the most part, this code should be invoked via lmgr.c
 *	  or another lock-management module, not directly.
 *
 *	Interface:
 *
 *	LockAcquire(), LockRelease(), LockMethodTableInit(),
 *	LockMethodTableRename(), LockReleaseAll(),
 *	LockCheckConflicts(), GrantLock()
 *
 *-------------------------------------------------------------------------
 */
31 32
#include "postgres.h"

33 34
#include <sys/types.h>
#include <unistd.h>
M
 
Marc G. Fournier 已提交
35
#include <signal.h>
B
Bruce Momjian 已提交
36

B
Bruce Momjian 已提交
37
#include "access/xact.h"
B
Bruce Momjian 已提交
38
#include "miscadmin.h"
39
#include "storage/proc.h"
40
#include "utils/memutils.h"
M
 
Marc G. Fournier 已提交
41
#include "utils/ps_status.h"
42

43 44 45 46 47 48 49

/*
 * This configuration variable is used to set the lock table size.
 * It is multiplied by the number of backends (see NLOCKENTS below) to get
 * the maximum size requested for the lock and holder hash tables in
 * LockMethodTableInit().
 */
int		max_locks_per_xact;		/* set by guc.c */

/* Maximum number of hash table entries to allow for maxBackends backends */
#define NLOCKENTS(maxBackends)	(max_locks_per_xact * (maxBackends))


B
Bruce Momjian 已提交
50 51
static int WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
		   LOCK *lock, HOLDER *holder);
52
static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc,
B
Bruce Momjian 已提交
53
				 int *myHolding);
54

B
Bruce Momjian 已提交
55
/*
 * Printable names of the lock modes, indexed by LOCKMODE value.
 * Slot 0 is a placeholder, since valid lock modes are numbered from 1.
 * The table is only ever read (for debug/trace messages), so both the
 * array and the strings it points to are declared const.
 */
static const char *const lock_mode_names[] =
{
	"INVALID",
	"AccessShareLock",
	"RowShareLock",
	"RowExclusiveLock",
	"ShareLock",
	"ShareRowExclusiveLock",
	"ExclusiveLock",
	"AccessExclusiveLock"
};
66

67
/* Text passed to elog(ERROR) by WaitOnLock() when the lock wait fails */
static char *DeadLockMessage = "Deadlock detected.\n\tSee the lock(l) manual page for a possible cause.";
68 69 70 71 72 73 74


#ifdef LOCK_DEBUG

/*------
 * The following configuration options are available for lock debugging:
 *
 *	   TRACE_LOCKS		-- log detailed output about what is going on in
 *						   this file
 *	   TRACE_USERLOCKS	-- same, but for user locks
 *	   TRACE_LOCK_OIDMIN -- do not trace locks for tables below this oid
 *						   (use to avoid output on system tables)
 *	   TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
 *	   DEBUG_DEADLOCKS	-- currently dumps locks at untimely occasions ;)
 *
 * Furthermore, but in storage/ipc/spin.c:
 *	   TRACE_SPINLOCKS	-- trace spinlocks (pretty useless)
 *
 * Define LOCK_DEBUG at compile time to get all these enabled.
 * --------
 */

B
Bruce Momjian 已提交
89 90 91 92 93
/* Locks on relations with OIDs below this are not traced by the two flags below */
int			Trace_lock_oidmin = BootstrapObjectIdData;
/* Trace locks taken through DEFAULT_LOCKMETHOD */
bool		Trace_locks = false;
/* Trace locks taken through USER_LOCKMETHOD */
bool		Trace_userlocks = false;
/* If nonzero, locks on the relation with this OID are always traced */
int			Trace_lock_table = 0;
/* NOTE(review): not referenced in this part of the file; see DEBUG_DEADLOCKS above */
bool		Debug_deadlocks = false;
94 95 96


inline static bool
B
Bruce Momjian 已提交
97
LOCK_DEBUG_ENABLED(const LOCK *lock)
98
{
B
Bruce Momjian 已提交
99
	return
B
Bruce Momjian 已提交
100 101 102 103
	(((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks)
	  || (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks))
	 && (lock->tag.relId >= (Oid) Trace_lock_oidmin))
	|| (Trace_lock_table && (lock->tag.relId == Trace_lock_table));
104 105 106 107
}


inline static void
B
Bruce Momjian 已提交
108
LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
109 110
{
	if (LOCK_DEBUG_ENABLED(lock))
B
Bruce Momjian 已提交
111 112 113 114 115 116 117 118
		elog(DEBUG,
			 "%s: lock(%lx) tbl(%d) rel(%u) db(%u) obj(%u) grantMask(%x) "
			 "req(%d,%d,%d,%d,%d,%d,%d)=%d "
			 "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
			 where, MAKE_OFFSET(lock),
			 lock->tag.lockmethod, lock->tag.relId, lock->tag.dbId,
			 lock->tag.objId.blkno, lock->grantMask,
			 lock->requested[1], lock->requested[2], lock->requested[3],
119 120
			 lock->requested[4], lock->requested[5], lock->requested[6],
			 lock->requested[7], lock->nRequested,
B
Bruce Momjian 已提交
121 122 123
			 lock->granted[1], lock->granted[2], lock->granted[3],
			 lock->granted[4], lock->granted[5], lock->granted[6],
			 lock->granted[7], lock->nGranted,
B
Bruce Momjian 已提交
124
			 lock->waitProcs.size, lock_mode_names[type]);
125 126 127 128
}


inline static void
B
Bruce Momjian 已提交
129
HOLDER_PRINT(const char *where, const HOLDER *holderP)
130 131
{
	if (
B
Bruce Momjian 已提交
132 133 134 135 136
	 (((HOLDER_LOCKMETHOD(*holderP) == DEFAULT_LOCKMETHOD && Trace_locks)
	   || (HOLDER_LOCKMETHOD(*holderP) == USER_LOCKMETHOD && Trace_userlocks))
	  && (((LOCK *) MAKE_PTR(holderP->tag.lock))->tag.relId >= (Oid) Trace_lock_oidmin))
		|| (Trace_lock_table && (((LOCK *) MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table))
	)
B
Bruce Momjian 已提交
137 138 139
		elog(DEBUG,
			 "%s: holder(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
			 where, MAKE_OFFSET(holderP), holderP->tag.lock,
140
			 HOLDER_LOCKMETHOD(*(holderP)),
B
Bruce Momjian 已提交
141
			 holderP->tag.proc, holderP->tag.xid,
B
Bruce Momjian 已提交
142 143
		   holderP->holding[1], holderP->holding[2], holderP->holding[3],
		   holderP->holding[4], holderP->holding[5], holderP->holding[6],
144
			 holderP->holding[7], holderP->nHolding);
145 146
}

B
Bruce Momjian 已提交
147
#else							/* not LOCK_DEBUG */
148 149

/*
 * Tracing is compiled out.  Expand to a no-op expression rather than to
 * nothing, so that every use still forms a complete statement even when it
 * is the sole body of an if/else branch.  The arguments are not evaluated.
 */
#define LOCK_PRINT(where, lock, type)	((void) 0)
#define HOLDER_PRINT(where, holderP)	((void) 0)
151

B
Bruce Momjian 已提交
152
#endif	 /* not LOCK_DEBUG */
153 154 155



156
SPINLOCK	LockMgrLock;		/* in Shmem or created in
157
								 * CreateSpinlocks() */
158

159 160 161 162 163 164
/*
 * These are to simplify/speed up some bit arithmetic.
 *
 * XXX is a fetch from a static array really faster than a shift?
 * Wouldn't bet on it...
 */
165

B
Bruce Momjian 已提交
166 167
static LOCKMASK BITS_OFF[MAX_LOCKMODES];
static LOCKMASK BITS_ON[MAX_LOCKMODES];
168

B
Bruce Momjian 已提交
169
/*
170
 * Disable flag
171
 */
172
static bool LockingIsDisabled;
173

B
Bruce Momjian 已提交
174
/*
175
 * map from lockmethod to the lock table structure
176
 */
177
static LOCKMETHODTABLE *LockMethodTable[MAX_LOCK_METHODS];
178

179
static int	NumLockMethods;
180

B
Bruce Momjian 已提交
181
/*
182
 * InitLocks -- Init the lock module.  Create a private data
183
 *		structure for constructing conflict masks.
184 185
 */
void
186
InitLocks(void)
187
{
188 189
	int			i;
	int			bit;
190 191

	bit = 1;
192
	for (i = 0; i < MAX_LOCKMODES; i++, bit <<= 1)
193
	{
194 195
		BITS_ON[i] = bit;
		BITS_OFF[i] = ~bit;
196 197 198
	}
}

B
Bruce Momjian 已提交
199
/*
200 201 202
 * LockDisable -- sets LockingIsDisabled flag to TRUE or FALSE.
 */
void
203
LockDisable(bool status)
204
{
205
	LockingIsDisabled = status;
206 207
}

B
Bruce Momjian 已提交
208
/*
 * LockingDisabled -- report the current value of the LockingIsDisabled flag.
 */
bool
LockingDisabled(void)
{
	bool		disabled = LockingIsDisabled;

	return disabled;
}

217 218 219 220 221 222
/*
 * GetLocksMethodTable -- fetch the lock method table associated with a
 *		given lock.
 *
 * The lock's method ID is taken from the lock itself and is asserted to be
 * a valid, already-assigned ID (0 is never a valid table).
 */
LOCKMETHODTABLE *
GetLocksMethodTable(LOCK *lock)
{
	LOCKMETHOD	method;

	method = LOCK_LOCKMETHOD(*lock);
	Assert(method > 0 && method < NumLockMethods);

	return LockMethodTable[method];
}

229 230

/*
231
 * LockMethodInit -- initialize the lock table's lock type
232
 *		structures
233 234 235 236
 *
 * Notes: just copying.  Should only be called once.
 */
static void
237
LockMethodInit(LOCKMETHODTABLE *lockMethodTable,
B
Bruce Momjian 已提交
238
			   LOCKMASK *conflictsP,
239 240
			   int *prioP,
			   int numModes)
241
{
242
	int			i;
243

244 245 246
	lockMethodTable->ctl->numLockModes = numModes;
	numModes++;
	for (i = 0; i < numModes; i++, prioP++, conflictsP++)
247
	{
248 249
		lockMethodTable->ctl->conflictTab[i] = *conflictsP;
		lockMethodTable->ctl->prio[i] = *prioP;
250 251 252 253
	}
}

/*
254
 * LockMethodTableInit -- initialize a lock table structure
255 256
 *
 * Notes:
257
 *		(a) a lock table has four separate entries in the shmem index
258
 *		table.	This is because every shared hash table and spinlock
259
 *		has its name stored in the shmem index at its creation.  It
260
 *		is wasteful, in this case, but not much space is involved.
261
 *
262
 * NOTE: data structures allocated here are allocated permanently, using
B
Bruce Momjian 已提交
263
 * TopMemoryContext and shared memory.	We don't ever release them anyway,
264 265 266
 * and in normal multi-backend operation the lock table structures set up
 * by the postmaster are inherited by each backend, so they must be in
 * TopMemoryContext.
267
 */
268 269
LOCKMETHOD
LockMethodTableInit(char *tabName,
B
Bruce Momjian 已提交
270
					LOCKMASK *conflictsP,
271
					int *prioP,
272 273
					int numModes,
					int maxBackends)
274
{
275
	LOCKMETHODTABLE *lockMethodTable;
276 277 278 279
	char	   *shmemName;
	HASHCTL		info;
	int			hash_flags;
	bool		found;
280 281
	long		init_table_size,
				max_table_size;
282

283
	if (numModes >= MAX_LOCKMODES)
284
	{
285 286
		elog(NOTICE, "LockMethodTableInit: too many lock types %d greater than %d",
			 numModes, MAX_LOCKMODES);
287
		return INVALID_LOCKMETHOD;
288
	}
289

290 291 292 293
	/* Compute init/max size to request for lock hashtables */
	max_table_size = NLOCKENTS(maxBackends);
	init_table_size = max_table_size / 10;

294 295 296
	/* Allocate a string for the shmem index table lookups. */
	/* This is just temp space in this routine, so palloc is OK. */
	shmemName = (char *) palloc(strlen(tabName) + 32);
297

298 299 300
	/* each lock table has a non-shared, permanent header */
	lockMethodTable = (LOCKMETHODTABLE *)
		MemoryContextAlloc(TopMemoryContext, sizeof(LOCKMETHODTABLE));
301

B
Bruce Momjian 已提交
302
	/*
303 304 305 306
	 * find/acquire the spinlock for the table
	 */
	SpinAcquire(LockMgrLock);

B
Bruce Momjian 已提交
307
	/*
B
Bruce Momjian 已提交
308 309
	 * allocate a control structure from shared memory or attach to it if
	 * it already exists.
310 311
	 */
	sprintf(shmemName, "%s (ctl)", tabName);
312
	lockMethodTable->ctl = (LOCKMETHODCTL *)
313
		ShmemInitStruct(shmemName, sizeof(LOCKMETHODCTL), &found);
314

315 316
	if (!lockMethodTable->ctl)
		elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
317

B
Bruce Momjian 已提交
318
	/*
319 320 321
	 * no zero-th table
	 */
	NumLockMethods = 1;
322

B
Bruce Momjian 已提交
323
	/*
324 325 326
	 * we're first - initialize
	 */
	if (!found)
327
	{
328 329 330
		MemSet(lockMethodTable->ctl, 0, sizeof(LOCKMETHODCTL));
		lockMethodTable->ctl->masterLock = LockMgrLock;
		lockMethodTable->ctl->lockmethod = NumLockMethods;
331
	}
332

B
Bruce Momjian 已提交
333
	/*
334
	 * other modules refer to the lock table by a lockmethod ID
335
	 */
336 337 338
	LockMethodTable[NumLockMethods] = lockMethodTable;
	NumLockMethods++;
	Assert(NumLockMethods <= MAX_LOCK_METHODS);
339

B
Bruce Momjian 已提交
340
	/*
B
Bruce Momjian 已提交
341 342
	 * allocate a hash table for LOCK structs.	This is used to store
	 * per-locked-object information.
343
	 */
344 345
	info.keysize = SHMEM_LOCKTAB_KEYSIZE;
	info.datasize = SHMEM_LOCKTAB_DATASIZE;
346 347 348 349
	info.hash = tag_hash;
	hash_flags = (HASH_ELEM | HASH_FUNCTION);

	sprintf(shmemName, "%s (lock hash)", tabName);
350 351 352 353 354
	lockMethodTable->lockHash = ShmemInitHash(shmemName,
											  init_table_size,
											  max_table_size,
											  &info,
											  hash_flags);
355

356 357
	if (!lockMethodTable->lockHash)
		elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
358
	Assert(lockMethodTable->lockHash->hash == tag_hash);
359

B
Bruce Momjian 已提交
360
	/*
B
Bruce Momjian 已提交
361 362
	 * allocate a hash table for HOLDER structs.  This is used to store
	 * per-lock-holder information.
363
	 */
364 365
	info.keysize = SHMEM_HOLDERTAB_KEYSIZE;
	info.datasize = SHMEM_HOLDERTAB_DATASIZE;
366 367 368
	info.hash = tag_hash;
	hash_flags = (HASH_ELEM | HASH_FUNCTION);

369 370 371 372 373 374
	sprintf(shmemName, "%s (holder hash)", tabName);
	lockMethodTable->holderHash = ShmemInitHash(shmemName,
												init_table_size,
												max_table_size,
												&info,
												hash_flags);
375

376
	if (!lockMethodTable->holderHash)
377
		elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
378 379

	/* init ctl data structures */
380
	LockMethodInit(lockMethodTable, conflictsP, prioP, numModes);
381 382 383 384 385

	SpinRelease(LockMgrLock);

	pfree(shmemName);

386
	return lockMethodTable->ctl->lockmethod;
387 388 389
}

/*
390
 * LockMethodTableRename -- allocate another lockmethod ID to the same
391
 *		lock table.
392 393
 *
 * NOTES: Both the lock module and the lock chain (lchain.c)
394 395 396 397
 *		module use table id's to distinguish between different
 *		kinds of locks.  Short term and long term locks look
 *		the same to the lock table, but are handled differently
 *		by the lock chain manager.	This function allows the
398
 *		client to use different lockmethods when acquiring/releasing
399
 *		short term and long term locks, yet store them all in one hashtable.
400
 */
M
 
Marc G. Fournier 已提交
401

402 403
LOCKMETHOD
LockMethodTableRename(LOCKMETHOD lockmethod)
404
{
405
	LOCKMETHOD	newLockMethod;
406

407
	if (NumLockMethods >= MAX_LOCK_METHODS)
408
		return INVALID_LOCKMETHOD;
M
 
Marc G. Fournier 已提交
409
	if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD)
410
		return INVALID_LOCKMETHOD;
411

412
	/* other modules refer to the lock table by a lockmethod ID */
413 414
	newLockMethod = NumLockMethods;
	NumLockMethods++;
415

416
	LockMethodTable[newLockMethod] = LockMethodTable[lockmethod];
417
	return newLockMethod;
418 419 420 421
}

/*
 * LockAcquire -- Check for lock conflicts, sleep if conflict found,
 *		set lock if/when no conflicts.
 *
 * Returns: TRUE if lock was acquired, FALSE otherwise.  Note that
 *		a FALSE return is to be expected if dontWait is TRUE;
 *		but if dontWait is FALSE, only a parameter error can cause
 *		a FALSE return.  (XXX probably we should just elog on parameter
 *		errors, instead of conflating this with failure to acquire lock?)
 *
 * Side Effects: The lock is acquired and recorded in lock tables.
 *
 * NOTE: if we wait for the lock, there is no way to abort the wait
 * short of aborting the transaction.
 *
 *
 * Note on User Locks:
 *
 *		User locks are handled totally on the application side as
 *		long term cooperative locks which extend beyond the normal
 *		transaction boundaries.  Their purpose is to indicate to an
 *		application that someone is `working' on an item.  So it is
 *		possible to put a user lock on a tuple's oid, retrieve the
 *		tuple, work on it for an hour and then update it and remove
 *		the lock.  While the lock is active other clients can still
 *		read and write the tuple but they can be aware that it has
 *		been locked at the application level by someone.
 *		User locks use lock tags made of a uint16 and a uint32, for
 *		example 0 and a tuple oid, or any other arbitrary pair of
 *		numbers following a convention established by the application.
 *		In this sense tags don't refer to tuples or database entities.
 *		User locks and normal locks are completely orthogonal and
 *		they don't interfere with each other, so it is possible
 *		to acquire a normal lock on a user-locked tuple or user-lock
 *		a tuple for which a normal write lock already exists.
 *		User locks are always non-blocking, therefore they are never
 *		acquired if already held by another process.  They must be
 *		released explicitly by the application but they are released
 *		automatically when a backend terminates.
 *		They are indicated by a lockmethod 2 which is an alias for the
 *		normal lock table, and are distinguished from normal locks
 *		by the following differences:
 *
 *										normal lock		user lock
 *
 *		lockmethod						1				2
 *		tag.dbId						database oid	database oid
 *		tag.relId						rel oid or 0	0
 *		tag.objId						block id		lock id2
 *										or xact id
 *		tag.offnum						0				lock id1
 *		holder.xid						xid or 0		0
 *		persistence						transaction		user or backend
 *										or backend
 *
 *		The lockmode parameter can have the same values as for normal
 *		locks, although probably only WRITE_LOCK can have some practical use.
 *
 *														DZ - 22 Nov 1997
 */
480

481
bool
482
LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
483
			TransactionId xid, LOCKMODE lockmode, bool dontWait)
484
{
485 486 487
	HOLDER	   *holder;
	HOLDERTAG	holdertag;
	HTAB	   *holderTable;
488
	bool		found;
489
	LOCK	   *lock;
490
	SPINLOCK	masterLock;
491
	LOCKMETHODTABLE *lockMethodTable;
492
	int			status;
493
	int			myHolding[MAX_LOCKMODES];
494
	int			i;
495

496 497
#ifdef LOCK_DEBUG
	if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
498
		elog(DEBUG, "LockAcquire: user lock [%u] %s",
B
Bruce Momjian 已提交
499
			 locktag->objId.blkno, lock_mode_names[lockmode]);
500 501
#endif

M
 
Marc G. Fournier 已提交
502 503 504
	/* ???????? This must be changed when short term locks will be used */
	locktag->lockmethod = lockmethod;

505 506 507
	Assert(lockmethod < NumLockMethods);
	lockMethodTable = LockMethodTable[lockmethod];
	if (!lockMethodTable)
508
	{
509
		elog(NOTICE, "LockAcquire: bad lock table %d", lockmethod);
510
		return FALSE;
511
	}
512 513

	if (LockingIsDisabled)
514
		return TRUE;
515

516
	masterLock = lockMethodTable->ctl->masterLock;
517 518 519

	SpinAcquire(masterLock);

M
 
Marc G. Fournier 已提交
520 521 522
	/*
	 * Find or create a lock with this tag
	 */
523
	Assert(lockMethodTable->lockHash->hash == tag_hash);
M
 
Marc G. Fournier 已提交
524 525
	lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
								HASH_ENTER, &found);
526
	if (!lock)
527
	{
528
		SpinRelease(masterLock);
529
		elog(FATAL, "LockAcquire: lock table %d is corrupted", lockmethod);
530
		return FALSE;
531
	}
532

B
Bruce Momjian 已提交
533
	/*
534
	 * if it's a new lock object, initialize it
535 536
	 */
	if (!found)
537
	{
538 539
		lock->grantMask = 0;
		lock->waitMask = 0;
540 541
		SHMQueueInit(&(lock->lockHolders));
		ProcQueueInit(&(lock->waitProcs));
542 543 544 545
		lock->nRequested = 0;
		lock->nGranted = 0;
		MemSet((char *) lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
		MemSet((char *) lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
M
 
Marc G. Fournier 已提交
546
		LOCK_PRINT("LockAcquire: new", lock, lockmode);
547 548 549
	}
	else
	{
M
 
Marc G. Fournier 已提交
550
		LOCK_PRINT("LockAcquire: found", lock, lockmode);
551 552 553
		Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
		Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
		Assert(lock->nGranted <= lock->nRequested);
554
	}
555

B
Bruce Momjian 已提交
556
	/*
557
	 * Create the hash key for the holder table.
558
	 */
B
Bruce Momjian 已提交
559 560
	MemSet(&holdertag, 0, sizeof(HOLDERTAG));	/* must clear padding,
												 * needed */
561
	holdertag.lock = MAKE_OFFSET(lock);
562
	holdertag.proc = MAKE_OFFSET(MyProc);
563
	TransactionIdStore(xid, &holdertag.xid);
564

M
 
Marc G. Fournier 已提交
565
	/*
566
	 * Find or create a holder entry with this tag
M
 
Marc G. Fournier 已提交
567
	 */
568 569 570 571
	holderTable = lockMethodTable->holderHash;
	holder = (HOLDER *) hash_search(holderTable, (Pointer) &holdertag,
									HASH_ENTER, &found);
	if (!holder)
572
	{
573
		SpinRelease(masterLock);
574
		elog(FATAL, "LockAcquire: holder table corrupted");
575
		return FALSE;
576
	}
M
 
Marc G. Fournier 已提交
577 578

	/*
579
	 * If new, initialize the new entry
M
 
Marc G. Fournier 已提交
580
	 */
581
	if (!found)
582
	{
583
		holder->nHolding = 0;
584
		MemSet((char *) holder->holding, 0, sizeof(int) * MAX_LOCKMODES);
585 586 587
		/* Add holder to appropriate lists */
		SHMQueueInsertBefore(&lock->lockHolders, &holder->lockLink);
		SHMQueueInsertBefore(&MyProc->procHolders, &holder->procLink);
588
		HOLDER_PRINT("LockAcquire: new", holder);
589 590 591
	}
	else
	{
592
		HOLDER_PRINT("LockAcquire: found", holder);
593 594
		Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
		Assert(holder->nHolding <= lock->nGranted);
595

596
#ifdef CHECK_DEADLOCK_RISK
B
Bruce Momjian 已提交
597

598 599 600 601 602 603 604
		/*
		 * Issue warning if we already hold a lower-level lock on this
		 * object and do not hold a lock of the requested level or higher.
		 * This indicates a deadlock-prone coding practice (eg, we'd have
		 * a deadlock if another backend were following the same code path
		 * at about the same time).
		 *
B
Bruce Momjian 已提交
605 606 607 608
		 * This is not enabled by default, because it may generate log
		 * entries about user-level coding practices that are in fact safe
		 * in context. It can be enabled to help find system-level
		 * problems.
609
		 *
B
Bruce Momjian 已提交
610 611
		 * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
		 * better to use a table.  For now, though, this works.
612 613 614
		 */
		for (i = lockMethodTable->ctl->numLockModes; i > 0; i--)
		{
615
			if (holder->holding[i] > 0)
616 617 618 619 620
			{
				if (i >= (int) lockmode)
					break;		/* safe: we have a lock >= req level */
				elog(DEBUG, "Deadlock risk: raising lock level"
					 " from %s to %s on object %u/%u/%u",
B
Bruce Momjian 已提交
621
					 lock_mode_names[i], lock_mode_names[lockmode],
B
Bruce Momjian 已提交
622
				 lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
623 624 625
				break;
			}
		}
B
Bruce Momjian 已提交
626
#endif	 /* CHECK_DEADLOCK_RISK */
627
	}
628

B
Bruce Momjian 已提交
629
	/*
630
	 * lock->nRequested and lock->requested[] count the total number of
B
Bruce Momjian 已提交
631 632
	 * requests, whether granted or waiting, so increment those
	 * immediately. The other counts don't increment till we get the lock.
633
	 */
634 635 636
	lock->nRequested++;
	lock->requested[lockmode]++;
	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
637

B
Bruce Momjian 已提交
638
	/*
B
Bruce Momjian 已提交
639 640
	 * If I already hold one or more locks of the requested type, just
	 * grant myself another one without blocking.
641
	 */
642
	if (holder->holding[lockmode] > 0)
643 644 645 646 647 648 649
	{
		GrantLock(lock, holder, lockmode);
		HOLDER_PRINT("LockAcquire: owning", holder);
		SpinRelease(masterLock);
		return TRUE;
	}

B
Bruce Momjian 已提交
650
	/*
B
Bruce Momjian 已提交
651 652
	 * If this process (under any XID) is a holder of the lock, also grant
	 * myself another one without blocking.
653
	 */
654
	LockCountMyLocks(holder->tag.lock, MyProc, myHolding);
655
	if (myHolding[lockmode] > 0)
656
	{
657 658
		GrantLock(lock, holder, lockmode);
		HOLDER_PRINT("LockAcquire: my other XID owning", holder);
659
		SpinRelease(masterLock);
660
		return TRUE;
661
	}
662

B
Bruce Momjian 已提交
663
	/*
B
Bruce Momjian 已提交
664 665 666
	 * If lock requested conflicts with locks requested by waiters, must
	 * join wait queue.  Otherwise, check for conflict with already-held
	 * locks.  (That's last because most complex check.)
V
Vadim B. Mikheev 已提交
667 668
	 */
	if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
669
		status = STATUS_FOUND;
V
Vadim B. Mikheev 已提交
670
	else
671 672 673
		status = LockCheckConflicts(lockMethodTable, lockmode,
									lock, holder,
									MyProc, myHolding);
V
Vadim B. Mikheev 已提交
674

675
	if (status == STATUS_OK)
676 677
	{
		/* No conflict with held or previously requested locks */
678
		GrantLock(lock, holder, lockmode);
679 680
	}
	else
681
	{
682
		Assert(status == STATUS_FOUND);
683
		/*
684 685
		 * We can't acquire the lock immediately.  If caller specified no
		 * blocking, remove the holder entry and return FALSE without waiting.
686
		 */
687
		if (dontWait)
688
		{
689
			if (holder->nHolding == 0)
690
			{
691 692
				SHMQueueDelete(&holder->lockLink);
				SHMQueueDelete(&holder->procLink);
693 694 695 696 697
				holder = (HOLDER *) hash_search(holderTable,
												(Pointer) holder,
												HASH_REMOVE, &found);
				if (!holder || !found)
					elog(NOTICE, "LockAcquire: remove holder, table corrupted");
698
			}
699
			else
700
				HOLDER_PRINT("LockAcquire: NHOLDING", holder);
701 702
			lock->nRequested--;
			lock->requested[lockmode]--;
703
			LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
704 705
			Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
			Assert(lock->nGranted <= lock->nRequested);
706
			SpinRelease(masterLock);
707
			return FALSE;
708
		}
B
Bruce Momjian 已提交
709

V
Vadim B. Mikheev 已提交
710
		/*
711
		 * Construct bitmask of locks this process holds on this object.
V
Vadim B. Mikheev 已提交
712 713
		 */
		{
714
			int			heldLocks = 0;
715
			int			tmpMask;
V
Vadim B. Mikheev 已提交
716

717 718
			for (i = 1, tmpMask = 2;
				 i <= lockMethodTable->ctl->numLockModes;
B
Bruce Momjian 已提交
719
				 i++, tmpMask <<= 1)
V
Vadim B. Mikheev 已提交
720
			{
721 722
				if (myHolding[i] > 0)
					heldLocks |= tmpMask;
V
Vadim B. Mikheev 已提交
723
			}
724
			MyProc->heldLocks = heldLocks;
V
Vadim B. Mikheev 已提交
725 726
		}

727 728 729 730
		/*
		 * Sleep till someone wakes me up.
		 */
		status = WaitOnLock(lockmethod, lockmode, lock, holder);
731

732 733
		/*
		 * NOTE: do not do any material change of state between here and
B
Bruce Momjian 已提交
734 735 736
		 * return.	All required changes in locktable state must have been
		 * done when the lock was granted to us --- see notes in
		 * WaitOnLock.
737 738
		 */

M
 
Marc G. Fournier 已提交
739
		/*
740
		 * Check the holder entry status, in case something in the ipc
741
		 * communication doesn't work correctly.
M
 
Marc G. Fournier 已提交
742
		 */
743
		if (!((holder->nHolding > 0) && (holder->holding[lockmode] > 0)))
744
		{
745
			HOLDER_PRINT("LockAcquire: INCONSISTENT", holder);
746
			LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
M
 
Marc G. Fournier 已提交
747
			/* Should we retry ? */
748
			SpinRelease(masterLock);
749
			return FALSE;
M
 
Marc G. Fournier 已提交
750
		}
751
		HOLDER_PRINT("LockAcquire: granted", holder);
M
 
Marc G. Fournier 已提交
752
		LOCK_PRINT("LockAcquire: granted", lock, lockmode);
753
	}
754 755 756

	SpinRelease(masterLock);

757
	return status == STATUS_OK;
758 759
}

B
Bruce Momjian 已提交
760
/*
761 762 763 764
 * LockCheckConflicts -- test whether requested lock conflicts
 *		with those already granted
 *
 * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
765 766
 *
 * NOTES:
767 768 769 770 771
 *		Here's what makes this complicated: one process's locks don't
 * conflict with one another, even if they are held under different
 * transaction IDs (eg, session and xact locks do not conflict).
 * So, we must subtract off our own locks when determining whether the
 * requested new lock conflicts with those already held.
772
 *
773
 * The caller can optionally pass the process's total holding counts, if
774
 * known.  If NULL is passed then these values will be computed internally.
775 776
 */
int
777 778 779 780 781 782
LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
				   LOCKMODE lockmode,
				   LOCK *lock,
				   HOLDER *holder,
				   PROC *proc,
				   int *myHolding)		/* myHolding[] array or NULL */
783
{
784
	LOCKMETHODCTL *lockctl = lockMethodTable->ctl;
785
	int			numLockModes = lockctl->numLockModes;
786 787 788
	int			bitmask;
	int			i,
				tmpMask;
789
	int			localHolding[MAX_LOCKMODES];
790

B
Bruce Momjian 已提交
791
	/*
B
Bruce Momjian 已提交
792 793
	 * first check for global conflicts: If no locks conflict with my
	 * request, then I get the lock.
794
	 *
795
	 * Checking for conflict: lock->grantMask represents the types of
B
Bruce Momjian 已提交
796 797 798
	 * currently held locks.  conflictTable[lockmode] has a bit set for
	 * each type of lock that conflicts with request.	Bitwise compare
	 * tells if there is a conflict.
799
	 */
800
	if (!(lockctl->conflictTab[lockmode] & lock->grantMask))
801
	{
802
		HOLDER_PRINT("LockCheckConflicts: no conflict", holder);
803
		return STATUS_OK;
804
	}
805

B
Bruce Momjian 已提交
806
	/*
B
Bruce Momjian 已提交
807 808 809 810
	 * Rats.  Something conflicts. But it could still be my own lock.  We
	 * have to construct a conflict mask that does not reflect our own
	 * locks.  Locks held by the current process under another XID also
	 * count as "our own locks".
811
	 */
812
	if (myHolding == NULL)
813 814
	{
		/* Caller didn't do calculation of total holding for me */
815 816
		LockCountMyLocks(holder->tag.lock, proc, localHolding);
		myHolding = localHolding;
817 818 819
	}

	/* Compute mask of lock types held by other processes */
820 821
	bitmask = 0;
	tmpMask = 2;
822
	for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
823
	{
824
		if (lock->granted[i] != myHolding[i])
825
			bitmask |= tmpMask;
826
	}
827

B
Bruce Momjian 已提交
828
	/*
B
Bruce Momjian 已提交
829 830 831
	 * now check again for conflicts.  'bitmask' describes the types of
	 * locks held by other processes.  If one of these conflicts with the
	 * kind of lock that I want, there is a conflict and I have to sleep.
832
	 */
833
	if (!(lockctl->conflictTab[lockmode] & bitmask))
834
	{
835
		/* no conflict. OK to get the lock */
836
		HOLDER_PRINT("LockCheckConflicts: resolved", holder);
837
		return STATUS_OK;
838
	}
839

840
	HOLDER_PRINT("LockCheckConflicts: conflicting", holder);
841
	return STATUS_FOUND;
842 843
}

M
 
Marc G. Fournier 已提交
844
/*
845 846 847 848 849 850 851 852 853 854
 * LockCountMyLocks --- Count total number of locks held on a given lockable
 *		object by a given process (under any transaction ID).
 *
 * XXX This could be rather slow if the process holds a large number of locks.
 * Perhaps it could be sped up if we kept yet a third hashtable of per-
 * process lock information.  However, for the normal case where a transaction
 * doesn't hold a large number of locks, keeping such a table would probably
 * be a net slowdown.
 */
static void
855
LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding)
856
{
857 858
	SHM_QUEUE  *procHolders = &(proc->procHolders);
	HOLDER	   *holder;
859 860
	int			i;

861
	MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
862

863 864
	holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
									 offsetof(HOLDER, procLink));
865

866
	while (holder)
867 868 869 870
	{
		if (lockOffset == holder->tag.lock)
		{
			for (i = 1; i < MAX_LOCKMODES; i++)
871
				myHolding[i] += holder->holding[i];
872 873
		}

874 875 876
		holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
										 offsetof(HOLDER, procLink));
	}
877 878 879 880
}

/*
 * GrantLock -- update the lock and holder data structures to show
881
 *		the lock request has been granted.
882 883 884
 *
 * NOTE: if proc was blocked, it also needs to be removed from the wait list
 * and have its waitLock/waitHolder fields cleared.  That's not done here.
M
 
Marc G. Fournier 已提交
885 886
 */
void
887
GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode)
M
 
Marc G. Fournier 已提交
888
{
889 890 891 892
	lock->nGranted++;
	lock->granted[lockmode]++;
	lock->grantMask |= BITS_ON[lockmode];
	if (lock->granted[lockmode] == lock->requested[lockmode])
893
		lock->waitMask &= BITS_OFF[lockmode];
M
 
Marc G. Fournier 已提交
894
	LOCK_PRINT("GrantLock", lock, lockmode);
895 896 897
	Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
	Assert(lock->nGranted <= lock->nRequested);
	holder->holding[lockmode]++;
898
	holder->nHolding++;
899
	Assert((holder->nHolding > 0) && (holder->holding[lockmode] > 0));
M
 
Marc G. Fournier 已提交
900 901
}

902 903 904
/*
 * WaitOnLock -- wait to acquire a lock
 *
905 906 907
 * Caller must have set MyProc->heldLocks to reflect locks already held
 * on the lockable object by this process (under all XIDs).
 *
908 909
 * The locktable spinlock must be held at entry.
 */
910
static int
911 912
WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
		   LOCK *lock, HOLDER *holder)
913
{
914
	LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod];
915 916
	char	   *new_status,
			   *old_status;
917

918
	Assert(lockmethod < NumLockMethods);
919

920
	LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode);
921 922

	old_status = pstrdup(get_ps_display());
923 924
	new_status = (char *) palloc(strlen(old_status) + 10);
	strcpy(new_status, old_status);
925
	strcat(new_status, " waiting");
926 927
	set_ps_display(new_status);

B
Bruce Momjian 已提交
928
	/*
929
	 * NOTE: Think not to put any shared-state cleanup after the call to
930
	 * ProcSleep, in either the normal or failure path.  The lock state
931
	 * must be fully set by the lock grantor, or by HandleDeadLock if we
932 933 934 935 936
	 * give up waiting for the lock.  This is necessary because of the
	 * possibility that a cancel/die interrupt will interrupt ProcSleep
	 * after someone else grants us the lock, but before we've noticed it.
	 * Hence, after granting, the locktable state must fully reflect the
	 * fact that we own the lock; we can't do additional work on return.
937
	 * Contrariwise, if we fail, any cleanup must happen in xact abort
B
Bruce Momjian 已提交
938 939
	 * processing, not here, to ensure it will also happen in the
	 * cancel/die case.
940 941
	 */

942
	if (ProcSleep(lockMethodTable,
943
				  lockmode,
944
				  lock,
945
				  holder) != STATUS_OK)
946
	{
B
Bruce Momjian 已提交
947

B
Bruce Momjian 已提交
948
		/*
B
Bruce Momjian 已提交
949 950
		 * We failed as a result of a deadlock, see HandleDeadLock(). Quit
		 * now.  Removal of the holder and lock objects, if no longer
951
		 * needed, will happen in xact cleanup (see above for motivation).
952
		 */
953
		LOCK_PRINT("WaitOnLock: aborting on lock", lock, lockmode);
954
		SpinRelease(lockMethodTable->ctl->masterLock);
955
		elog(ERROR, DeadLockMessage);
M
 
Marc G. Fournier 已提交
956
		/* not reached */
957
	}
958

959 960 961 962
	set_ps_display(old_status);
	pfree(old_status);
	pfree(new_status);

963
	LOCK_PRINT("WaitOnLock: wakeup on lock", lock, lockmode);
964
	return STATUS_OK;
965 966
}

967
/*
968 969 970 971 972 973 974 975
 * Remove a proc from the wait-queue it is on
 * (caller must know it is on one).
 *
 * Locktable lock must be held by caller.
 *
 * NB: this does not remove the process' holder object, nor the lock object,
 * even though their counts might now have gone to zero.  That will happen
 * during a subsequent LockReleaseAll call, which we expect will happen
B
Bruce Momjian 已提交
976
 * during transaction cleanup.	(Removal of a proc from its wait queue by
977 978 979 980 981
 * this routine can only happen if we are aborting the transaction.)
 */
void
RemoveFromWaitQueue(PROC *proc)
{
B
Bruce Momjian 已提交
982 983
	LOCK	   *waitLock = proc->waitLock;
	LOCKMODE	lockmode = proc->waitLockMode;
984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011

	/* Make sure proc is waiting */
	Assert(proc->links.next != INVALID_OFFSET);
	Assert(waitLock);
	Assert(waitLock->waitProcs.size > 0);

	/* Remove proc from lock's wait queue */
	SHMQueueDelete(&(proc->links));
	waitLock->waitProcs.size--;

	/* Undo increments of request counts by waiting process */
	Assert(waitLock->nRequested > 0);
	Assert(waitLock->nRequested > proc->waitLock->nGranted);
	waitLock->nRequested--;
	Assert(waitLock->requested[lockmode] > 0);
	waitLock->requested[lockmode]--;
	/* don't forget to clear waitMask bit if appropriate */
	if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
		waitLock->waitMask &= BITS_OFF[lockmode];

	/* Clean up the proc's own state */
	proc->waitLock = NULL;
	proc->waitHolder = NULL;

	/* See if any other waiters for the lock can be woken up now */
	ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock);
}

1012
/*
1013
 * LockRelease -- look up 'locktag' in lock table 'lockmethod' and
1014
 *		release one 'lockmode' lock on it.
1015
 *
1016 1017 1018 1019
 * Side Effects: find any waiting processes that are now wakable,
 *		grant them their requested locks and awaken them.
 *		(We have to grant the lock here to avoid a race between
 *		the waking process and any new process to
1020
 *		come along and request the lock.)
1021 1022
 */
bool
1023 1024
LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
			TransactionId xid, LOCKMODE lockmode)
1025
{
1026
	LOCK	   *lock;
1027 1028
	SPINLOCK	masterLock;
	bool		found;
1029
	LOCKMETHODTABLE *lockMethodTable;
1030 1031 1032
	HOLDER	   *holder;
	HOLDERTAG	holdertag;
	HTAB	   *holderTable;
1033
	bool		wakeupNeeded = false;
1034

1035 1036
#ifdef LOCK_DEBUG
	if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
B
Bruce Momjian 已提交
1037
		elog(DEBUG, "LockRelease: user lock tag [%u] %d", locktag->objId.blkno, lockmode);
1038 1039
#endif

M
 
Marc G. Fournier 已提交
1040 1041 1042
	/* ???????? This must be changed when short term locks will be used */
	locktag->lockmethod = lockmethod;

1043 1044 1045
	Assert(lockmethod < NumLockMethods);
	lockMethodTable = LockMethodTable[lockmethod];
	if (!lockMethodTable)
1046
	{
1047
		elog(NOTICE, "lockMethodTable is null in LockRelease");
1048
		return FALSE;
1049 1050 1051
	}

	if (LockingIsDisabled)
1052
		return TRUE;
1053

1054
	masterLock = lockMethodTable->ctl->masterLock;
1055 1056 1057
	SpinAcquire(masterLock);

	/*
M
 
Marc G. Fournier 已提交
1058
	 * Find a lock with this tag
1059
	 */
M
 
Marc G. Fournier 已提交
1060 1061 1062
	Assert(lockMethodTable->lockHash->hash == tag_hash);
	lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
								HASH_FIND, &found);
1063

1064
	/*
1065 1066
	 * let the caller print its own error message, too. Do not
	 * elog(ERROR).
1067 1068
	 */
	if (!lock)
1069
	{
1070 1071
		SpinRelease(masterLock);
		elog(NOTICE, "LockRelease: locktable corrupted");
1072
		return FALSE;
1073
	}
1074 1075

	if (!found)
1076
	{
1077
		SpinRelease(masterLock);
1078
		elog(NOTICE, "LockRelease: no such lock");
1079
		return FALSE;
1080
	}
M
 
Marc G. Fournier 已提交
1081
	LOCK_PRINT("LockRelease: found", lock, lockmode);
1082

M
 
Marc G. Fournier 已提交
1083
	/*
1084
	 * Find the holder entry for this holder.
M
 
Marc G. Fournier 已提交
1085
	 */
B
Bruce Momjian 已提交
1086 1087
	MemSet(&holdertag, 0, sizeof(HOLDERTAG));	/* must clear padding,
												 * needed */
1088
	holdertag.lock = MAKE_OFFSET(lock);
1089
	holdertag.proc = MAKE_OFFSET(MyProc);
1090 1091 1092 1093 1094 1095
	TransactionIdStore(xid, &holdertag.xid);

	holderTable = lockMethodTable->holderHash;
	holder = (HOLDER *) hash_search(holderTable, (Pointer) &holdertag,
									HASH_FIND_SAVE, &found);
	if (!holder || !found)
1096
	{
1097
		SpinRelease(masterLock);
1098
#ifdef USER_LOCKS
1099
		if (!found && lockmethod == USER_LOCKMETHOD)
B
Bruce Momjian 已提交
1100
			elog(NOTICE, "LockRelease: no lock with this tag");
1101
		else
1102
#endif
1103
			elog(NOTICE, "LockRelease: holder table corrupted");
1104
		return FALSE;
1105
	}
1106
	HOLDER_PRINT("LockRelease: found", holder);
M
 
Marc G. Fournier 已提交
1107 1108

	/*
1109 1110
	 * Check that we are actually holding a lock of the type we want to
	 * release.
M
 
Marc G. Fournier 已提交
1111
	 */
1112
	if (!(holder->holding[lockmode] > 0))
1113
	{
1114
		HOLDER_PRINT("LockRelease: WRONGTYPE", holder);
1115 1116
		Assert(holder->holding[lockmode] >= 0);
		SpinRelease(masterLock);
M
 
Marc G. Fournier 已提交
1117
		elog(NOTICE, "LockRelease: you don't own a lock of type %s",
B
Bruce Momjian 已提交
1118
			 lock_mode_names[lockmode]);
1119
		return FALSE;
M
 
Marc G. Fournier 已提交
1120
	}
1121
	Assert(holder->nHolding > 0);
1122 1123 1124
	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
	Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
	Assert(lock->nGranted <= lock->nRequested);
M
 
Marc G. Fournier 已提交
1125 1126 1127 1128

	/*
	 * fix the general lock stats
	 */
1129 1130 1131 1132
	lock->nRequested--;
	lock->requested[lockmode]--;
	lock->nGranted--;
	lock->granted[lockmode]--;
M
 
Marc G. Fournier 已提交
1133

1134
	if (lock->granted[lockmode] == 0)
1135 1136
	{
		/* change the conflict mask.  No more of this lock type. */
1137
		lock->grantMask &= BITS_OFF[lockmode];
1138 1139
	}

1140 1141 1142 1143 1144
	LOCK_PRINT("LockRelease: updated", lock, lockmode);
	Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
	Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
	Assert(lock->nGranted <= lock->nRequested);

B
Bruce Momjian 已提交
1145
	/*
1146 1147
	 * We need only run ProcLockWakeup if the released lock conflicts with
	 * at least one of the lock types requested by waiter(s).  Otherwise
B
Bruce Momjian 已提交
1148 1149 1150 1151 1152
	 * whatever conflict made them wait must still exist.  NOTE: before
	 * MVCC, we could skip wakeup if lock->granted[lockmode] was still
	 * positive. But that's not true anymore, because the remaining
	 * granted locks might belong to some waiter, who could now be
	 * awakened because he doesn't conflict with his own locks.
M
 
Marc G. Fournier 已提交
1153
	 */
V
Vadim B. Mikheev 已提交
1154 1155 1156
	if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
		wakeupNeeded = true;

1157
	if (lock->nRequested == 0)
M
 
Marc G. Fournier 已提交
1158
	{
B
Bruce Momjian 已提交
1159

B
Bruce Momjian 已提交
1160
		/*
B
Bruce Momjian 已提交
1161 1162
		 * if there's no one waiting in the queue, we just released the
		 * last lock on this object. Delete it from the lock table.
M
 
Marc G. Fournier 已提交
1163 1164 1165 1166 1167 1168
		 */
		Assert(lockMethodTable->lockHash->hash == tag_hash);
		lock = (LOCK *) hash_search(lockMethodTable->lockHash,
									(Pointer) &(lock->tag),
									HASH_REMOVE,
									&found);
1169 1170 1171 1172 1173 1174 1175
		if (!lock || !found)
		{
			SpinRelease(masterLock);
			elog(NOTICE, "LockRelease: remove lock, table corrupted");
			return FALSE;
		}
		wakeupNeeded = false;	/* should be false, but make sure */
M
 
Marc G. Fournier 已提交
1176
	}
1177 1178

	/*
1179
	 * Now fix the per-holder lock stats.
1180
	 */
1181
	holder->holding[lockmode]--;
1182 1183
	holder->nHolding--;
	HOLDER_PRINT("LockRelease: updated", holder);
1184
	Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
1185 1186

	/*
B
Bruce Momjian 已提交
1187 1188
	 * If this was my last hold on this lock, delete my entry in the
	 * holder table.
1189
	 */
1190
	if (holder->nHolding == 0)
1191
	{
1192
		HOLDER_PRINT("LockRelease: deleting", holder);
1193 1194
		SHMQueueDelete(&holder->lockLink);
		SHMQueueDelete(&holder->procLink);
1195 1196 1197
		holder = (HOLDER *) hash_search(holderTable, (Pointer) &holder,
										HASH_REMOVE_SAVED, &found);
		if (!holder || !found)
1198
		{
1199
			SpinRelease(masterLock);
1200
			elog(NOTICE, "LockRelease: remove holder, table corrupted");
1201
			return FALSE;
1202 1203
		}
	}
1204

1205 1206 1207
	/*
	 * Wake up waiters if needed.
	 */
1208
	if (wakeupNeeded)
1209
		ProcLockWakeup(lockMethodTable, lock);
1210 1211

	SpinRelease(masterLock);
1212
	return TRUE;
1213 1214
}

1215
/*
1216
 * LockReleaseAll -- Release all locks in a process's lock list.
1217 1218 1219 1220 1221 1222 1223 1224
 *
 * Well, not really *all* locks.
 *
 * If 'allxids' is TRUE, all locks of the specified lock method are
 * released, regardless of transaction affiliation.
 *
 * If 'allxids' is FALSE, all locks of the specified lock method and
 * specified XID are released.
1225
 */
1226
bool
1227 1228
LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
			   bool allxids, TransactionId xid)
1229
{
1230 1231 1232
	SHM_QUEUE  *procHolders = &(proc->procHolders);
	HOLDER	   *holder;
	HOLDER	   *nextHolder;
1233
	SPINLOCK	masterLock;
1234
	LOCKMETHODTABLE *lockMethodTable;
1235
	int			i,
1236
				numLockModes;
1237 1238
	LOCK	   *lock;
	bool		found;
1239

1240 1241
#ifdef LOCK_DEBUG
	if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
1242
		elog(DEBUG, "LockReleaseAll: lockmethod=%d, pid=%d",
1243
			 lockmethod, proc->pid);
1244 1245
#endif

1246 1247
	Assert(lockmethod < NumLockMethods);
	lockMethodTable = LockMethodTable[lockmethod];
1248 1249
	if (!lockMethodTable)
	{
1250
		elog(NOTICE, "LockReleaseAll: bad lockmethod %d", lockmethod);
1251
		return FALSE;
M
 
Marc G. Fournier 已提交
1252
	}
1253

M
 
Marc G. Fournier 已提交
1254 1255 1256
	numLockModes = lockMethodTable->ctl->numLockModes;
	masterLock = lockMethodTable->ctl->masterLock;

1257 1258
	SpinAcquire(masterLock);

1259 1260
	holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
									 offsetof(HOLDER, procLink));
1261

1262
	while (holder)
1263
	{
B
Bruce Momjian 已提交
1264
		bool		wakeupNeeded = false;
1265

1266 1267 1268
		/* Get link first, since we may unlink/delete this holder */
		nextHolder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
											 offsetof(HOLDER, procLink));
1269

1270
		Assert(holder->tag.proc == MAKE_OFFSET(proc));
1271

1272 1273 1274 1275 1276
		lock = (LOCK *) MAKE_PTR(holder->tag.lock);

		/* Ignore items that are not of the lockmethod to be removed */
		if (LOCK_LOCKMETHOD(*lock) != lockmethod)
			goto next_item;
1277

1278 1279
		/* If not allxids, ignore items that are of the wrong xid */
		if (!allxids && xid != holder->tag.xid)
M
 
Marc G. Fournier 已提交
1280 1281
			goto next_item;

1282 1283
		HOLDER_PRINT("LockReleaseAll", holder);
		LOCK_PRINT("LockReleaseAll", lock, 0);
1284 1285 1286
		Assert(lock->nRequested >= 0);
		Assert(lock->nGranted >= 0);
		Assert(lock->nGranted <= lock->nRequested);
1287
		Assert(holder->nHolding >= 0);
1288
		Assert(holder->nHolding <= lock->nRequested);
1289

B
Bruce Momjian 已提交
1290
		/*
1291 1292
		 * fix the general lock stats
		 */
1293
		if (lock->nRequested != holder->nHolding)
1294
		{
1295
			for (i = 1; i <= numLockModes; i++)
1296
			{
1297 1298 1299 1300 1301 1302 1303 1304
				Assert(holder->holding[i] >= 0);
				if (holder->holding[i] > 0)
				{
					lock->requested[i] -= holder->holding[i];
					lock->granted[i] -= holder->holding[i];
					Assert(lock->requested[i] >= 0 && lock->granted[i] >= 0);
					if (lock->granted[i] == 0)
						lock->grantMask &= BITS_OFF[i];
B
Bruce Momjian 已提交
1305

1306 1307 1308 1309
					/*
					 * Read comments in LockRelease
					 */
					if (!wakeupNeeded &&
B
Bruce Momjian 已提交
1310
					lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
1311 1312
						wakeupNeeded = true;
				}
1313
			}
1314 1315 1316 1317
			lock->nRequested -= holder->nHolding;
			lock->nGranted -= holder->nHolding;
			Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
			Assert(lock->nGranted <= lock->nRequested);
1318
		}
1319
		else
1320
		{
B
Bruce Momjian 已提交
1321
			/*
B
Bruce Momjian 已提交
1322 1323
			 * This holder accounts for all the requested locks on the
			 * object, so we can be lazy and just zero things out.
1324
			 */
1325 1326
			lock->nRequested = 0;
			lock->nGranted = 0;
M
 
Marc G. Fournier 已提交
1327
			/* Fix the lock status, just for next LOCK_PRINT message. */
1328 1329
			for (i = 1; i <= numLockModes; i++)
			{
1330 1331
				Assert(lock->requested[i] == lock->granted[i]);
				lock->requested[i] = lock->granted[i] = 0;
M
 
Marc G. Fournier 已提交
1332
			}
1333
		}
M
 
Marc G. Fournier 已提交
1334 1335
		LOCK_PRINT("LockReleaseAll: updated", lock, 0);

1336 1337
		HOLDER_PRINT("LockReleaseAll: deleting", holder);

M
 
Marc G. Fournier 已提交
1338
		/*
1339
		 * Remove the holder entry from the linked lists
M
 
Marc G. Fournier 已提交
1340
		 */
1341 1342
		SHMQueueDelete(&holder->lockLink);
		SHMQueueDelete(&holder->procLink);
M
 
Marc G. Fournier 已提交
1343

1344 1345
		/*
		 * remove the holder entry from the hashtable
1346
		 */
1347 1348 1349 1350 1351
		holder = (HOLDER *) hash_search(lockMethodTable->holderHash,
										(Pointer) holder,
										HASH_REMOVE,
										&found);
		if (!holder || !found)
1352
		{
1353
			SpinRelease(masterLock);
1354
			elog(NOTICE, "LockReleaseAll: holder table corrupted");
1355
			return FALSE;
1356
		}
1357

1358
		if (lock->nRequested == 0)
1359
		{
B
Bruce Momjian 已提交
1360

B
Bruce Momjian 已提交
1361
			/*
1362 1363
			 * We've just released the last lock, so garbage-collect the
			 * lock object.
1364
			 */
M
 
Marc G. Fournier 已提交
1365
			LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
1366
			Assert(lockMethodTable->lockHash->hash == tag_hash);
M
 
Marc G. Fournier 已提交
1367 1368 1369
			lock = (LOCK *) hash_search(lockMethodTable->lockHash,
										(Pointer) &(lock->tag),
										HASH_REMOVE, &found);
1370
			if (!lock || !found)
1371
			{
1372 1373
				SpinRelease(masterLock);
				elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB");
1374
				return FALSE;
1375 1376
			}
		}
V
Vadim B. Mikheev 已提交
1377
		else if (wakeupNeeded)
1378
			ProcLockWakeup(lockMethodTable, lock);
1379 1380

next_item:
1381
		holder = nextHolder;
M
 
Marc G. Fournier 已提交
1382 1383 1384
	}

	SpinRelease(masterLock);
1385

1386
#ifdef LOCK_DEBUG
B
Bruce Momjian 已提交
1387 1388
	if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
		elog(DEBUG, "LockReleaseAll: done");
1389
#endif
M
 
Marc G. Fournier 已提交
1390

1391
	return TRUE;
1392 1393 1394
}

int
1395
LockShmemSize(int maxBackends)
1396
{
1397
	int			size = 0;
1398
	long		max_table_size = NLOCKENTS(maxBackends);
1399

1400
	size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
1401
	size += maxBackends * MAXALIGN(sizeof(PROC));		/* each MyProc */
B
Bruce Momjian 已提交
1402 1403
	size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LOCKMETHODCTL)); /* each
																 * lockMethodTable->ctl */
1404

1405
	/* lockHash table */
1406
	size += hash_estimate_size(max_table_size,
1407 1408
							   SHMEM_LOCKTAB_KEYSIZE,
							   SHMEM_LOCKTAB_DATASIZE);
1409

1410
	/* holderHash table */
1411
	size += hash_estimate_size(max_table_size,
1412 1413
							   SHMEM_HOLDERTAB_KEYSIZE,
							   SHMEM_HOLDERTAB_DATASIZE);
1414

B
Bruce Momjian 已提交
1415 1416 1417
	/*
	 * Since the lockHash entry count above is only an estimate, add 10%
	 * safety margin.
1418 1419
	 */
	size += size / 10;
1420 1421

	return size;
1422 1423
}

B
Bruce Momjian 已提交
1424

1425
#ifdef LOCK_DEBUG
1426
/*
1427 1428 1429
 * Dump all locks in the proc->procHolders list.
 *
 * Must have already acquired the masterLock.
1430 1431
 */
void
1432
DumpLocks(void)
1433
{
1434 1435
	SHMEM_OFFSET location;
	PROC	   *proc;
1436 1437
	SHM_QUEUE  *procHolders;
	HOLDER	   *holder;
1438
	LOCK	   *lock;
M
 
Marc G. Fournier 已提交
1439
	int			lockmethod = DEFAULT_LOCKMETHOD;
1440
	LOCKMETHODTABLE *lockMethodTable;
1441

B
Bruce Momjian 已提交
1442
	ShmemPIDLookup(MyProcPid, &location);
1443 1444 1445 1446 1447
	if (location == INVALID_OFFSET)
		return;
	proc = (PROC *) MAKE_PTR(location);
	if (proc != MyProc)
		return;
1448
	procHolders = &proc->procHolders;
1449

1450 1451 1452
	Assert(lockmethod < NumLockMethods);
	lockMethodTable = LockMethodTable[lockmethod];
	if (!lockMethodTable)
1453 1454
		return;

1455 1456 1457
	if (proc->waitLock)
		LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);

1458 1459
	holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
									 offsetof(HOLDER, procLink));
1460

1461
	while (holder)
1462
	{
1463
		Assert(holder->tag.proc == MAKE_OFFSET(proc));
1464

1465
		lock = (LOCK *) MAKE_PTR(holder->tag.lock);
M
 
Marc G. Fournier 已提交
1466

1467 1468 1469
		HOLDER_PRINT("DumpLocks", holder);
		LOCK_PRINT("DumpLocks", lock, 0);

1470 1471 1472
		holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
										 offsetof(HOLDER, procLink));
	}
1473
}
1474

M
 
Marc G. Fournier 已提交
1475 1476 1477 1478
/*
 * Dump all postgres locks. Must have already acquired the masterLock.
 */
void
1479
DumpAllLocks(void)
M
 
Marc G. Fournier 已提交
1480 1481
{
	SHMEM_OFFSET location;
1482
	PROC	   *proc;
1483
	HOLDER	   *holder = NULL;
1484 1485
	LOCK	   *lock;
	int			pid;
M
 
Marc G. Fournier 已提交
1486
	int			lockmethod = DEFAULT_LOCKMETHOD;
1487
	LOCKMETHODTABLE *lockMethodTable;
1488
	HTAB	   *holderTable;
1489
	HASH_SEQ_STATUS status;
M
 
Marc G. Fournier 已提交
1490 1491

	pid = getpid();
1492
	ShmemPIDLookup(pid, &location);
M
 
Marc G. Fournier 已提交
1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503
	if (location == INVALID_OFFSET)
		return;
	proc = (PROC *) MAKE_PTR(location);
	if (proc != MyProc)
		return;

	Assert(lockmethod < NumLockMethods);
	lockMethodTable = LockMethodTable[lockmethod];
	if (!lockMethodTable)
		return;

1504
	holderTable = lockMethodTable->holderHash;
M
 
Marc G. Fournier 已提交
1505

1506 1507
	if (proc->waitLock)
		LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
M
 
Marc G. Fournier 已提交
1508

1509 1510
	hash_seq_init(&status, holderTable);
	while ((holder = (HOLDER *) hash_seq_search(&status)) &&
1511
		   (holder != (HOLDER *) TRUE))
1512
	{
1513
		HOLDER_PRINT("DumpAllLocks", holder);
M
 
Marc G. Fournier 已提交
1514

1515
		if (holder->tag.lock)
1516
		{
1517
			lock = (LOCK *) MAKE_PTR(holder->tag.lock);
1518
			LOCK_PRINT("DumpAllLocks", lock, 0);
M
 
Marc G. Fournier 已提交
1519
		}
1520
		else
1521
			elog(DEBUG, "DumpAllLocks: holder->tag.lock = NULL");
M
 
Marc G. Fournier 已提交
1522 1523
	}
}
1524

B
Bruce Momjian 已提交
1525
#endif	 /* LOCK_DEBUG */