/*-------------------------------------------------------------------------
 *
 * proc.c
 *	  routines to manage per-process shared memory data structure
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/lmgr/proc.c
 *
 *-------------------------------------------------------------------------
 */
/*
 * Interface (a):
 *		ProcSleep(), ProcWakeup(),
 *		ProcQueueAlloc() -- create a shm queue for sleeping processes
 *		ProcQueueInit() -- create a queue without allocing memory
 *
 * Waiting for a lock causes the backend to be put to sleep.  Whoever releases
 * the lock wakes the process up again (and gives it an error code so it knows
 * whether it was awoken on an error condition).
 *
 * Interface (b):
 *
 * ProcReleaseLocks -- frees the locks associated with current transaction
 *
 * ProcKill -- destroys the shared memory state (and locks)
 * associated with the process.
 */
32 33
#include "postgres.h"

34
#include <signal.h>
35 36
#include <unistd.h>
#include <sys/time.h>
M
Marc G. Fournier 已提交
37

38
#include "access/transam.h"
39
#include "access/twophase.h"
40
#include "access/xact.h"
41
#include "miscadmin.h"
42
#include "postmaster/autovacuum.h"
R
Robert Haas 已提交
43
#include "replication/slot.h"
44
#include "replication/syncrep.h"
45
#include "storage/ipc.h"
46
#include "storage/lmgr.h"
47
#include "storage/pmsignal.h"
48
#include "storage/proc.h"
49
#include "storage/procarray.h"
50
#include "storage/procsignal.h"
51
#include "storage/spin.h"
52
#include "utils/timeout.h"
53
#include "utils/timestamp.h"
54

55

56
/* GUC variables */
B
Bruce Momjian 已提交
57
int			DeadlockTimeout = 1000;
58
int			StatementTimeout = 0;
59
int			LockTimeout = 0;
60
bool		log_lock_waits = false;
M
 
Marc G. Fournier 已提交
61

62
/* Pointer to this process's PGPROC and PGXACT structs, if any */
J
Jan Wieck 已提交
63
PGPROC	   *MyProc = NULL;
64
PGXACT	   *MyPgXact = NULL;
65 66

/*
J
Jan Wieck 已提交
67
 * This spinlock protects the freelist of recycled PGPROC structures.
68
 * We cannot use an LWLock because the LWLock manager depends on already
J
Jan Wieck 已提交
69
 * having a PGPROC and a wait semaphore!  But these structures are touched
70 71
 * relatively infrequently (only at backend startup or shutdown) and not for
 * very long, so a spinlock is okay.
72
 */
73
NON_EXEC_STATIC slock_t *ProcStructLock = NULL;
74

75
/* Pointers to shared-memory structures */
76
PROC_HDR   *ProcGlobal = NULL;
77
NON_EXEC_STATIC PGPROC *AuxiliaryProcs = NULL;
78
PGPROC	   *PreparedXactProcs = NULL;
79

80 81
/* If we are waiting for a lock, this points to the associated LOCALLOCK */
static LOCALLOCK *lockAwaited = NULL;
82

83
static DeadLockState deadlock_state = DS_NOT_YET_CHECKED;
84

85 86
/* Is a deadlock check pending? */
static volatile sig_atomic_t got_deadlock_timeout;
87

88
static void RemoveProcFromArray(int code, Datum arg);
89
static void ProcKill(int code, Datum arg);
90
static void AuxiliaryProcKill(int code, Datum arg);
91
static void CheckDeadLock(void);
92

V
Vadim B. Mikheev 已提交
93

94 95 96
/*
 * Report shared-memory space needed by InitProcGlobal.
 */
97
Size
98
ProcGlobalShmemSize(void)
99
{
100 101 102 103
	Size		size = 0;

	/* ProcGlobal */
	size = add_size(size, sizeof(PROC_HDR));
104
	/* MyProcs, including autovacuum workers and launcher */
105
	size = add_size(size, mul_size(MaxBackends, sizeof(PGPROC)));
106 107 108 109
	/* AuxiliaryProcs */
	size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGPROC)));
	/* Prepared xacts */
	size = add_size(size, mul_size(max_prepared_xacts, sizeof(PGPROC)));
110 111
	/* ProcStructLock */
	size = add_size(size, sizeof(slock_t));
112

113 114 115 116
	size = add_size(size, mul_size(MaxBackends, sizeof(PGXACT)));
	size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGXACT)));
	size = add_size(size, mul_size(max_prepared_xacts, sizeof(PGXACT)));

117 118 119
	return size;
}

120 121 122 123
/*
 * Report number of semaphores needed by InitProcGlobal.
 */
int
124
ProcGlobalSemas(void)
125
{
126 127 128 129
	/*
	 * We need a sema per backend (including autovacuum), plus one for each
	 * auxiliary process.
	 */
130
	return MaxBackends + NUM_AUXILIARY_PROCS;
131 132
}

133 134
/*
 * InitProcGlobal -
135 136
 *	  Initialize the global process table during postmaster or standalone
 *	  backend startup.
137
 *
138
 *	  We also create all the per-process semaphores we will need to support
139 140 141 142 143 144 145
 *	  the requested number of backends.  We used to allocate semaphores
 *	  only when backends were actually started up, but that is bad because
 *	  it lets Postgres fail under load --- a lot of Unix systems are
 *	  (mis)configured with small limits on the number of semaphores, and
 *	  running out when trying to start another backend is a common failure.
 *	  So, now we grab enough semaphores to support the desired max number
 *	  of backends immediately at initialization --- if the sysadmin has set
146 147
 *	  MaxConnections, max_worker_processes, or autovacuum_max_workers higher
 *	  than his kernel will support, he'll find out sooner rather than later.
148 149 150 151
 *
 *	  Another reason for creating semaphores here is that the semaphore
 *	  implementation typically requires us to create semaphores in the
 *	  postmaster, not in backends.
152 153
 *
 * Note: this is NOT called by individual backends under a postmaster,
154
 * not even in the EXEC_BACKEND case.  The ProcGlobal and AuxiliaryProcs
155
 * pointers must be propagated specially for EXEC_BACKEND operation.
156 157
 */
void
158
InitProcGlobal(void)
159
{
160
	PGPROC	   *procs;
161
	PGXACT	   *pgxacts;
162 163
	int			i,
				j;
164
	bool		found;
165
	uint32		TotalProcs = MaxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts;
166

167
	/* Create the ProcGlobal shared structure */
168
	ProcGlobal = (PROC_HDR *)
169 170
		ShmemInitStruct("Proc Header", sizeof(PROC_HDR), &found);
	Assert(!found);
171

172 173 174
	/*
	 * Initialize the data structures.
	 */
R
Robert Haas 已提交
175
	ProcGlobal->spins_per_delay = DEFAULT_SPINS_PER_DELAY;
176 177
	ProcGlobal->freeProcs = NULL;
	ProcGlobal->autovacFreeProcs = NULL;
A
Alvaro Herrera 已提交
178
	ProcGlobal->bgworkerFreeProcs = NULL;
179 180 181
	ProcGlobal->startupProc = NULL;
	ProcGlobal->startupProcPid = 0;
	ProcGlobal->startupBufferPinWaitBufId = -1;
182 183
	ProcGlobal->walwriterLatch = NULL;
	ProcGlobal->checkpointerLatch = NULL;
184
	pg_atomic_init_u32(&ProcGlobal->firstClearXidElem, INVALID_PGPROCNO);
185

186
	/*
187
	 * Create and initialize all the PGPROC structures we'll need.  There are
A
Alvaro Herrera 已提交
188 189 190
	 * five separate consumers: (1) normal backends, (2) autovacuum workers
	 * and the autovacuum launcher, (3) background workers, (4) auxiliary
	 * processes, and (5) prepared transactions.  Each PGPROC structure is
B
Bruce Momjian 已提交
191 192
	 * dedicated to exactly one of these purposes, and they do not move
	 * between groups.
193
	 */
R
Robert Haas 已提交
194
	procs = (PGPROC *) ShmemAlloc(TotalProcs * sizeof(PGPROC));
195
	ProcGlobal->allProcs = procs;
196 197
	/* XXX allProcCount isn't really all of them; it excludes prepared xacts */
	ProcGlobal->allProcCount = MaxBackends + NUM_AUXILIARY_PROCS;
198 199 200 201
	if (!procs)
		ereport(FATAL,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of shared memory")));
R
Robert Haas 已提交
202
	MemSet(procs, 0, TotalProcs * sizeof(PGPROC));
203 204 205 206 207 208

	/*
	 * Also allocate a separate array of PGXACT structures.  This is separate
	 * from the main PGPROC array so that the most heavily accessed data is
	 * stored contiguously in memory in as few cache lines as possible. This
	 * provides significant performance benefits, especially on a
209
	 * multiprocessor system.  There is one PGXACT structure for every PGPROC
210 211 212 213 214 215
	 * structure.
	 */
	pgxacts = (PGXACT *) ShmemAlloc(TotalProcs * sizeof(PGXACT));
	MemSet(pgxacts, 0, TotalProcs * sizeof(PGXACT));
	ProcGlobal->allPgXact = pgxacts;

R
Robert Haas 已提交
216
	for (i = 0; i < TotalProcs; i++)
217
	{
R
Robert Haas 已提交
218
		/* Common initialization for all PGPROCs, regardless of type. */
219

220
		/*
221 222 223
		 * Set up per-PGPROC semaphore, latch, and backendLock. Prepared xact
		 * dummy PGPROCs don't need these though - they're never associated
		 * with a real process
224 225 226 227 228
		 */
		if (i < MaxBackends + NUM_AUXILIARY_PROCS)
		{
			PGSemaphoreCreate(&(procs[i].sem));
			InitSharedLatch(&(procs[i].procLatch));
229
			LWLockInitialize(&(procs[i].backendLock), LWTRANCHE_PROC);
230 231
		}
		procs[i].pgprocno = i;
R
Robert Haas 已提交
232 233

		/*
A
Alvaro Herrera 已提交
234
		 * Newly created PGPROCs for normal backends, autovacuum and bgworkers
B
Bruce Momjian 已提交
235
		 * must be queued up on the appropriate free list.  Because there can
A
Alvaro Herrera 已提交
236 237
		 * only ever be a small, fixed number of auxiliary processes, no free
		 * list is used in that case; InitAuxiliaryProcess() instead uses a
B
Bruce Momjian 已提交
238
		 * linear search.   PGPROCs for prepared transactions are added to a
A
Alvaro Herrera 已提交
239
		 * free list by TwoPhaseShmemInit().
R
Robert Haas 已提交
240 241 242 243 244 245
		 */
		if (i < MaxConnections)
		{
			/* PGPROC for normal backend, add to freeProcs list */
			procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs;
			ProcGlobal->freeProcs = &procs[i];
246
			procs[i].procgloballist = &ProcGlobal->freeProcs;
R
Robert Haas 已提交
247
		}
A
Alvaro Herrera 已提交
248
		else if (i < MaxConnections + autovacuum_max_workers + 1)
R
Robert Haas 已提交
249 250 251 252
		{
			/* PGPROC for AV launcher/worker, add to autovacFreeProcs list */
			procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs;
			ProcGlobal->autovacFreeProcs = &procs[i];
253
			procs[i].procgloballist = &ProcGlobal->autovacFreeProcs;
R
Robert Haas 已提交
254
		}
A
Alvaro Herrera 已提交
255 256 257 258 259
		else if (i < MaxBackends)
		{
			/* PGPROC for bgworker, add to bgworkerFreeProcs list */
			procs[i].links.next = (SHM_QUEUE *) ProcGlobal->bgworkerFreeProcs;
			ProcGlobal->bgworkerFreeProcs = &procs[i];
260
			procs[i].procgloballist = &ProcGlobal->bgworkerFreeProcs;
A
Alvaro Herrera 已提交
261
		}
262 263 264 265

		/* Initialize myProcLocks[] shared memory queues. */
		for (j = 0; j < NUM_LOCK_PARTITIONS; j++)
			SHMQueueInit(&(procs[i].myProcLocks[j]));
266 267 268

		/* Initialize lockGroupMembers list. */
		dlist_init(&procs[i].lockGroupMembers);
269 270
	}

271
	/*
272 273
	 * Save pointers to the blocks of PGPROC structures reserved for auxiliary
	 * processes and prepared transactions.
274
	 */
R
Robert Haas 已提交
275
	AuxiliaryProcs = &procs[MaxBackends];
276
	PreparedXactProcs = &procs[MaxBackends + NUM_AUXILIARY_PROCS];
277 278 279 280

	/* Create ProcStructLock spinlock, too */
	ProcStructLock = (slock_t *) ShmemAlloc(sizeof(slock_t));
	SpinLockInit(ProcStructLock);
281 282
}

283
/*
284
 * InitProcess -- initialize a per-process data structure for this backend
285 286
 */
void
287
InitProcess(void)
288
{
289
	PGPROC * volatile * procgloballist;
290 291

	/*
292 293
	 * ProcGlobal should be set up already (if we are a backend, we inherit
	 * this by fork() or EXEC_BACKEND mechanism from the postmaster).
294
	 */
295
	if (ProcGlobal == NULL)
296
		elog(PANIC, "proc header uninitialized");
297 298

	if (MyProc != NULL)
299
		elog(ERROR, "you already exist");
300

301 302
	/* Decide which list should supply our PGPROC. */
	if (IsAnyAutoVacuumProcess())
303
		procgloballist = &ProcGlobal->autovacFreeProcs;
304
	else if (IsBackgroundWorker)
305
		procgloballist = &ProcGlobal->bgworkerFreeProcs;
306
	else
307
		procgloballist = &ProcGlobal->freeProcs;
308

309
	/*
310 311
	 * Try to get a proc struct from the appropriate free list.  If this
	 * fails, we must be out of PGPROC structures (not to mention semaphores).
312
	 *
B
Bruce Momjian 已提交
313 314
	 * While we are holding the ProcStructLock, also copy the current shared
	 * estimate of spins_per_delay to local storage.
315
	 */
316
	SpinLockAcquire(ProcStructLock);
317

318
	set_spins_per_delay(ProcGlobal->spins_per_delay);
319

320
	MyProc = *procgloballist;
321

322
	if (MyProc != NULL)
323
	{
324
		*procgloballist = (PGPROC *) MyProc->links.next;
325
		SpinLockRelease(ProcStructLock);
326 327 328 329
	}
	else
	{
		/*
B
Bruce Momjian 已提交
330 331
		 * If we reach here, all the PGPROCs are in use.  This is one of the
		 * possible places to detect "too many backends", so give the standard
332 333
		 * error message.  XXX do we need to give a different failure message
		 * in the autovacuum case?
334
		 */
335
		SpinLockRelease(ProcStructLock);
336 337 338
		ereport(FATAL,
				(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
				 errmsg("sorry, too many clients already")));
339
	}
340
	MyPgXact = &ProcGlobal->allPgXact[MyProc->pgprocno];
341

342 343 344 345 346 347
	/*
	 * Cross-check that the PGPROC is of the type we expect; if this were
	 * not the case, it would get returned to the wrong list.
	 */
	Assert(MyProc->procgloballist == procgloballist);

348 349
	/*
	 * Now that we have a PGPROC, mark ourselves as an active postmaster
350
	 * child; this is so that the postmaster can detect it if we exit without
351 352
	 * cleaning up.  (XXX autovac launcher currently doesn't participate in
	 * this; it probably should.)
353
	 */
354
	if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess())
355
		MarkPostmasterChildActive();
356

357
	/*
358 359
	 * Initialize all fields of MyProc, except for those previously
	 * initialized by InitProcGlobal.
360
	 */
361
	SHMQueueElemInit(&(MyProc->links));
362
	MyProc->waitStatus = STATUS_OK;
363
	MyProc->lxid = InvalidLocalTransactionId;
364 365
	MyProc->fpVXIDLock = false;
	MyProc->fpLocalTransactionId = InvalidLocalTransactionId;
366 367
	MyPgXact->xid = InvalidTransactionId;
	MyPgXact->xmin = InvalidTransactionId;
368
	MyProc->pid = MyProcPid;
369 370
	/* backendId, databaseId and roleId will be filled in later */
	MyProc->backendId = InvalidBackendId;
371
	MyProc->databaseId = InvalidOid;
372
	MyProc->roleId = InvalidOid;
373
	MyPgXact->delayChkpt = false;
374
	MyPgXact->vacuumFlags = 0;
375
	/* NB -- autovac launcher intentionally does not set IS_AUTOVACUUM */
376
	if (IsAutoVacuumWorkerProcess())
377
		MyPgXact->vacuumFlags |= PROC_IS_AUTOVACUUM;
378
	MyProc->lwWaiting = false;
379
	MyProc->lwWaitMode = 0;
380
	MyProc->waitLock = NULL;
381
	MyProc->waitProcLock = NULL;
382 383
#ifdef USE_ASSERT_CHECKING
	{
384
		int			i;
385 386 387 388 389 390

		/* Last process should have released all locks. */
		for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
			Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i])));
	}
#endif
391
	MyProc->recoveryConflictPending = false;
392

393
	/* Initialize fields for sync rep */
394
	MyProc->waitLSN = 0;
395 396
	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
	SHMQueueElemInit(&(MyProc->syncRepLinks));
397

398
	/* Initialize fields for group XID clearing. */
399
	MyProc->clearXid = false;
400 401 402
	MyProc->backendLatestXid = InvalidTransactionId;
	pg_atomic_init_u32(&MyProc->nextClearXidElem, INVALID_PGPROCNO);

403 404 405 406 407
	/* Check that group locking fields are in a proper initial state. */
	Assert(MyProc->lockGroupLeaderIdentifier == 0);
	Assert(MyProc->lockGroupLeader == NULL);
	Assert(dlist_is_empty(&MyProc->lockGroupMembers));

408
	/*
409 410 411
	 * Acquire ownership of the PGPROC's latch, so that we can use WaitLatch
	 * on it.  That allows us to repoint the process latch, which so far
	 * points to process local one, to the shared one.
412 413
	 */
	OwnLatch(&MyProc->procLatch);
414
	SwitchToSharedLatch();
415

416
	/*
417
	 * We might be reusing a semaphore that belonged to a failed process. So
B
Bruce Momjian 已提交
418
	 * be careful and reinitialize its value here.  (This is not strictly
419
	 * necessary anymore, but seems like a good idea for cleanliness.)
420
	 */
421
	PGSemaphoreReset(&MyProc->sem);
422

423
	/*
424
	 * Arrange to clean up at backend exit.
425
	 */
426
	on_shmem_exit(ProcKill, 0);
427 428

	/*
B
Bruce Momjian 已提交
429
	 * Now that we have a PGPROC, we could try to acquire locks, so initialize
430
	 * local state needed for LWLocks, and the deadlock checker.
431
	 */
432
	InitLWLockAccess();
433
	InitDeadLockChecking();
434 435
}

436 437 438 439
/*
 * InitProcessPhase2 -- make MyProc visible in the shared ProcArray.
 *
 * This is separate from InitProcess because we can't acquire LWLocks until
440 441
 * we've created a PGPROC, but in the EXEC_BACKEND case ProcArrayAdd won't
 * work until after we've done CreateSharedMemoryAndSemaphores.
442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458
 */
void
InitProcessPhase2(void)
{
	Assert(MyProc != NULL);

	/*
	 * Add our PGPROC to the PGPROC array in shared memory.
	 */
	ProcArrayAdd(MyProc);

	/*
	 * Arrange to clean that up at backend exit.
	 */
	on_shmem_exit(RemoveProcFromArray, 0);
}

459
/*
460
 * InitAuxiliaryProcess -- create a per-auxiliary-process data structure
461
 *
462 463
 * This is called by bgwriter and similar processes so that they will have a
 * MyProc value that's real enough to let them wait for LWLocks.  The PGPROC
464
 * and sema that are assigned are one of the extra ones created during
465
 * InitProcGlobal.
466
 *
467
 * Auxiliary processes are presently not expected to wait for real (lockmgr)
468
 * locks, so we need not set up the deadlock checker.  They are never added
B
Bruce Momjian 已提交
469
 * to the ProcArray or the sinval messaging mechanism, either.  They also
470 471
 * don't get a VXID assigned, since this is only useful when we actually
 * hold lockmgr locks.
472 473 474 475 476
 *
 * Startup process however uses locks but never waits for them in the
 * normal backend sense. Startup process also takes part in sinval messaging
 * as a sendOnly process, so never reads messages from sinval queue. So
 * Startup process does have a VXID and does show up in pg_locks.
477 478
 */
void
479
InitAuxiliaryProcess(void)
480
{
481
	PGPROC	   *auxproc;
482
	int			proctype;
J
Jan Wieck 已提交
483

484
	/*
485 486
	 * ProcGlobal should be set up already (if we are a backend, we inherit
	 * this by fork() or EXEC_BACKEND mechanism from the postmaster).
487
	 */
488
	if (ProcGlobal == NULL || AuxiliaryProcs == NULL)
489
		elog(PANIC, "proc header uninitialized");
490 491

	if (MyProc != NULL)
492
		elog(ERROR, "you already exist");
493

494
	/*
495
	 * We use the ProcStructLock to protect assignment and releasing of
496
	 * AuxiliaryProcs entries.
497
	 *
B
Bruce Momjian 已提交
498 499
	 * While we are holding the ProcStructLock, also copy the current shared
	 * estimate of spins_per_delay to local storage.
500 501 502 503 504
	 */
	SpinLockAcquire(ProcStructLock);

	set_spins_per_delay(ProcGlobal->spins_per_delay);

505
	/*
506
	 * Find a free auxproc ... *big* trouble if there isn't one ...
507
	 */
508
	for (proctype = 0; proctype < NUM_AUXILIARY_PROCS; proctype++)
509
	{
510 511
		auxproc = &AuxiliaryProcs[proctype];
		if (auxproc->pid == 0)
512 513
			break;
	}
514
	if (proctype >= NUM_AUXILIARY_PROCS)
515 516
	{
		SpinLockRelease(ProcStructLock);
517
		elog(FATAL, "all AuxiliaryProcs are in use");
518
	}
519

520
	/* Mark auxiliary proc as in use by me */
521
	/* use volatile pointer to prevent code rearrangement */
522
	((volatile PGPROC *) auxproc)->pid = MyProcPid;
523

524
	MyProc = auxproc;
525
	MyPgXact = &ProcGlobal->allPgXact[auxproc->pgprocno];
526 527 528

	SpinLockRelease(ProcStructLock);

529
	/*
530 531
	 * Initialize all fields of MyProc, except for those previously
	 * initialized by InitProcGlobal.
532 533
	 */
	SHMQueueElemInit(&(MyProc->links));
534
	MyProc->waitStatus = STATUS_OK;
535
	MyProc->lxid = InvalidLocalTransactionId;
536 537
	MyProc->fpVXIDLock = false;
	MyProc->fpLocalTransactionId = InvalidLocalTransactionId;
538 539
	MyPgXact->xid = InvalidTransactionId;
	MyPgXact->xmin = InvalidTransactionId;
540
	MyProc->backendId = InvalidBackendId;
541
	MyProc->databaseId = InvalidOid;
542
	MyProc->roleId = InvalidOid;
543
	MyPgXact->delayChkpt = false;
544
	MyPgXact->vacuumFlags = 0;
545
	MyProc->lwWaiting = false;
546
	MyProc->lwWaitMode = 0;
547
	MyProc->waitLock = NULL;
548
	MyProc->waitProcLock = NULL;
549 550
#ifdef USE_ASSERT_CHECKING
	{
551
		int			i;
552 553 554 555 556 557

		/* Last process should have released all locks. */
		for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
			Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i])));
	}
#endif
558

559
	/*
560 561 562
	 * Acquire ownership of the PGPROC's latch, so that we can use WaitLatch
	 * on it.  That allows us to repoint the process latch, which so far
	 * points to process local one, to the shared one.
563 564
	 */
	OwnLatch(&MyProc->procLatch);
565
	SwitchToSharedLatch();
566

567 568 569 570 571
	/* Check that group locking fields are in a proper initial state. */
	Assert(MyProc->lockGroupLeaderIdentifier == 0);
	Assert(MyProc->lockGroupLeader == NULL);
	Assert(dlist_is_empty(&MyProc->lockGroupMembers));

572
	/*
B
Bruce Momjian 已提交
573
	 * We might be reusing a semaphore that belonged to a failed process. So
B
Bruce Momjian 已提交
574
	 * be careful and reinitialize its value here.  (This is not strictly
575
	 * necessary anymore, but seems like a good idea for cleanliness.)
576
	 */
577
	PGSemaphoreReset(&MyProc->sem);
578 579 580 581

	/*
	 * Arrange to clean up at process exit.
	 */
582
	on_shmem_exit(AuxiliaryProcKill, Int32GetDatum(proctype));
583 584
}

585 586 587 588 589 590 591 592 593
/*
 * Record the PID and PGPROC structures for the Startup process, for use in
 * ProcSendSignal().  See comments there for further explanation.
 */
void
PublishStartupProcessInformation(void)
{
	SpinLockAcquire(ProcStructLock);

594 595
	ProcGlobal->startupProc = MyProc;
	ProcGlobal->startupProcPid = MyProcPid;
596 597 598 599

	SpinLockRelease(ProcStructLock);
}

600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624
/*
 * Used from bufgr to share the value of the buffer that Startup waits on,
 * or to reset the value to "not waiting" (-1). This allows processing
 * of recovery conflicts for buffer pins. Set is made before backends look
 * at this value, so locking not required, especially since the set is
 * an atomic integer set operation.
 */
void
SetStartupBufferPinWaitBufId(int bufid)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile PROC_HDR *procglobal = ProcGlobal;

	procglobal->startupBufferPinWaitBufId = bufid;
}

/*
 * Used by backends when they receive a request to check for buffer pin waits.
 */
int
GetStartupBufferPinWaitBufId(void)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile PROC_HDR *procglobal = ProcGlobal;

625
	return procglobal->startupBufferPinWaitBufId;
626 627
}

628 629 630 631 632 633 634 635 636
/*
 * Check whether there are at least N free PGPROC objects.
 *
 * Note: this is designed on the assumption that N will generally be small.
 */
bool
HaveNFreeProcs(int n)
{
	PGPROC	   *proc;
B
Bruce Momjian 已提交
637

638 639
	SpinLockAcquire(ProcStructLock);

640
	proc = ProcGlobal->freeProcs;
641

642
	while (n > 0 && proc != NULL)
643
	{
644
		proc = (PGPROC *) proc->links.next;
645 646 647 648 649 650 651 652
		n--;
	}

	SpinLockRelease(ProcStructLock);

	return (n <= 0);
}

653 654 655
/*
 * Check if the current process is awaiting a lock.
 */
656 657 658 659 660 661 662 663 664
bool
IsWaitingForLock(void)
{
	if (lockAwaited == NULL)
		return false;

	return true;
}

665
/*
666 667
 * Cancel any pending wait for lock, when aborting a transaction, and revert
 * any strong lock count acquisition for a lock being acquired.
668 669
 *
 * (Normally, this would only happen if we accept a cancel/die
670 671
 * interrupt while waiting; but an ereport(ERROR) before or during the lock
 * wait is within the realm of possibility, too.)
672
 */
673
void
674
LockErrorCleanup(void)
675
{
676
	LWLock	   *partitionLock;
677
	DisableTimeoutParams timeouts[2];
678

679 680
	HOLD_INTERRUPTS();

681 682
	AbortStrongLockAcquire();

683
	/* Nothing to do if we weren't waiting for a lock */
684
	if (lockAwaited == NULL)
685 686
	{
		RESUME_INTERRUPTS();
687
		return;
688
	}
689

690 691 692 693 694 695 696 697 698 699 700 701 702
	/*
	 * Turn off the deadlock and lock timeout timers, if they are still
	 * running (see ProcSleep).  Note we must preserve the LOCK_TIMEOUT
	 * indicator flag, since this function is executed before
	 * ProcessInterrupts when responding to SIGINT; else we'd lose the
	 * knowledge that the SIGINT came from a lock timeout and not an external
	 * source.
	 */
	timeouts[0].id = DEADLOCK_TIMEOUT;
	timeouts[0].keep_indicator = false;
	timeouts[1].id = LOCK_TIMEOUT;
	timeouts[1].keep_indicator = true;
	disable_timeouts(timeouts, 2);
703 704

	/* Unlink myself from the wait queue, if on it (might not be anymore!) */
705
	partitionLock = LockHashPartitionLock(lockAwaited->hashcode);
706
	LWLockAcquire(partitionLock, LW_EXCLUSIVE);
707

708
	if (MyProc->links.next != NULL)
709 710
	{
		/* We could not have been granted the lock yet */
711
		RemoveFromWaitQueue(MyProc, lockAwaited->hashcode);
712 713 714 715 716
	}
	else
	{
		/*
		 * Somebody kicked us off the lock queue already.  Perhaps they
B
Bruce Momjian 已提交
717 718 719
		 * granted us the lock, or perhaps they detected a deadlock. If they
		 * did grant us the lock, we'd better remember it in our local lock
		 * table.
720
		 */
721 722
		if (MyProc->waitStatus == STATUS_OK)
			GrantAwaitedLock();
723 724
	}

725
	lockAwaited = NULL;
726

727
	LWLockRelease(partitionLock);
H
Hiroshi Inoue 已提交
728

729
	RESUME_INTERRUPTS();
H
Hiroshi Inoue 已提交
730
}
731

732

733
/*
734
 * ProcReleaseLocks() -- release locks associated with current transaction
735
 *			at main transaction commit or abort
736
 *
737
 * At main transaction commit, we release standard locks except session locks.
738
 * At main transaction abort, we release all locks including session locks.
739
 *
740 741 742
 * Advisory locks are released only if they are transaction-level;
 * session-level holds remain, whether this is a commit or not.
 *
743
 * At subtransaction commit, we don't release any locks (so this func is not
744
 * needed at all); we will defer the releasing to the parent transaction.
745
 * At subtransaction abort, we release all locks held by the subtransaction;
746 747
 * this is implemented by retail releasing of the locks under control of
 * the ResourceOwner mechanism.
748 749
 */
void
750
ProcReleaseLocks(bool isCommit)
751
{
752 753
	if (!MyProc)
		return;
754
	/* If waiting, get off wait queue (should only be needed after error) */
755
	LockErrorCleanup();
756
	/* Release standard locks, including session-level if aborting */
757
	LockReleaseAll(DEFAULT_LOCKMETHOD, !isCommit);
758
	/* Release transaction-level advisory locks */
759
	LockReleaseAll(USER_LOCKMETHOD, false);
760 761 762
}


763 764 765 766 767 768 769
/*
 * RemoveProcFromArray() -- Remove this process from the shared ProcArray.
 */
static void
RemoveProcFromArray(int code, Datum arg)
{
	Assert(MyProc != NULL);
770
	ProcArrayRemove(MyProc, InvalidTransactionId);
771 772
}

773 774
/*
 * ProcKill() -- Destroy the per-proc data structure for
775
 *		this process. Release any of its held LW locks.
776 777
 */
static void
778
ProcKill(int code, Datum arg)
779
{
780
	PGPROC	   *proc;
781
	PGPROC * volatile * procgloballist;
782

783
	Assert(MyProc != NULL);
784

785 786 787
	/* Make sure we're out of the sync rep lists */
	SyncRepCleanupAtProcExit();

788 789
#ifdef USE_ASSERT_CHECKING
	{
790
		int			i;
791 792 793 794 795 796 797

		/* Last process should have released all locks. */
		for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
			Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i])));
	}
#endif

798
	/*
B
Bruce Momjian 已提交
799 800
	 * Release any LW locks I am holding.  There really shouldn't be any, but
	 * it's cheap to check again before we cut the knees off the LWLock
801
	 * facility by releasing our PGPROC ...
802
	 */
803
	LWLockReleaseAll();
804

R
Robert Haas 已提交
805 806 807 808
	/* Make sure active replication slots are released */
	if (MyReplicationSlot != NULL)
		ReplicationSlotRelease();

809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842
	/*
	 * Detach from any lock group of which we are a member.  If the leader
	 * exist before all other group members, it's PGPROC will remain allocated
	 * until the last group process exits; that process must return the
	 * leader's PGPROC to the appropriate list.
	 */
	if (MyProc->lockGroupLeader != NULL)
	{
		PGPROC	   *leader = MyProc->lockGroupLeader;
		LWLock	   *leader_lwlock = LockHashPartitionLockByProc(leader);

		LWLockAcquire(leader_lwlock, LW_EXCLUSIVE);
		Assert(!dlist_is_empty(&leader->lockGroupMembers));
		dlist_delete(&MyProc->lockGroupLink);
		if (dlist_is_empty(&leader->lockGroupMembers))
		{
			leader->lockGroupLeaderIdentifier = 0;
			leader->lockGroupLeader = NULL;
			if (leader != MyProc)
			{
				procgloballist = leader->procgloballist;

				/* Leader exited first; return its PGPROC. */
				SpinLockAcquire(ProcStructLock);
				leader->links.next = (SHM_QUEUE *) *procgloballist;
				*procgloballist = leader;
				SpinLockRelease(ProcStructLock);
			}
		}
		else if (leader != MyProc)
			MyProc->lockGroupLeader = NULL;
		LWLockRelease(leader_lwlock);
	}

843
	/*
844 845 846 847
	 * Reset MyLatch to the process local one.  This is so that signal
	 * handlers et al can continue using the latch after the shared latch
	 * isn't ours anymore. After that clear MyProc and disown the shared
	 * latch.
848
	 */
849
	SwitchBackToLocalLatch();
850 851 852
	proc = MyProc;
	MyProc = NULL;
	DisownLatch(&proc->procLatch);
853

854
	procgloballist = proc->procgloballist;
855
	SpinLockAcquire(ProcStructLock);
856

857 858 859 860 861 862 863 864 865 866 867 868 869 870
	/*
	 * If we're still a member of a locking group, that means we're a leader
	 * which has somehow exited before its children.  The last remaining child
	 * will release our PGPROC.  Otherwise, release it now.
	 */
	if (proc->lockGroupLeader == NULL)
	{
		/* Since lockGroupLeader is NULL, lockGroupMembers should be empty. */
		Assert(dlist_is_empty(&proc->lockGroupMembers));

		/* Return PGPROC structure (and semaphore) to appropriate freelist */
		proc->links.next = (SHM_QUEUE *) *procgloballist;
		*procgloballist = proc;
	}
871

872
	/* Update shared estimate of spins_per_delay */
873
	ProcGlobal->spins_per_delay = update_spins_per_delay(ProcGlobal->spins_per_delay);
874

875
	SpinLockRelease(ProcStructLock);
876

877 878
	/*
	 * This process is no longer present in shared memory in any meaningful
B
Bruce Momjian 已提交
879 880
	 * way, so tell the postmaster we've cleaned up acceptably well. (XXX
	 * autovac launcher should be included here someday)
881
	 */
882
	if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess())
883 884
		MarkPostmasterChildInactive();

885 886
	/* wake autovac launcher if needed -- see comments in FreeWorkerInfo */
	if (AutovacuumLauncherPid != 0)
887
		kill(AutovacuumLauncherPid, SIGUSR2);
888 889 890
}

/*
891
 * AuxiliaryProcKill() -- Cut-down version of ProcKill for auxiliary
B
Bruce Momjian 已提交
892
 *		processes (bgwriter, etc).  The PGPROC and sema are not released, only
893
 *		marked as not-in-use.
894 895
 */
static void
896
AuxiliaryProcKill(int code, Datum arg)
897
{
B
Bruce Momjian 已提交
898
	int			proctype = DatumGetInt32(arg);
899
	PGPROC	   *auxproc PG_USED_FOR_ASSERTS_ONLY;
900
	PGPROC	   *proc;
J
Jan Wieck 已提交
901

902
	Assert(proctype >= 0 && proctype < NUM_AUXILIARY_PROCS);
J
Jan Wieck 已提交
903

904
	auxproc = &AuxiliaryProcs[proctype];
J
Jan Wieck 已提交
905

906
	Assert(MyProc == auxproc);
907

908
	/* Release any LW locks I am holding (see notes above) */
909 910
	LWLockReleaseAll();

911
	/*
912 913 914 915
	 * Reset MyLatch to the process local one.  This is so that signal
	 * handlers et al can continue using the latch after the shared latch
	 * isn't ours anymore. After that clear MyProc and disown the shared
	 * latch.
916
	 */
917
	SwitchBackToLocalLatch();
918 919 920
	proc = MyProc;
	MyProc = NULL;
	DisownLatch(&proc->procLatch);
921

922 923
	SpinLockAcquire(ProcStructLock);

924
	/* Mark auxiliary proc no longer in use */
925
	proc->pid = 0;
926 927 928 929 930

	/* Update shared estimate of spins_per_delay */
	ProcGlobal->spins_per_delay = update_spins_per_delay(ProcGlobal->spins_per_delay);

	SpinLockRelease(ProcStructLock);
931 932
}

/*
 * ProcQueue package: routines for putting processes to sleep
 *		and  waking them up
 */

/*
 * ProcQueueAlloc -- alloc/attach to a shared memory process queue
 *
 * Returns: a pointer to the queue
 * Side Effects: Initializes the queue if it wasn't there before
 */
#ifdef NOT_USED
PROC_QUEUE *
ProcQueueAlloc(const char *name)
{
	PROC_QUEUE *queue;
	bool		found;

	queue = (PROC_QUEUE *)
		ShmemInitStruct(name, sizeof(PROC_QUEUE), &found);

	/* Only the first attacher initializes the queue */
	if (!found)
		ProcQueueInit(queue);

	return queue;
}
#endif
961 962 963 964 965

/*
 * ProcQueueInit -- initialize a shared memory process queue
 */
void
966
ProcQueueInit(PROC_QUEUE *queue)
967
{
968 969
	SHMQueueInit(&(queue->links));
	queue->size = 0;
970 971 972 973
}


/*
974
 * ProcSleep -- put a process to sleep on the specified lock
975
 *
976 977
 * Caller must have set MyProc->heldLocks to reflect locks already held
 * on the lockable object by this process (under all XIDs).
978
 *
979
 * The lock table's partition lock must be held at entry, and will be held
980
 * at exit.
981
 *
982
 * Result: STATUS_OK if we acquired the lock, STATUS_ERROR if not (deadlock).
983
 *
984
 * ASSUME: that no one will fiddle with the queue until after
985
 *		we release the partition lock.
986 987 988 989
 *
 * NOTES: The process queue is now a priority queue for locking.
 */
int
990
ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
991
{
992 993 994
	LOCKMODE	lockmode = locallock->tag.mode;
	LOCK	   *lock = locallock->lock;
	PROCLOCK   *proclock = locallock->proclock;
995
	uint32		hashcode = locallock->hashcode;
996
	LWLock	   *partitionLock = LockHashPartitionLock(hashcode);
997
	PROC_QUEUE *waitQueue = &(lock->waitProcs);
998
	LOCKMASK	myHeldLocks = MyProc->heldLocks;
999
	bool		early_deadlock = false;
B
Bruce Momjian 已提交
1000
	bool		allow_autovacuum_cancel = true;
1001
	int			myWaitStatus;
J
Jan Wieck 已提交
1002
	PGPROC	   *proc;
1003
	PGPROC	   *leader = MyProc->lockGroupLeader;
1004
	int			i;
1005

1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026
	/*
	 * If group locking is in use, locks held my members of my locking group
	 * need to be included in myHeldLocks.
	 */
	if (leader != NULL)
	{
		SHM_QUEUE  *procLocks = &(lock->procLocks);
		PROCLOCK   *otherproclock;

		otherproclock = (PROCLOCK *)
			SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink));
		while (otherproclock != NULL)
		{
			if (otherproclock->groupLeader == leader)
				myHeldLocks |= otherproclock->holdMask;
			otherproclock = (PROCLOCK *)
				SHMQueueNext(procLocks, &otherproclock->lockLink,
							 offsetof(PROCLOCK, lockLink));
		}
	}

1027
	/*
1028 1029
	 * Determine where to add myself in the wait queue.
	 *
1030 1031 1032 1033
	 * Normally I should go at the end of the queue.  However, if I already
	 * hold locks that conflict with the request of any previous waiter, put
	 * myself in the queue just in front of the first such waiter. This is not
	 * a necessary step, since deadlock detection would move me to before that
B
Bruce Momjian 已提交
1034 1035
	 * waiter anyway; but it's relatively cheap to detect such a conflict
	 * immediately, and avoid delaying till deadlock timeout.
1036
	 *
1037 1038
	 * Special case: if I find I should go in front of some waiter, check to
	 * see if I conflict with already-held locks or the requests before that
B
Bruce Momjian 已提交
1039
	 * waiter.  If not, then just grant myself the requested lock immediately.
B
Bruce Momjian 已提交
1040 1041 1042
	 * This is the same as the test for immediate grant in LockAcquire, except
	 * we are only considering the part of the wait queue before my insertion
	 * point.
1043 1044
	 */
	if (myHeldLocks != 0)
V
Vadim B. Mikheev 已提交
1045
	{
1046
		LOCKMASK	aheadRequests = 0;
1047

1048
		proc = (PGPROC *) waitQueue->links.next;
1049
		for (i = 0; i < waitQueue->size; i++)
V
Vadim B. Mikheev 已提交
1050
		{
1051 1052 1053 1054 1055 1056 1057 1058 1059
			/*
			 * If we're part of the same locking group as this waiter, its
			 * locks neither conflict with ours nor contribute to aheadRequsts.
			 */
			if (leader != NULL && leader == proc->lockGroupLeader)
			{
				proc = (PGPROC *) proc->links.next;
				continue;
			}
1060
			/* Must he wait for me? */
B
Bruce Momjian 已提交
1061
			if (lockMethodTable->conflictTab[proc->waitLockMode] & myHeldLocks)
V
Vadim B. Mikheev 已提交
1062
			{
1063
				/* Must I wait for him ? */
B
Bruce Momjian 已提交
1064
				if (lockMethodTable->conflictTab[lockmode] & proc->heldLocks)
1065
				{
1066
					/*
B
Bruce Momjian 已提交
1067
					 * Yes, so we have a deadlock.  Easiest way to clean up
B
Bruce Momjian 已提交
1068 1069 1070 1071
					 * correctly is to call RemoveFromWaitQueue(), but we
					 * can't do that until we are *on* the wait queue. So, set
					 * a flag to check below, and break out of loop.  Also,
					 * record deadlock info for later message.
1072
					 */
1073
					RememberSimpleDeadLock(MyProc, lockmode, lock, proc);
1074 1075
					early_deadlock = true;
					break;
1076
				}
1077
				/* I must go before this waiter.  Check special case. */
B
Bruce Momjian 已提交
1078
				if ((lockMethodTable->conflictTab[lockmode] & aheadRequests) == 0 &&
1079 1080 1081
					LockCheckConflicts(lockMethodTable,
									   lockmode,
									   lock,
1082
									   proclock) == STATUS_OK)
1083
				{
1084
					/* Skip the wait and just grant myself the lock. */
1085
					GrantLock(lock, proclock, lockmode);
1086
					GrantAwaitedLock();
1087
					return STATUS_OK;
1088 1089
				}
				/* Break out of loop to put myself before him */
V
Vadim B. Mikheev 已提交
1090
				break;
1091
			}
1092
			/* Nope, so advance to next waiter */
1093
			aheadRequests |= LOCKBIT_ON(proc->waitLockMode);
1094
			proc = (PGPROC *) proc->links.next;
V
Vadim B. Mikheev 已提交
1095
		}
B
Bruce Momjian 已提交
1096

1097
		/*
B
Bruce Momjian 已提交
1098 1099
		 * If we fall out of loop normally, proc points to waitQueue head, so
		 * we will insert at tail of queue as desired.
1100
		 */
1101 1102 1103 1104
	}
	else
	{
		/* I hold no locks, so I can't push in front of anyone. */
J
Jan Wieck 已提交
1105
		proc = (PGPROC *) &(waitQueue->links);
V
Vadim B. Mikheev 已提交
1106
	}
1107

1108
	/*
B
Bruce Momjian 已提交
1109
	 * Insert self into queue, ahead of the given proc (or at tail of queue).
1110
	 */
1111
	SHMQueueInsertBefore(&(proc->links), &(MyProc->links));
B
Bruce Momjian 已提交
1112
	waitQueue->size++;
1113

1114
	lock->waitMask |= LOCKBIT_ON(lockmode);
1115

J
Jan Wieck 已提交
1116
	/* Set up wait information in PGPROC object, too */
1117
	MyProc->waitLock = lock;
1118
	MyProc->waitProcLock = proclock;
1119 1120
	MyProc->waitLockMode = lockmode;

1121
	MyProc->waitStatus = STATUS_WAITING;
1122 1123

	/*
B
Bruce Momjian 已提交
1124 1125 1126
	 * If we detected deadlock, give up without waiting.  This must agree with
	 * CheckDeadLock's recovery code, except that we shouldn't release the
	 * semaphore since we haven't tried to lock it yet.
1127 1128 1129
	 */
	if (early_deadlock)
	{
1130
		RemoveFromWaitQueue(MyProc, hashcode);
1131 1132
		return STATUS_ERROR;
	}
1133

1134
	/* mark that we are waiting for a lock */
1135
	lockAwaited = locallock;
1136

1137
	/*
1138
	 * Release the lock table's partition lock.
1139
	 *
1140
	 * NOTE: this may also cause us to exit critical-section state, possibly
B
Bruce Momjian 已提交
1141 1142
	 * allowing a cancel/die interrupt to be accepted. This is OK because we
	 * have recorded the fact that we are waiting for a lock, and so
1143
	 * LockErrorCleanup will clean up if cancel/die happens.
1144
	 */
1145
	LWLockRelease(partitionLock);
1146

1147 1148 1149
	/*
	 * Also, now that we will successfully clean up after an ereport, it's
	 * safe to check to see if there's a buffer pin deadlock against the
1150 1151
	 * Startup process.  Of course, that's only necessary if we're doing Hot
	 * Standby and are not the Startup process ourselves.
1152 1153 1154 1155
	 */
	if (RecoveryInProgress() && !InRecovery)
		CheckRecoveryConflictDeadlock();

1156
	/* Reset deadlock_state before enabling the timeout handler */
1157
	deadlock_state = DS_NOT_YET_CHECKED;
1158
	got_deadlock_timeout = false;
1159

1160
	/*
B
Bruce Momjian 已提交
1161 1162 1163 1164
	 * Set timer so we can wake up after awhile and check for a deadlock. If a
	 * deadlock is detected, the handler releases the process's semaphore and
	 * sets MyProc->waitStatus = STATUS_ERROR, allowing us to know that we
	 * must report failure rather than success.
1165
	 *
1166 1167
	 * By delaying the check until we've waited for a bit, we can avoid
	 * running the rather expensive deadlock-check code in most cases.
1168 1169 1170
	 *
	 * If LockTimeout is set, also enable the timeout for that.  We can save a
	 * few cycles by enabling both timeout sources in one call.
1171
	 */
1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185
	if (LockTimeout > 0)
	{
		EnableTimeoutParams timeouts[2];

		timeouts[0].id = DEADLOCK_TIMEOUT;
		timeouts[0].type = TMPARAM_AFTER;
		timeouts[0].delay_ms = DeadlockTimeout;
		timeouts[1].id = LOCK_TIMEOUT;
		timeouts[1].type = TMPARAM_AFTER;
		timeouts[1].delay_ms = LockTimeout;
		enable_timeouts(timeouts, 2);
	}
	else
		enable_timeout_after(DEADLOCK_TIMEOUT, DeadlockTimeout);
1186

1187
	/*
1188 1189 1190 1191
	 * If somebody wakes us between LWLockRelease and WaitLatch, the latch
	 * will not wait. But a set latch does not necessarily mean that the lock
	 * is free now, as there are many other sources for latch sets than
	 * somebody releasing the lock.
1192
	 *
1193 1194 1195 1196 1197 1198 1199
	 * We process interrupts whenever the latch has been set, so cancel/die
	 * interrupts are processed quickly. This means we must not mind losing
	 * control to a cancel/die interrupt here.  We don't, because we have no
	 * shared-state-change work to do after being granted the lock (the
	 * grantor did it all).  We do have to worry about canceling the deadlock
	 * timeout and updating the locallock table, but if we lose control to an
	 * error, LockErrorCleanup will fix that up.
1200
	 */
B
Bruce Momjian 已提交
1201 1202
	do
	{
1203 1204 1205 1206 1207 1208 1209 1210 1211
		WaitLatch(MyLatch, WL_LATCH_SET, 0);
		ResetLatch(MyLatch);
		/* check for deadlocks first, as that's probably log-worthy */
		if (got_deadlock_timeout)
		{
			CheckDeadLock();
			got_deadlock_timeout = false;
		}
		CHECK_FOR_INTERRUPTS();
1212

1213 1214
		/*
		 * waitStatus could change from STATUS_WAITING to something else
B
Bruce Momjian 已提交
1215
		 * asynchronously.  Read it just once per loop to prevent surprising
1216 1217
		 * behavior (such as missing log messages).
		 */
1218
		myWaitStatus = *((volatile int *) &MyProc->waitStatus);
1219

1220 1221
		/*
		 * If we are not deadlocked, but are waiting on an autovacuum-induced
B
Bruce Momjian 已提交
1222
		 * task, send a signal to interrupt it.
1223 1224 1225
		 */
		if (deadlock_state == DS_BLOCKED_BY_AUTOVACUUM && allow_autovacuum_cancel)
		{
B
Bruce Momjian 已提交
1226
			PGPROC	   *autovac = GetBlockingAutoVacuumPgproc();
1227
			PGXACT	   *autovac_pgxact = &ProcGlobal->allPgXact[autovac->pgprocno];
1228 1229 1230 1231 1232 1233 1234

			LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

			/*
			 * Only do it if the worker is not working to protect against Xid
			 * wraparound.
			 */
S
Stephen Frost 已提交
1235
			if ((autovac_pgxact->vacuumFlags & PROC_IS_AUTOVACUUM) &&
1236
				!(autovac_pgxact->vacuumFlags & PROC_VACUUM_FOR_WRAPAROUND))
1237
			{
B
Bruce Momjian 已提交
1238
				int			pid = autovac->pid;
1239
				StringInfoData locktagbuf;
B
Bruce Momjian 已提交
1240
				StringInfoData logbuf;	/* errdetail for server log */
1241 1242 1243 1244 1245

				initStringInfo(&locktagbuf);
				initStringInfo(&logbuf);
				DescribeLockTag(&locktagbuf, &lock->tag);
				appendStringInfo(&logbuf,
B
Bruce Momjian 已提交
1246 1247 1248 1249 1250
								 _("Process %d waits for %s on %s."),
								 MyProcPid,
							  GetLockmodeName(lock->tag.locktag_lockmethodid,
											  lockmode),
								 locktagbuf.data);
1251 1252 1253

				/* release lock as quickly as possible */
				LWLockRelease(ProcArrayLock);
1254

1255 1256
				/* send the autovacuum worker Back to Old Kent Road */
				ereport(DEBUG1,
B
Bruce Momjian 已提交
1257 1258 1259
					  (errmsg("sending cancel to blocking autovacuum PID %d",
							  pid),
					   errdetail_log("%s", logbuf.data)));
1260 1261 1262

				if (kill(pid, SIGINT) < 0)
				{
1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276
					/*
					 * There's a race condition here: once we release the
					 * ProcArrayLock, it's possible for the autovac worker to
					 * close up shop and exit before we can do the kill().
					 * Therefore, we do not whinge about no-such-process.
					 * Other errors such as EPERM could conceivably happen if
					 * the kernel recycles the PID fast enough, but such cases
					 * seem improbable enough that it's probably best to issue
					 * a warning if we see some other errno.
					 */
					if (errno != ESRCH)
						ereport(WARNING,
						   (errmsg("could not send signal to process %d: %m",
								   pid)));
1277
				}
1278 1279 1280

				pfree(logbuf.data);
				pfree(locktagbuf.data);
1281 1282 1283 1284 1285 1286 1287 1288
			}
			else
				LWLockRelease(ProcArrayLock);

			/* prevent signal from being resent more than once */
			allow_autovacuum_cancel = false;
		}

1289 1290 1291 1292
		/*
		 * If awoken after the deadlock check interrupt has run, and
		 * log_lock_waits is on, then report about the wait.
		 */
1293
		if (log_lock_waits && deadlock_state != DS_NOT_YET_CHECKED)
1294
		{
1295 1296 1297
			StringInfoData buf,
						lock_waiters_sbuf,
						lock_holders_sbuf;
1298 1299 1300 1301
			const char *modename;
			long		secs;
			int			usecs;
			long		msecs;
1302 1303 1304 1305 1306
			SHM_QUEUE  *procLocks;
			PROCLOCK   *proclock;
			bool		first_holder = true,
						first_waiter = true;
			int			lockHoldersNum = 0;
1307 1308

			initStringInfo(&buf);
1309 1310 1311
			initStringInfo(&lock_waiters_sbuf);
			initStringInfo(&lock_holders_sbuf);

1312 1313 1314
			DescribeLockTag(&buf, &locallock->tag.lock);
			modename = GetLockmodeName(locallock->tag.lock.locktag_lockmethodid,
									   lockmode);
1315 1316
			TimestampDifference(get_timeout_start_time(DEADLOCK_TIMEOUT),
								GetCurrentTimestamp(),
1317 1318 1319 1320
								&secs, &usecs);
			msecs = secs * 1000 + usecs / 1000;
			usecs = usecs % 1000;

1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374
			/*
			 * we loop over the lock's procLocks to gather a list of all
			 * holders and waiters. Thus we will be able to provide more
			 * detailed information for lock debugging purposes.
			 *
			 * lock->procLocks contains all processes which hold or wait for
			 * this lock.
			 */

			LWLockAcquire(partitionLock, LW_SHARED);

			procLocks = &(lock->procLocks);
			proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
											   offsetof(PROCLOCK, lockLink));

			while (proclock)
			{
				/*
				 * we are a waiter if myProc->waitProcLock == proclock; we are
				 * a holder if it is NULL or something different
				 */
				if (proclock->tag.myProc->waitProcLock == proclock)
				{
					if (first_waiter)
					{
						appendStringInfo(&lock_waiters_sbuf, "%d",
										 proclock->tag.myProc->pid);
						first_waiter = false;
					}
					else
						appendStringInfo(&lock_waiters_sbuf, ", %d",
										 proclock->tag.myProc->pid);
				}
				else
				{
					if (first_holder)
					{
						appendStringInfo(&lock_holders_sbuf, "%d",
										 proclock->tag.myProc->pid);
						first_holder = false;
					}
					else
						appendStringInfo(&lock_holders_sbuf, ", %d",
										 proclock->tag.myProc->pid);

					lockHoldersNum++;
				}

				proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
											   offsetof(PROCLOCK, lockLink));
			}

			LWLockRelease(partitionLock);

1375 1376 1377
			if (deadlock_state == DS_SOFT_DEADLOCK)
				ereport(LOG,
						(errmsg("process %d avoided deadlock for %s on %s by rearranging queue order after %ld.%03d ms",
1378 1379 1380 1381
								MyProcPid, modename, buf.data, msecs, usecs),
						 (errdetail_log_plural("Process holding the lock: %s. Wait queue: %s.",
						   "Processes holding the lock: %s. Wait queue: %s.",
											   lockHoldersNum, lock_holders_sbuf.data, lock_waiters_sbuf.data))));
1382
			else if (deadlock_state == DS_HARD_DEADLOCK)
1383
			{
1384
				/*
B
Bruce Momjian 已提交
1385 1386 1387 1388
				 * This message is a bit redundant with the error that will be
				 * reported subsequently, but in some cases the error report
				 * might not make it to the log (eg, if it's caught by an
				 * exception handler), and we want to ensure all long-wait
1389 1390 1391 1392
				 * events get logged.
				 */
				ereport(LOG,
						(errmsg("process %d detected deadlock while waiting for %s on %s after %ld.%03d ms",
1393 1394 1395 1396
								MyProcPid, modename, buf.data, msecs, usecs),
						 (errdetail_log_plural("Process holding the lock: %s. Wait queue: %s.",
						   "Processes holding the lock: %s. Wait queue: %s.",
											   lockHoldersNum, lock_holders_sbuf.data, lock_waiters_sbuf.data))));
1397
			}
1398 1399 1400 1401

			if (myWaitStatus == STATUS_WAITING)
				ereport(LOG,
						(errmsg("process %d still waiting for %s on %s after %ld.%03d ms",
1402 1403 1404 1405
								MyProcPid, modename, buf.data, msecs, usecs),
						 (errdetail_log_plural("Process holding the lock: %s. Wait queue: %s.",
						   "Processes holding the lock: %s. Wait queue: %s.",
											   lockHoldersNum, lock_holders_sbuf.data, lock_waiters_sbuf.data))));
1406 1407
			else if (myWaitStatus == STATUS_OK)
				ereport(LOG,
B
Bruce Momjian 已提交
1408 1409
					(errmsg("process %d acquired %s on %s after %ld.%03d ms",
							MyProcPid, modename, buf.data, msecs, usecs)));
1410 1411 1412
			else
			{
				Assert(myWaitStatus == STATUS_ERROR);
B
Bruce Momjian 已提交
1413

1414 1415
				/*
				 * Currently, the deadlock checker always kicks its own
B
Bruce Momjian 已提交
1416 1417 1418 1419 1420
				 * process, which means that we'll only see STATUS_ERROR when
				 * deadlock_state == DS_HARD_DEADLOCK, and there's no need to
				 * print redundant messages.  But for completeness and
				 * future-proofing, print a message if it looks like someone
				 * else kicked us off the lock.
1421 1422 1423 1424
				 */
				if (deadlock_state != DS_HARD_DEADLOCK)
					ereport(LOG,
							(errmsg("process %d failed to acquire %s on %s after %ld.%03d ms",
1425 1426 1427 1428
								MyProcPid, modename, buf.data, msecs, usecs),
							 (errdetail_log_plural("Process holding the lock: %s. Wait queue: %s.",
						   "Processes holding the lock: %s. Wait queue: %s.",
												   lockHoldersNum, lock_holders_sbuf.data, lock_waiters_sbuf.data))));
1429 1430 1431
			}

			/*
B
Bruce Momjian 已提交
1432 1433
			 * At this point we might still need to wait for the lock. Reset
			 * state so we don't print the above messages again.
1434 1435 1436 1437
			 */
			deadlock_state = DS_NO_DEADLOCK;

			pfree(buf.data);
1438 1439
			pfree(lock_holders_sbuf.data);
			pfree(lock_waiters_sbuf.data);
1440
		}
1441
	} while (myWaitStatus == STATUS_WAITING);
1442

1443
	/*
1444 1445 1446 1447
	 * Disable the timers, if they are still running.  As in LockErrorCleanup,
	 * we must preserve the LOCK_TIMEOUT indicator flag: if a lock timeout has
	 * already caused QueryCancelPending to become set, we want the cancel to
	 * be reported as a lock timeout, not a user cancel.
B
Bruce Momjian 已提交
1448
	 */
1449 1450 1451 1452 1453 1454 1455
	if (LockTimeout > 0)
	{
		DisableTimeoutParams timeouts[2];

		timeouts[0].id = DEADLOCK_TIMEOUT;
		timeouts[0].keep_indicator = false;
		timeouts[1].id = LOCK_TIMEOUT;
1456
		timeouts[1].keep_indicator = true;
1457 1458 1459 1460
		disable_timeouts(timeouts, 2);
	}
	else
		disable_timeout(DEADLOCK_TIMEOUT, false);
B
Bruce Momjian 已提交
1461

1462
	/*
B
Bruce Momjian 已提交
1463 1464 1465
	 * Re-acquire the lock table's partition lock.  We have to do this to hold
	 * off cancel/die interrupts before we can mess with lockAwaited (else we
	 * might have a missed or duplicated locallock update).
1466
	 */
1467
	LWLockAcquire(partitionLock, LW_EXCLUSIVE);
1468 1469

	/*
1470
	 * We no longer want LockErrorCleanup to do anything.
1471
	 */
1472
	lockAwaited = NULL;
1473

1474
	/*
1475
	 * If we got the lock, be sure to remember it in the locallock table.
1476
	 */
1477
	if (MyProc->waitStatus == STATUS_OK)
1478
		GrantAwaitedLock();
1479

1480 1481 1482 1483
	/*
	 * We don't have to do anything else, because the awaker did all the
	 * necessary update of the lock table and MyProc.
	 */
1484
	return MyProc->waitStatus;
1485 1486 1487 1488 1489 1490
}


/*
 * ProcWakeup -- wake up a process by releasing its private semaphore.
 *
1491
 *	 Also remove the process from the wait queue and set its links invalid.
1492
 *	 RETURN: the next process in the wait queue.
1493
 *
1494 1495
 * The appropriate lock partition lock must be held by caller.
 *
1496 1497 1498
 * XXX: presently, this code is only used for the "success" case, and only
 * works correctly for that case.  To clean up in failure case, would need
 * to twiddle the lock's request counts too --- see RemoveFromWaitQueue.
1499
 * Hence, in practice the waitStatus parameter must be STATUS_OK.
1500
 */
J
Jan Wieck 已提交
1501
PGPROC *
1502
ProcWakeup(PGPROC *proc, int waitStatus)
1503
{
J
Jan Wieck 已提交
1504
	PGPROC	   *retProc;
1505

1506
	/* Proc should be sleeping ... */
1507 1508
	if (proc->links.prev == NULL ||
		proc->links.next == NULL)
1509
		return NULL;
1510
	Assert(proc->waitStatus == STATUS_WAITING);
1511

1512
	/* Save next process before we zap the list link */
1513
	retProc = (PGPROC *) proc->links.next;
1514

1515
	/* Remove process from wait queue */
1516
	SHMQueueDelete(&(proc->links));
1517
	(proc->waitLock->waitProcs.size)--;
1518

1519 1520
	/* Clean up process' state and pass it the ok/fail signal */
	proc->waitLock = NULL;
1521
	proc->waitProcLock = NULL;
1522
	proc->waitStatus = waitStatus;
1523

1524
	/* And awaken it */
1525
	SetLatch(&proc->procLatch);
1526 1527

	return retProc;
1528 1529 1530 1531
}

/*
 * ProcLockWakeup -- routine for waking up processes when a lock is
1532 1533
 *		released (or a prior waiter is aborted).  Scan all waiters
 *		for lock, waken any that are no longer blocked.
1534 1535
 *
 * The appropriate lock partition lock must be held by caller.
1536
 */
1537
void
1538
ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock)
1539
{
1540 1541
	PROC_QUEUE *waitQueue = &(lock->waitProcs);
	int			queue_size = waitQueue->size;
J
Jan Wieck 已提交
1542
	PGPROC	   *proc;
1543
	LOCKMASK	aheadRequests = 0;
M
 
Marc G. Fournier 已提交
1544

1545
	Assert(queue_size >= 0);
1546

1547 1548
	if (queue_size == 0)
		return;
1549

1550
	proc = (PGPROC *) waitQueue->links.next;
1551

1552 1553
	while (queue_size-- > 0)
	{
B
Bruce Momjian 已提交
1554
		LOCKMODE	lockmode = proc->waitLockMode;
M
 
Marc G. Fournier 已提交
1555 1556

		/*
B
Bruce Momjian 已提交
1557 1558
		 * Waken if (a) doesn't conflict with requests of earlier waiters, and
		 * (b) doesn't conflict with already-held locks.
M
 
Marc G. Fournier 已提交
1559
		 */
B
Bruce Momjian 已提交
1560
		if ((lockMethodTable->conflictTab[lockmode] & aheadRequests) == 0 &&
1561 1562 1563
			LockCheckConflicts(lockMethodTable,
							   lockmode,
							   lock,
1564
							   proc->waitProcLock) == STATUS_OK)
M
 
Marc G. Fournier 已提交
1565
		{
1566
			/* OK to waken */
1567
			GrantLock(lock, proc->waitProcLock, lockmode);
1568
			proc = ProcWakeup(proc, STATUS_OK);
B
Bruce Momjian 已提交
1569

1570
			/*
B
Bruce Momjian 已提交
1571 1572 1573
			 * ProcWakeup removes proc from the lock's waiting process queue
			 * and returns the next proc in chain; don't use proc's next-link,
			 * because it's been cleared.
1574
			 */
M
 
Marc G. Fournier 已提交
1575
		}
1576
		else
1577
		{
B
Bruce Momjian 已提交
1578
			/*
B
Bruce Momjian 已提交
1579
			 * Cannot wake this guy. Remember his request for later checks.
B
Bruce Momjian 已提交
1580
			 */
1581
			aheadRequests |= LOCKBIT_ON(lockmode);
1582
			proc = (PGPROC *) proc->links.next;
1583
		}
M
 
Marc G. Fournier 已提交
1584
	}
1585 1586

	Assert(waitQueue->size >= 0);
1587 1588
}

1589 1590 1591
/*
 * CheckDeadLock
 *
 * We only get to this routine, if DEADLOCK_TIMEOUT fired while waiting for a
 * lock to be released by some other process.  Check if there's a deadlock; if
 * not, just return.  (But signal ProcSleep to log a message, if
 * log_lock_waits is true.)  If we have a real deadlock, remove ourselves from
 * the lock's wait queue and signal an error to ProcSleep.
 */
static void
CheckDeadLock(void)
{
	int			i;

	/*
	 * Acquire exclusive lock on the entire shared lock data structures. Must
	 * grab LWLocks in partition-number order to avoid LWLock deadlock.
	 *
	 * Note that the deadlock check interrupt had better not be enabled
	 * anywhere that this process itself holds lock partition locks, else this
	 * will wait forever.  Also note that LWLockAcquire creates a critical
	 * section, so that this routine cannot be interrupted by cancel/die
	 * interrupts.
	 */
	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
		LWLockAcquire(LockHashPartitionLockByIndex(i), LW_EXCLUSIVE);

	/*
	 * Check to see if we've been awoken by anyone in the interim.
	 *
	 * If we have, we can return and resume our transaction -- happy day.
	 * Before we are awoken the process releasing the lock grants it to us so
	 * we know that we don't have to wait anymore.
	 *
	 * We check by looking to see if we've been unlinked from the wait queue.
	 * This is quicker than checking our semaphore's state, since no kernel
	 * call is needed, and it is safe because we hold the lock partition lock.
	 */
	if (MyProc->links.prev == NULL ||
		MyProc->links.next == NULL)
		goto check_done;

#ifdef LOCK_DEBUG
	if (Debug_deadlocks)
		DumpAllLocks();
#endif

	/* Run the deadlock check, and set deadlock_state for use by ProcSleep */
	deadlock_state = DeadLockCheck(MyProc);

	if (deadlock_state == DS_HARD_DEADLOCK)
	{
		/*
		 * Oops.  We have a deadlock.
		 *
		 * Get this process out of wait state. (Note: we could do this more
		 * efficiently by relying on lockAwaited, but use this coding to
		 * preserve the flexibility to kill some other transaction than the
		 * one detecting the deadlock.)
		 *
		 * RemoveFromWaitQueue sets MyProc->waitStatus to STATUS_ERROR, so
		 * ProcSleep will report an error after we return from the signal
		 * handler.
		 */
		Assert(MyProc->waitLock != NULL);
		RemoveFromWaitQueue(MyProc, LockTagHashCode(&(MyProc->waitLock->tag)));

		/*
		 * We're done here.  Transaction abort caused by the error that
		 * ProcSleep will raise will cause any other locks we hold to be
		 * released, thus allowing other processes to wake up; we don't need
		 * to do that here.  NOTE: an exception is that releasing locks we
		 * hold doesn't consider the possibility of waiters that were blocked
		 * behind us on the lock we just failed to get, and might now be
		 * wakable because we're not in front of them anymore.  However,
		 * RemoveFromWaitQueue took care of waking up any such processes.
		 */
	}

	/*
	 * And release locks.  We do this in reverse order for two reasons: (1)
	 * Anyone else who needs more than one of the locks will be trying to lock
	 * them in increasing order; we don't want to release the other process
	 * until it can get all the locks it needs. (2) This avoids O(N^2)
	 * behavior inside LWLockRelease.
	 */
check_done:
	for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
		LWLockRelease(LockHashPartitionLockByIndex(i));
}

1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690
/*
 * CheckDeadLockAlert - Handle the expiry of deadlock_timeout.
 *
 * NB: Runs inside a signal handler, be careful.
 */
void
CheckDeadLockAlert(void)
{
	int			save_errno = errno;

	got_deadlock_timeout = true;
B
Bruce Momjian 已提交
1691

1692 1693 1694 1695 1696 1697 1698 1699
	/*
	 * Have to set the latch again, even if handle_sig_alarm already did. Back
	 * then got_deadlock_timeout wasn't yet set... It's unlikely that this
	 * ever would be a problem, but setting a set latch again is cheap.
	 */
	SetLatch(MyLatch);
	errno = save_errno;
}
1700

1701 1702 1703
/*
 * ProcWaitForSignal - wait for a signal from another backend.
 *
 * As this uses the generic process latch the caller has to be robust against
 * unrelated wakeups: Always check that the desired state has occurred, and
 * wait again if not.
 */
void
ProcWaitForSignal(void)
{
	/* Sleep until someone sets our process latch (e.g. via ProcSendSignal). */
	WaitLatch(MyLatch, WL_LATCH_SET, 0);
	/* Reset it before returning, so a subsequent wait blocks properly. */
	ResetLatch(MyLatch);
	/* The wakeup may actually have been an interrupt; service it now. */
	CHECK_FOR_INTERRUPTS();
}

/*
1717
 * ProcSendSignal - send a signal to a backend identified by PID
1718 1719
 */
void
1720
ProcSendSignal(int pid)
1721
{
1722 1723 1724 1725 1726 1727 1728 1729
	PGPROC	   *proc = NULL;

	if (RecoveryInProgress())
	{
		SpinLockAcquire(ProcStructLock);

		/*
		 * Check to see whether it is the Startup process we wish to signal.
B
Bruce Momjian 已提交
1730 1731
		 * This call is made by the buffer manager when it wishes to wake up a
		 * process that has been waiting for a pin in so it can obtain a
1732
		 * cleanup lock using LockBufferForCleanup(). Startup is not a normal
B
Bruce Momjian 已提交
1733 1734
		 * backend, so BackendPidGetProc() will not return any pid at all. So
		 * we remember the information for this special case.
1735
		 */
1736 1737
		if (pid == ProcGlobal->startupProcPid)
			proc = ProcGlobal->startupProc;
1738 1739 1740 1741 1742 1743

		SpinLockRelease(ProcStructLock);
	}

	if (proc == NULL)
		proc = BackendPidGetProc(pid);
1744 1745

	if (proc != NULL)
1746 1747 1748
	{
		SetLatch(&proc->procLatch);
	}
1749
}
1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812

/*
 * BecomeLockGroupLeader - designate process as lock group leader
 *
 * Once this function has returned, other processes can join the lock group
 * by calling BecomeLockGroupMember.
 */
void
BecomeLockGroupLeader(void)
{
	LWLock	   *partition_lock;

	/* Already a leader?  Then this call is a no-op. */
	if (MyProc->lockGroupLeader == MyProc)
		return;

	/* A process that has joined another group must not lead one. */
	Assert(MyProc->lockGroupLeader == NULL);

	/*
	 * Initialize a single-member group containing only ourselves, holding
	 * the lock partition covering our PGPROC while we update the group
	 * fields.
	 */
	partition_lock = LockHashPartitionLockByProc(MyProc);
	LWLockAcquire(partition_lock, LW_EXCLUSIVE);
	MyProc->lockGroupLeader = MyProc;
	MyProc->lockGroupLeaderIdentifier = MyProcPid;
	dlist_push_head(&MyProc->lockGroupMembers, &MyProc->lockGroupLink);
	LWLockRelease(partition_lock);
}

/*
 * BecomeLockGroupMember - designate process as lock group member
 *
 * This is pretty straightforward except for the possibility that the leader
 * whose group we're trying to join might exit before we manage to do so;
 * and the PGPROC might get recycled for an unrelated process.  To avoid
 * that, we require the caller to pass the PID of the intended PGPROC as
 * an interlock.  Returns true if we successfully join the intended lock
 * group, and false if not.
 */
bool
BecomeLockGroupMember(PGPROC *leader, int pid)
{
	LWLock	   *leader_lwlock;
	bool		ok = false;

	/* Group leader can't become member of group */
	Assert(MyProc != leader);

	/* PID must be valid. */
	Assert(pid != 0);

	/*
	 * Try to join the group.
	 *
	 * The lock-group fields of both PGPROCs are protected by the lock
	 * partition lock derived from the *leader's* PGPROC, so we must compute
	 * the partition from "leader" here, not from MyProc.  Using MyProc's
	 * partition (as a previous version of this code did) would generally
	 * select a different LWLock and thus fail to serialize against
	 * concurrent changes to the leader's group, allowing a join to race
	 * with the leader exiting and its PGPROC being recycled.
	 */
	leader_lwlock = LockHashPartitionLockByProc(leader);
	LWLockAcquire(leader_lwlock, LW_EXCLUSIVE);
	if (leader->lockGroupLeaderIdentifier == pid)
	{
		/* Leader is still the process the caller meant: link ourselves in. */
		ok = true;
		MyProc->lockGroupLeader = leader;
		dlist_push_tail(&leader->lockGroupMembers, &MyProc->lockGroupLink);
	}
	LWLockRelease(leader_lwlock);

	return ok;
}