procarray.c 91.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12
/*-------------------------------------------------------------------------
 *
 * procarray.c
 *	  POSTGRES process array code.
 *
 *
 * This module maintains an unsorted array of the PGPROC structures for all
 * active backends.  Although there are several uses for this, the principal
 * one is as a means of determining the set of currently running transactions.
 *
 * Because of various subtle race conditions it is critical that a backend
 * hold the correct locks while setting or clearing its MyProc->xid field.
13
 * See notes in src/backend/access/transam/README.
14 15 16
 *
 * The process array now also includes PGPROC structures representing
 * prepared transactions.  The xid and subxids fields of these are valid,
17 18
 * as are the myProcLocks lists.  They can be distinguished from regular
 * backend PGPROCs at need by checking for pid == 0.
B
Bruce Momjian 已提交
19
 *
20 21
 * During hot standby, we also keep a list of XIDs representing transactions
 * that are known to be running in the master (or more precisely, were running
B
Bruce Momjian 已提交
22
 * as of the current point in the WAL stream).	This list is kept in the
23 24 25
 * KnownAssignedXids array, and is updated by watching the sequence of
 * arriving XIDs.  This is necessary because if we leave those XIDs out of
 * snapshots taken for standby queries, then they will appear to be already
B
Bruce Momjian 已提交
26
 * complete, leading to MVCC failures.	Note that in hot standby, the PGPROC
27 28 29 30 31 32 33
 * array represents standby processes, which by definition are not running
 * transactions that have XIDs.
 *
 * It is perhaps possible for a backend on the master to terminate without
 * writing an abort record for its transaction.  While that shouldn't really
 * happen, it would tie up KnownAssignedXids indefinitely, so we protect
 * ourselves by pruning the array when a valid list of running XIDs arrives.
34
 *
B
Bruce Momjian 已提交
35
 * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
36 37 38 39
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
40
 *	  src/backend/storage/ipc/procarray.c
41 42 43 44 45
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

46 47
#include <signal.h>

48
#include "access/clog.h"
49
#include "access/subtrans.h"
50 51
#include "access/transam.h"
#include "access/xact.h"
52
#include "access/twophase.h"
53 54
#include "miscadmin.h"
#include "storage/procarray.h"
55
#include "storage/spin.h"
56 57
#include "storage/standby.h"
#include "utils/builtins.h"
58
#include "utils/snapmgr.h"
59 60 61 62 63 64 65 66


/* Our shared memory area */
typedef struct ProcArrayStruct
{
	int			numProcs;		/* number of valid procs entries */
	int			maxProcs;		/* allocated size of procs array */

67 68 69 70 71 72 73
	/*
	 * Known assigned XIDs handling
	 */
	int			maxKnownAssignedXids;	/* allocated size of array */
	int			numKnownAssignedXids;	/* currrent # of valid entries */
	int			tailKnownAssignedXids;	/* index of oldest valid element */
	int			headKnownAssignedXids;	/* index of newest element, + 1 */
B
Bruce Momjian 已提交
74
	slock_t		known_assigned_xids_lck;		/* protects head/tail pointers */
B
Bruce Momjian 已提交
75

76
	/*
77 78 79 80 81
	 * Highest subxid that has been removed from KnownAssignedXids array to
	 * prevent overflow; or InvalidTransactionId if none.  We track this for
	 * similar reasons to tracking overflowing cached subxids in PGPROC
	 * entries.  Must hold exclusive ProcArrayLock to change this, and shared
	 * lock to read it.
82
	 */
B
Bruce Momjian 已提交
83
	TransactionId lastOverflowedXid;
84

85
	/*
B
Bruce Momjian 已提交
86 87
	 * We declare procs[] as 1 entry because C wants a fixed-size array, but
	 * actually it is maxProcs entries long.
88 89 90 91 92 93
	 */
	PGPROC	   *procs[1];		/* VARIABLE LENGTH ARRAY */
} ProcArrayStruct;

static ProcArrayStruct *procArray;

94 95 96
/*
 * Bookkeeping for tracking emulated transactions in recovery
 */
97 98
static TransactionId *KnownAssignedXids;
static bool *KnownAssignedXidsValid;
B
Bruce Momjian 已提交
99
static TransactionId latestObservedXid = InvalidTransactionId;
100 101 102 103 104 105 106 107

/*
 * If we're in STANDBY_SNAPSHOT_PENDING state, standbySnapshotPendingXmin is
 * the highest xid that might still be running that we don't have in
 * KnownAssignedXids.
 */
static TransactionId standbySnapshotPendingXmin;

108 109 110 111
#ifdef XIDCACHE_DEBUG

/* counters for XidCache measurement */
static long xc_by_recent_xmin = 0;
112
static long xc_by_known_xact = 0;
113
static long xc_by_my_xact = 0;
114
static long xc_by_latest_xid = 0;
115 116
static long xc_by_main_xid = 0;
static long xc_by_child_xid = 0;
117
static long xc_by_known_assigned = 0;
118
static long xc_no_overflow = 0;
119 120 121
static long xc_slow_answer = 0;

#define xc_by_recent_xmin_inc()		(xc_by_recent_xmin++)
122
#define xc_by_known_xact_inc()		(xc_by_known_xact++)
123
#define xc_by_my_xact_inc()			(xc_by_my_xact++)
124
#define xc_by_latest_xid_inc()		(xc_by_latest_xid++)
125 126
#define xc_by_main_xid_inc()		(xc_by_main_xid++)
#define xc_by_child_xid_inc()		(xc_by_child_xid++)
127
#define xc_by_known_assigned_inc()	(xc_by_known_assigned++)
128
#define xc_no_overflow_inc()		(xc_no_overflow++)
129 130 131 132 133 134
#define xc_slow_answer_inc()		(xc_slow_answer++)

static void DisplayXidCache(void);
#else							/* !XIDCACHE_DEBUG */

#define xc_by_recent_xmin_inc()		((void) 0)
135
#define xc_by_known_xact_inc()		((void) 0)
136
#define xc_by_my_xact_inc()			((void) 0)
137
#define xc_by_latest_xid_inc()		((void) 0)
138 139
#define xc_by_main_xid_inc()		((void) 0)
#define xc_by_child_xid_inc()		((void) 0)
140
#define xc_by_known_assigned_inc()	((void) 0)
141
#define xc_no_overflow_inc()		((void) 0)
142 143 144
#define xc_slow_answer_inc()		((void) 0)
#endif   /* XIDCACHE_DEBUG */

145
/* Primitives for KnownAssignedXids array handling for standby */
146 147
static void KnownAssignedXidsCompress(bool force);
static void KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
B
Bruce Momjian 已提交
148
					 bool exclusive_lock);
149 150
static bool KnownAssignedXidsSearch(TransactionId xid, bool remove);
static bool KnownAssignedXidExists(TransactionId xid);
151
static void KnownAssignedXidsRemove(TransactionId xid);
152
static void KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
B
Bruce Momjian 已提交
153
							TransactionId *subxids);
154
static void KnownAssignedXidsRemovePreceding(TransactionId xid);
B
Bruce Momjian 已提交
155
static int	KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
156
static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray,
B
Bruce Momjian 已提交
157 158
							   TransactionId *xmin,
							   TransactionId xmax);
159
static TransactionId KnownAssignedXidsGetOldestXmin(void);
160
static void KnownAssignedXidsDisplay(int trace_level);
161 162 163 164

/*
 * Report shared-memory space needed by CreateSharedProcArray.
 */
165
Size
166
ProcArrayShmemSize(void)
167
{
168 169
	Size		size;

170 171
	/* Size of the ProcArray structure itself */
#define PROCARRAY_MAXPROCS	(MaxBackends + max_prepared_xacts)
172

173
	size = offsetof(ProcArrayStruct, procs);
174 175 176
	size = add_size(size, mul_size(sizeof(PGPROC *), PROCARRAY_MAXPROCS));

	/*
177
	 * During Hot Standby processing we have a data structure called
178 179 180 181 182 183
	 * KnownAssignedXids, created in shared memory. Local data structures are
	 * also created in various backends during GetSnapshotData(),
	 * TransactionIdIsInProgress() and GetRunningTransactionData(). All of the
	 * main structures created in those functions must be identically sized,
	 * since we may at times copy the whole of the data structures around. We
	 * refer to this size as TOTAL_MAX_CACHED_SUBXIDS.
184
	 *
B
Bruce Momjian 已提交
185 186 187
	 * Ideally we'd only create this structure if we were actually doing hot
	 * standby in the current run, but we don't know that yet at the time
	 * shared memory is being set up.
188
	 */
189 190 191
#define TOTAL_MAX_CACHED_SUBXIDS \
	((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)

192
	if (EnableHotStandby)
193 194 195 196
	{
		size = add_size(size,
						mul_size(sizeof(TransactionId),
								 TOTAL_MAX_CACHED_SUBXIDS));
197
		size = add_size(size,
198 199
						mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS));
	}
200 201

	return size;
202 203 204 205 206 207
}

/*
 * Initialize the shared PGPROC array during postmaster startup.
 */
void
208
CreateSharedProcArray(void)
209 210 211 212 213
{
	bool		found;

	/* Create or attach to the ProcArray shared structure */
	procArray = (ProcArrayStruct *)
214
		ShmemInitStruct("Proc Array",
215 216 217
						add_size(offsetof(ProcArrayStruct, procs),
								 mul_size(sizeof(PGPROC *),
										  PROCARRAY_MAXPROCS)),
218
						&found);
219 220 221 222 223 224 225

	if (!found)
	{
		/*
		 * We're the first - initialize.
		 */
		procArray->numProcs = 0;
226
		procArray->maxProcs = PROCARRAY_MAXPROCS;
227
		procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
228 229 230 231
		procArray->numKnownAssignedXids = 0;
		procArray->tailKnownAssignedXids = 0;
		procArray->headKnownAssignedXids = 0;
		SpinLockInit(&procArray->known_assigned_xids_lck);
232 233
		procArray->lastOverflowedXid = InvalidTransactionId;
	}
234

235
	/* Create or attach to the KnownAssignedXids arrays too, if needed */
236
	if (EnableHotStandby)
237
	{
238 239 240 241 242 243 244 245 246
		KnownAssignedXids = (TransactionId *)
			ShmemInitStruct("KnownAssignedXids",
							mul_size(sizeof(TransactionId),
									 TOTAL_MAX_CACHED_SUBXIDS),
							&found);
		KnownAssignedXidsValid = (bool *)
			ShmemInitStruct("KnownAssignedXidsValid",
							mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS),
							&found);
247 248 249 250
	}
}

/*
251
 * Add the specified PGPROC to the shared array.
252 253
 */
void
254
ProcArrayAdd(PGPROC *proc)
255 256 257 258 259 260 261 262
{
	ProcArrayStruct *arrayP = procArray;

	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	if (arrayP->numProcs >= arrayP->maxProcs)
	{
		/*
B
Bruce Momjian 已提交
263 264 265
		 * Ooops, no room.	(This really shouldn't happen, since there is a
		 * fixed supply of PGPROC structs too, and so we should have failed
		 * earlier.)
266 267 268 269 270 271 272
		 */
		LWLockRelease(ProcArrayLock);
		ereport(FATAL,
				(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
				 errmsg("sorry, too many clients already")));
	}

273
	arrayP->procs[arrayP->numProcs] = proc;
274 275 276 277 278 279
	arrayP->numProcs++;

	LWLockRelease(ProcArrayLock);
}

/*
280
 * Remove the specified PGPROC from the shared array.
281 282 283 284 285 286 287
 *
 * When latestXid is a valid XID, we are removing a live 2PC gxact from the
 * array, and thus causing it to appear as "not running" anymore.  In this
 * case we must advance latestCompletedXid.  (This is essentially the same
 * as ProcArrayEndTransaction followed by removal of the PGPROC, but we take
 * the ProcArrayLock only once, and don't damage the content of the PGPROC;
 * twophase.c depends on the latter.)
288 289
 */
void
290
ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
291 292 293 294 295
{
	ProcArrayStruct *arrayP = procArray;
	int			index;

#ifdef XIDCACHE_DEBUG
296 297 298
	/* dump stats at backend shutdown, but not prepared-xact end */
	if (proc->pid != 0)
		DisplayXidCache();
299 300 301 302
#endif

	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

303 304 305 306 307 308 309 310 311 312 313 314 315 316 317
	if (TransactionIdIsValid(latestXid))
	{
		Assert(TransactionIdIsValid(proc->xid));

		/* Advance global latestCompletedXid while holding the lock */
		if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
								  latestXid))
			ShmemVariableCache->latestCompletedXid = latestXid;
	}
	else
	{
		/* Shouldn't be trying to remove a live transaction here */
		Assert(!TransactionIdIsValid(proc->xid));
	}

318 319
	for (index = 0; index < arrayP->numProcs; index++)
	{
320
		if (arrayP->procs[index] == proc)
321 322
		{
			arrayP->procs[index] = arrayP->procs[arrayP->numProcs - 1];
323
			arrayP->procs[arrayP->numProcs - 1] = NULL; /* for debugging */
324 325 326 327 328 329 330 331 332
			arrayP->numProcs--;
			LWLockRelease(ProcArrayLock);
			return;
		}
	}

	/* Ooops */
	LWLockRelease(ProcArrayLock);

333
	elog(LOG, "failed to find proc %p in ProcArray", proc);
334 335 336
}


337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355
/*
 * ProcArrayEndTransaction -- mark a transaction as no longer running
 *
 * This is used interchangeably for commit and abort cases.  The transaction
 * commit/abort must already be reported to WAL and pg_clog.
 *
 * proc is currently always MyProc, but we pass it explicitly for flexibility.
 * latestXid is the latest Xid among the transaction's main XID and
 * subtransactions, or InvalidTransactionId if it has no XID.  (We must ask
 * the caller to pass latestXid, instead of computing it from the PGPROC's
 * contents, because the subxid information in the PGPROC might be
 * incomplete.)
 */
void
ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
{
	if (TransactionIdIsValid(latestXid))
	{
		/*
B
Bruce Momjian 已提交
356 357 358
		 * We must lock ProcArrayLock while clearing proc->xid, so that we do
		 * not exit the set of "running" transactions while someone else is
		 * taking a snapshot.  See discussion in
359 360 361 362 363 364 365 366 367
		 * src/backend/access/transam/README.
		 */
		Assert(TransactionIdIsValid(proc->xid));

		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

		proc->xid = InvalidTransactionId;
		proc->lxid = InvalidLocalTransactionId;
		proc->xmin = InvalidTransactionId;
368 369
		/* must be cleared with xid/xmin: */
		proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
B
Bruce Momjian 已提交
370
		proc->inCommit = false; /* be sure this is cleared in abort */
371
		proc->recoveryConflictPending = false;
372 373 374 375 376 377 378 379 380 381 382 383 384 385 386

		/* Clear the subtransaction-XID cache too while holding the lock */
		proc->subxids.nxids = 0;
		proc->subxids.overflowed = false;

		/* Also advance global latestCompletedXid while holding the lock */
		if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
								  latestXid))
			ShmemVariableCache->latestCompletedXid = latestXid;

		LWLockRelease(ProcArrayLock);
	}
	else
	{
		/*
B
Bruce Momjian 已提交
387 388 389
		 * If we have no XID, we don't need to lock, since we won't affect
		 * anyone else's calculation of a snapshot.  We might change their
		 * estimate of global xmin, but that's OK.
390 391 392 393 394
		 */
		Assert(!TransactionIdIsValid(proc->xid));

		proc->lxid = InvalidLocalTransactionId;
		proc->xmin = InvalidTransactionId;
395 396
		/* must be cleared with xid/xmin: */
		proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
B
Bruce Momjian 已提交
397
		proc->inCommit = false; /* be sure this is cleared in abort */
398
		proc->recoveryConflictPending = false;
399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418

		Assert(proc->subxids.nxids == 0);
		Assert(proc->subxids.overflowed == false);
	}
}


/*
 * ProcArrayClearTransaction -- clear the transaction fields
 *
 * This is used after successfully preparing a 2-phase transaction.  We are
 * not actually reporting the transaction's XID as no longer running --- it
 * will still appear as running because the 2PC's gxact is in the ProcArray
 * too.  We just have to clear out our own PGPROC.
 */
void
ProcArrayClearTransaction(PGPROC *proc)
{
	/*
	 * We can skip locking ProcArrayLock here, because this action does not
B
Bruce Momjian 已提交
419 420
	 * actually change anyone's view of the set of running XIDs: our entry is
	 * duplicate with the gxact that has already been inserted into the
421 422 423 424 425
	 * ProcArray.
	 */
	proc->xid = InvalidTransactionId;
	proc->lxid = InvalidLocalTransactionId;
	proc->xmin = InvalidTransactionId;
426
	proc->recoveryConflictPending = false;
427 428 429 430

	/* redundant, but just in case */
	proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
	proc->inCommit = false;
431 432 433 434 435 436

	/* Clear the subtransaction-XID cache too */
	proc->subxids.nxids = 0;
	proc->subxids.overflowed = false;
}

437 438 439
/*
 * ProcArrayApplyRecoveryInfo -- apply recovery info about xids
 *
440
 * Takes us through 3 states: Initialized, Pending and Ready.
441 442 443 444
 * Normal case is to go all the way to Ready straight away, though there
 * are atypical cases where we need to take it in steps.
 *
 * Use the data about running transactions on master to create the initial
445
 * state of KnownAssignedXids. We also use these records to regularly prune
446
 * KnownAssignedXids because we know it is possible that some transactions
447
 * with FATAL errors fail to write abort records, which could cause eventual
448 449
 * overflow.
 *
450
 * See comments for LogStandbySnapshot().
451 452 453 454
 */
void
ProcArrayApplyRecoveryInfo(RunningTransactions running)
{
B
Bruce Momjian 已提交
455
	TransactionId *xids;
B
Bruce Momjian 已提交
456
	int			nxids;
457
	TransactionId nextXid;
B
Bruce Momjian 已提交
458
	int			i;
459 460

	Assert(standbyState >= STANDBY_INITIALIZED);
461 462 463
	Assert(TransactionIdIsValid(running->nextXid));
	Assert(TransactionIdIsValid(running->oldestRunningXid));
	Assert(TransactionIdIsNormal(running->latestCompletedXid));
464 465 466 467 468 469 470 471 472 473 474 475 476 477

	/*
	 * Remove stale transactions, if any.
	 */
	ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid);
	StandbyReleaseOldLocks(running->oldestRunningXid);

	/*
	 * If our snapshot is already valid, nothing else to do...
	 */
	if (standbyState == STANDBY_SNAPSHOT_READY)
		return;

	/*
478
	 * If our initial RunningTransactionsData had an overflowed snapshot then we knew
B
Bruce Momjian 已提交
479 480 481 482 483 484 485
	 * we were missing some subxids from our snapshot. We can use this data as
	 * an initial snapshot, but we cannot yet mark it valid. We know that the
	 * missing subxids are equal to or earlier than nextXid. After we
	 * initialise we continue to apply changes during recovery, so once the
	 * oldestRunningXid is later than the nextXid from the initial snapshot we
	 * know that we no longer have missing information and can mark the
	 * snapshot as valid.
486 487 488 489 490 491 492 493
	 */
	if (standbyState == STANDBY_SNAPSHOT_PENDING)
	{
		if (TransactionIdPrecedes(standbySnapshotPendingXmin,
								  running->oldestRunningXid))
		{
			standbyState = STANDBY_SNAPSHOT_READY;
			elog(trace_recovery(DEBUG2),
B
Bruce Momjian 已提交
494
				 "running xact data now proven complete");
495
			elog(trace_recovery(DEBUG2),
B
Bruce Momjian 已提交
496
				 "recovery snapshots are now enabled");
497
		}
498 499 500
		else
			elog(trace_recovery(DEBUG2),
				 "recovery snapshot waiting for %u oldest active xid on standby is %u",
B
Bruce Momjian 已提交
501 502
				 standbySnapshotPendingXmin,
				 running->oldestRunningXid);
503 504 505
		return;
	}

506 507
	Assert(standbyState == STANDBY_INITIALIZED);

508
	/*
509
	 * OK, we need to initialise from the RunningTransactionsData record
510 511 512
	 */

	/*
513 514
	 * Release any locks belonging to old transactions that are not
	 * running according to the running-xacts record.
515
	 */
516
	StandbyReleaseOldLocks(running->nextXid);
517

518 519 520 521
	/*
	 * Nobody else is running yet, but take locks anyhow
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
522 523

	/*
524 525
	 * KnownAssignedXids is sorted so we cannot just add the xids, we have to
	 * sort them first.
526
	 *
B
Bruce Momjian 已提交
527 528 529 530 531 532
	 * Some of the new xids are top-level xids and some are subtransactions.
	 * We don't call SubtransSetParent because it doesn't matter yet. If we
	 * aren't overflowed then all xids will fit in snapshot and so we don't
	 * need subtrans. If we later overflow, an xid assignment record will add
	 * xids to subtrans. If RunningXacts is overflowed then we don't have
	 * enough information to correctly update subtrans anyway.
533
	 */
534
	Assert(procArray->numKnownAssignedXids == 0);
535 536

	/*
537 538
	 * Allocate a temporary array to avoid modifying the array passed as
	 * argument.
539
	 */
540
	xids = palloc(sizeof(TransactionId) * running->xcnt);
541 542

	/*
543
	 * Add to the temp array any xids which have not already completed.
544
	 */
545
	nxids = 0;
546
	for (i = 0; i < running->xcnt; i++)
547
	{
548
		TransactionId xid = running->xids[i];
549 550

		/*
551 552 553 554
		 * The running-xacts snapshot can contain xids that were still visible
		 * in the procarray when the snapshot was taken, but were already
		 * WAL-logged as completed. They're not running anymore, so ignore
		 * them.
555 556 557 558
		 */
		if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
			continue;

559
		xids[nxids++] = xid;
560 561
	}

562 563 564
	if (nxids > 0)
	{
		/*
B
Bruce Momjian 已提交
565 566
		 * Sort the array so that we can add them safely into
		 * KnownAssignedXids.
567 568 569 570 571 572 573
		 */
		qsort(xids, nxids, sizeof(TransactionId), xidComparator);

		/*
		 * Add the sorted snapshot into KnownAssignedXids
		 */
		for (i = 0; i < nxids; i++)
574
			KnownAssignedXidsAdd(xids[i], xids[i], true);
575 576 577 578 579

		KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
	}

	pfree(xids);
580 581

	/*
582 583
	 * Now we've got the running xids we need to set the global values that
	 * are used to track snapshots as they evolve further.
584
	 *
585 586 587
	 * - latestCompletedXid which will be the xmax for snapshots
	 * - lastOverflowedXid which shows whether snapshots overflow
	 * - nextXid
588 589 590
	 *
	 * If the snapshot overflowed, then we still initialise with what we know,
	 * but the recovery snapshot isn't fully valid yet because we know there
B
Bruce Momjian 已提交
591 592
	 * are some subxids missing. We don't know the specific subxids that are
	 * missing, so conservatively assume the last one is latestObservedXid.
593
	 */
594 595 596
	latestObservedXid = running->nextXid;
	TransactionIdRetreat(latestObservedXid);

597 598
	if (running->subxid_overflow)
	{
599 600 601
		standbyState = STANDBY_SNAPSHOT_PENDING;

		standbySnapshotPendingXmin = latestObservedXid;
602
		procArray->lastOverflowedXid = latestObservedXid;
603 604 605 606 607 608
	}
	else
	{
		standbyState = STANDBY_SNAPSHOT_READY;

		standbySnapshotPendingXmin = InvalidTransactionId;
609
		procArray->lastOverflowedXid = InvalidTransactionId;
610 611 612
	}

	/*
613 614 615 616
	 * If a transaction wrote a commit record in the gap between taking and
	 * logging the snapshot then latestCompletedXid may already be higher
	 * than the value from the snapshot, so check before we use the incoming
	 * value.
617 618 619 620
	 */
	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
							  running->latestCompletedXid))
		ShmemVariableCache->latestCompletedXid = running->latestCompletedXid;
621 622

	/* nextXid must be beyond any observed xid */
623 624 625 626
	nextXid = latestObservedXid;
	TransactionIdAdvance(nextXid);
	if (TransactionIdFollows(nextXid, ShmemVariableCache->nextXid))
		ShmemVariableCache->nextXid = nextXid;
627

628 629 630
	Assert(TransactionIdIsNormal(ShmemVariableCache->latestCompletedXid));
	Assert(TransactionIdIsValid(ShmemVariableCache->nextXid));

631 632 633
	LWLockRelease(ProcArrayLock);

	elog(trace_recovery(DEBUG2), "running transaction data initialized");
634
	KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
635
	if (standbyState == STANDBY_SNAPSHOT_READY)
636
		elog(trace_recovery(DEBUG2), "recovery snapshots are now enabled");
637 638 639
	else
		ereport(LOG,
				(errmsg("consistent state delayed because recovery snapshot incomplete")));
640 641
}

642 643 644 645
/*
 * ProcArrayApplyXidAssignment
 *		Process an XLOG_XACT_ASSIGNMENT WAL record
 */
646 647 648 649 650
void
ProcArrayApplyXidAssignment(TransactionId topxid,
							int nsubxids, TransactionId *subxids)
{
	TransactionId max_xid;
B
Bruce Momjian 已提交
651
	int			i;
652

653
	Assert(standbyState >= STANDBY_INITIALIZED);
654 655 656 657 658 659 660 661 662 663 664 665 666 667

	max_xid = TransactionIdLatest(topxid, nsubxids, subxids);

	/*
	 * Mark all the subtransactions as observed.
	 *
	 * NOTE: This will fail if the subxid contains too many previously
	 * unobserved xids to fit into known-assigned-xids. That shouldn't happen
	 * as the code stands, because xid-assignment records should never contain
	 * more than PGPROC_MAX_CACHED_SUBXIDS entries.
	 */
	RecordKnownAssignedTransactionIds(max_xid);

	/*
B
Bruce Momjian 已提交
668 669 670 671 672 673 674 675 676
	 * Notice that we update pg_subtrans with the top-level xid, rather than
	 * the parent xid. This is a difference between normal processing and
	 * recovery, yet is still correct in all cases. The reason is that
	 * subtransaction commit is not marked in clog until commit processing, so
	 * all aborted subtransactions have already been clearly marked in clog.
	 * As a result we are able to refer directly to the top-level
	 * transaction's state rather than skipping through all the intermediate
	 * states in the subtransaction tree. This should be the first time we
	 * have attempted to SubTransSetParent().
677 678 679 680 681 682 683 684 685 686
	 */
	for (i = 0; i < nsubxids; i++)
		SubTransSetParent(subxids[i], topxid, false);

	/*
	 * Uses same locking as transaction commit
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	/*
687
	 * Remove subxids from known-assigned-xacts.
688
	 */
689
	KnownAssignedXidsRemoveTree(InvalidTransactionId, nsubxids, subxids);
690 691

	/*
692
	 * Advance lastOverflowedXid to be at least the last of these subxids.
693 694 695 696 697 698
	 */
	if (TransactionIdPrecedes(procArray->lastOverflowedXid, max_xid))
		procArray->lastOverflowedXid = max_xid;

	LWLockRelease(ProcArrayLock);
}
699

700 701 702
/*
 * TransactionIdIsInProgress -- is given transaction running in some backend
 *
703
 * Aside from some shortcuts such as checking RecentXmin and our own Xid,
704
 * there are four possibilities for finding a running transaction:
705
 *
706
 * 1. The given Xid is a main transaction Id.  We will find this out cheaply
707 708
 * by looking at the PGPROC struct for each backend.
 *
709
 * 2. The given Xid is one of the cached subxact Xids in the PGPROC array.
710 711
 * We can find this out cheaply too.
 *
712 713 714 715 716 717 718 719
 * 3. In Hot Standby mode, we must search the KnownAssignedXids list to see
 * if the Xid is running on the master.
 *
 * 4. Search the SubTrans tree to find the Xid's topmost parent, and then
 * see if that is running according to PGPROC or KnownAssignedXids.  This is
 * the slowest way, but sadly it has to be done always if the others failed,
 * unless we see that the cached subxact sets are complete (none have
 * overflowed).
720
 *
721 722 723 724
 * ProcArrayLock has to be held while we do 1, 2, 3.  If we save the top Xids
 * while doing 1 and 3, we can release the ProcArrayLock while we do 4.
 * This buys back some concurrency (and we can't retrieve the main Xids from
 * PGPROC again anyway; see GetNewTransactionId).
725 726 727 728
 */
bool
TransactionIdIsInProgress(TransactionId xid)
{
729 730
	static TransactionId *xids = NULL;
	int			nxids = 0;
731
	ProcArrayStruct *arrayP = procArray;
732
	TransactionId topxid;
733 734 735 736
	int			i,
				j;

	/*
B
Bruce Momjian 已提交
737
	 * Don't bother checking a transaction older than RecentXmin; it could not
B
Bruce Momjian 已提交
738 739 740
	 * possibly still be running.  (Note: in particular, this guarantees that
	 * we reject InvalidTransactionId, FrozenTransactionId, etc as not
	 * running.)
741 742 743 744 745 746 747
	 */
	if (TransactionIdPrecedes(xid, RecentXmin))
	{
		xc_by_recent_xmin_inc();
		return false;
	}

748 749 750 751 752 753 754 755 756 757 758
	/*
	 * We may have just checked the status of this transaction, so if it is
	 * already known to be completed, we can fall out without any access to
	 * shared memory.
	 */
	if (TransactionIdIsKnownCompleted(xid))
	{
		xc_by_known_xact_inc();
		return false;
	}

759 760 761 762 763 764 765 766 767 768 769
	/*
	 * Also, we can handle our own transaction (and subtransactions) without
	 * any access to shared memory.
	 */
	if (TransactionIdIsCurrentTransactionId(xid))
	{
		xc_by_my_xact_inc();
		return true;
	}

	/*
B
Bruce Momjian 已提交
770 771
	 * If not first time through, get workspace to remember main XIDs in. We
	 * malloc it permanently to avoid repeated palloc/pfree overhead.
772 773 774
	 */
	if (xids == NULL)
	{
775
		/*
B
Bruce Momjian 已提交
776 777 778
		 * In hot standby mode, reserve enough space to hold all xids in the
		 * known-assigned list. If we later finish recovery, we no longer need
		 * the bigger array, but we don't bother to shrink it.
779
		 */
780
		int			maxxids = RecoveryInProgress() ? TOTAL_MAX_CACHED_SUBXIDS : arrayP->maxProcs;
781 782

		xids = (TransactionId *) malloc(maxxids * sizeof(TransactionId));
783 784 785 786 787
		if (xids == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
	}
788 789 790

	LWLockAcquire(ProcArrayLock, LW_SHARED);

791 792 793 794 795 796 797 798 799 800 801 802
	/*
	 * Now that we have the lock, we can check latestCompletedXid; if the
	 * target Xid is after that, it's surely still running.
	 */
	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, xid))
	{
		LWLockRelease(ProcArrayLock);
		xc_by_latest_xid_inc();
		return true;
	}

	/* No shortcuts, gotta grovel through the array */
803 804
	for (i = 0; i < arrayP->numProcs; i++)
	{
B
Bruce Momjian 已提交
805
		volatile PGPROC *proc = arrayP->procs[i];
806 807 808 809 810
		TransactionId pxid;

		/* Ignore my own proc --- dealt with it above */
		if (proc == MyProc)
			continue;
811 812

		/* Fetch xid just once - see GetNewTransactionId */
813
		pxid = proc->xid;
814 815 816 817 818 819 820 821 822

		if (!TransactionIdIsValid(pxid))
			continue;

		/*
		 * Step 1: check the main Xid
		 */
		if (TransactionIdEquals(pxid, xid))
		{
823
			LWLockRelease(ProcArrayLock);
824
			xc_by_main_xid_inc();
825
			return true;
826 827 828
		}

		/*
B
Bruce Momjian 已提交
829 830
		 * We can ignore main Xids that are younger than the target Xid, since
		 * the target could not possibly be their child.
831 832 833 834 835 836 837 838 839 840 841 842 843 844
		 */
		if (TransactionIdPrecedes(xid, pxid))
			continue;

		/*
		 * Step 2: check the cached child-Xids arrays
		 */
		for (j = proc->subxids.nxids - 1; j >= 0; j--)
		{
			/* Fetch xid just once - see GetNewTransactionId */
			TransactionId cxid = proc->subxids.xids[j];

			if (TransactionIdEquals(cxid, xid))
			{
845
				LWLockRelease(ProcArrayLock);
846
				xc_by_child_xid_inc();
847
				return true;
848 849 850 851
			}
		}

		/*
852
		 * Save the main Xid for step 4.  We only need to remember main Xids
B
Bruce Momjian 已提交
853 854 855 856
		 * that have uncached children.  (Note: there is no race condition
		 * here because the overflowed flag cannot be cleared, only set, while
		 * we hold ProcArrayLock.  So we can't miss an Xid that we need to
		 * worry about.)
857 858 859 860 861
		 */
		if (proc->subxids.overflowed)
			xids[nxids++] = pxid;
	}

862 863 864 865
	/*
	 * Step 3: in hot standby mode, check the known-assigned-xids list.  XIDs
	 * in the list must be treated as running.
	 */
866 867 868 869 870
	if (RecoveryInProgress())
	{
		/* none of the PGPROC entries should have XIDs in hot standby mode */
		Assert(nxids == 0);

871
		if (KnownAssignedXidExists(xid))
872 873
		{
			LWLockRelease(ProcArrayLock);
874
			xc_by_known_assigned_inc();
875 876 877 878
			return true;
		}

		/*
B
Bruce Momjian 已提交
879
		 * If the KnownAssignedXids overflowed, we have to check pg_subtrans
B
Bruce Momjian 已提交
880 881 882 883
		 * too.  Fetch all xids from KnownAssignedXids that are lower than
		 * xid, since if xid is a subtransaction its parent will always have a
		 * lower value.  Note we will collect both main and subXIDs here, but
		 * there's no help for it.
884 885 886 887 888
		 */
		if (TransactionIdPrecedesOrEquals(xid, procArray->lastOverflowedXid))
			nxids = KnownAssignedXidsGet(xids, xid);
	}

889 890 891 892
	LWLockRelease(ProcArrayLock);

	/*
	 * If none of the relevant caches overflowed, we know the Xid is not
893
	 * running without even looking at pg_subtrans.
894 895
	 */
	if (nxids == 0)
896 897
	{
		xc_no_overflow_inc();
898
		return false;
899
	}
900 901

	/*
902
	 * Step 4: have to check pg_subtrans.
903
	 *
904 905 906 907
	 * At this point, we know it's either a subtransaction of one of the Xids
	 * in xids[], or it's not running.  If it's an already-failed
	 * subtransaction, we want to say "not running" even though its parent may
	 * still be running.  So first, check pg_clog to see if it's been aborted.
908 909 910 911
	 */
	xc_slow_answer_inc();

	if (TransactionIdDidAbort(xid))
912
		return false;
913 914

	/*
B
Bruce Momjian 已提交
915
	 * It isn't aborted, so check whether the transaction tree it belongs to
B
Bruce Momjian 已提交
916 917
	 * is still running (or, more precisely, whether it was running when we
	 * held ProcArrayLock).
918 919 920 921 922 923 924 925
	 */
	topxid = SubTransGetTopmostTransaction(xid);
	Assert(TransactionIdIsValid(topxid));
	if (!TransactionIdEquals(topxid, xid))
	{
		for (i = 0; i < nxids; i++)
		{
			if (TransactionIdEquals(xids[i], topxid))
926
				return true;
927 928 929
		}
	}

930
	return false;
931 932
}

933 934 935 936
/*
 * TransactionIdIsActive -- is xid the top-level XID of an active backend?
 *
 * This differs from TransactionIdIsInProgress in that it ignores prepared
937 938
 * transactions, as well as transactions running on the master if we're in
 * hot standby.  Also, we ignore subtransactions since that's not needed
939 940 941 942 943 944 945 946 947 948
 * for current uses.
 */
bool
TransactionIdIsActive(TransactionId xid)
{
	bool		result = false;
	ProcArrayStruct *arrayP = procArray;
	int			i;

	/*
B
Bruce Momjian 已提交
949 950
	 * Don't bother checking a transaction older than RecentXmin; it could not
	 * possibly still be running.
951 952 953 954 955 956 957 958
	 */
	if (TransactionIdPrecedes(xid, RecentXmin))
		return false;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (i = 0; i < arrayP->numProcs; i++)
	{
B
Bruce Momjian 已提交
959
		volatile PGPROC *proc = arrayP->procs[i];
960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982

		/* Fetch xid just once - see GetNewTransactionId */
		TransactionId pxid = proc->xid;

		if (!TransactionIdIsValid(pxid))
			continue;

		if (proc->pid == 0)
			continue;			/* ignore prepared transactions */

		if (TransactionIdEquals(pxid, xid))
		{
			result = true;
			break;
		}
	}

	LWLockRelease(ProcArrayLock);

	return result;
}


983 984 985 986 987 988 989
/*
 * GetOldestXmin -- returns oldest transaction that was running
 *					when any current transaction was started.
 *
 * If allDbs is TRUE then all backends are considered; if allDbs is FALSE
 * then only backends running in my own database are considered.
 *
990 991
 * If ignoreVacuum is TRUE then backends with the PROC_IN_VACUUM flag set are
 * ignored.
992
 *
993 994 995
 * This is used by VACUUM to decide which deleted tuples must be preserved
 * in a table.	allDbs = TRUE is needed for shared relations, but allDbs =
 * FALSE is sufficient for non-shared relations, since only backends in my
B
Bruce Momjian 已提交
996
 * own database could ever see the tuples in them.	Also, we can ignore
997 998
 * concurrently running lazy VACUUMs because (a) they must be working on other
 * tables, and (b) they don't need to do snapshot-based lookups.
999 1000
 *
 * This is also used to determine where to truncate pg_subtrans.  allDbs
1001
 * must be TRUE for that case, and ignoreVacuum FALSE.
1002
 *
1003
 * Note: we include all currently running xids in the set of considered xids.
1004 1005
 * This ensures that if a just-started xact has not yet set its snapshot,
 * when it does set the snapshot it cannot set xmin less than what we compute.
1006
 * See notes in src/backend/access/transam/README.
1007 1008
 */
TransactionId
1009
GetOldestXmin(bool allDbs, bool ignoreVacuum)
1010 1011 1012 1013 1014
{
	ProcArrayStruct *arrayP = procArray;
	TransactionId result;
	int			index;

1015 1016 1017
	/* Cannot look for individual databases during recovery */
	Assert(allDbs || !RecoveryInProgress());

1018 1019
	LWLockAcquire(ProcArrayLock, LW_SHARED);

1020
	/*
B
Bruce Momjian 已提交
1021 1022 1023 1024
	 * We initialize the MIN() calculation with latestCompletedXid + 1. This
	 * is a lower bound for the XIDs that might appear in the ProcArray later,
	 * and so protects us against overestimating the result due to future
	 * additions.
1025
	 */
1026 1027 1028
	result = ShmemVariableCache->latestCompletedXid;
	Assert(TransactionIdIsNormal(result));
	TransactionIdAdvance(result);
1029 1030 1031

	for (index = 0; index < arrayP->numProcs; index++)
	{
B
Bruce Momjian 已提交
1032
		volatile PGPROC *proc = arrayP->procs[index];
1033

1034
		if (ignoreVacuum && (proc->vacuumFlags & PROC_IN_VACUUM))
1035 1036
			continue;

1037 1038 1039 1040 1041
		if (allDbs || proc->databaseId == MyDatabaseId)
		{
			/* Fetch xid just once - see GetNewTransactionId */
			TransactionId xid = proc->xid;

1042 1043 1044 1045 1046 1047 1048 1049 1050
			/* First consider the transaction's own Xid, if any */
			if (TransactionIdIsNormal(xid) &&
				TransactionIdPrecedes(xid, result))
				result = xid;

			/*
			 * Also consider the transaction's Xmin, if set.
			 *
			 * We must check both Xid and Xmin because a transaction might
B
Bruce Momjian 已提交
1051 1052
			 * have an Xmin but not (yet) an Xid; conversely, if it has an
			 * Xid, that could determine some not-yet-set Xmin.
1053 1054 1055 1056 1057
			 */
			xid = proc->xmin;	/* Fetch just once */
			if (TransactionIdIsNormal(xid) &&
				TransactionIdPrecedes(xid, result))
				result = xid;
1058 1059 1060
		}
	}

1061 1062 1063 1064 1065 1066 1067
	if (RecoveryInProgress())
	{
		/*
		 * Check to see whether KnownAssignedXids contains an xid value
		 * older than the main procarray.
		 */
		TransactionId kaxmin = KnownAssignedXidsGetOldestXmin();
1068

1069 1070 1071 1072 1073
		if (TransactionIdIsNormal(kaxmin) &&
			TransactionIdPrecedes(kaxmin, result))
				result = kaxmin;
	}

1074 1075
	LWLockRelease(ProcArrayLock);

1076
	/*
B
Bruce Momjian 已提交
1077 1078
	 * Compute the cutoff XID, being careful not to generate a "permanent"
	 * XID.
1079 1080 1081 1082
	 *
	 * vacuum_defer_cleanup_age provides some additional "slop" for the
	 * benefit of hot standby queries on slave servers.  This is quick and
	 * dirty, and perhaps not all that useful unless the master has a
B
Bruce Momjian 已提交
1083 1084 1085 1086
	 * predictable transaction rate, but it's what we've got.  Note that we
	 * are assuming vacuum_defer_cleanup_age isn't large enough to cause
	 * wraparound --- so guc.c should limit it to no more than the
	 * xidStopLimit threshold in varsup.c.
1087 1088 1089 1090 1091
	 */
	result -= vacuum_defer_cleanup_age;
	if (!TransactionIdIsNormal(result))
		result = FirstNormalTransactionId;

1092 1093 1094
	return result;
}

1095
/*
1096 1097 1098
 * GetSnapshotData -- returns information about running transactions.
 *
 * The returned snapshot includes xmin (lowest still-running xact ID),
1099
 * xmax (highest completed xact ID + 1), and a list of running xact IDs
1100 1101 1102 1103 1104 1105 1106 1107
 * in the range xmin <= xid < xmax.  It is used as follows:
 *		All xact IDs < xmin are considered finished.
 *		All xact IDs >= xmax are considered still running.
 *		For an xact ID xmin <= xid < xmax, consult list to see whether
 *		it is considered running or not.
 * This ensures that the set of transactions seen as "running" by the
 * current xact will not change after it takes the snapshot.
 *
1108 1109 1110 1111 1112
 * All running top-level XIDs are included in the snapshot, except for lazy
 * VACUUM processes.  We also try to include running subtransaction XIDs,
 * but since PGPROC has only a limited cache area for subxact XIDs, full
 * information may not be available.  If we find any overflowed subxid arrays,
 * we have to mark the snapshot's subxid data as overflowed, and extra work
1113
 * *may* need to be done to determine what's running (see XidInMVCCSnapshot()
1114
 * in tqual.c).
1115 1116 1117
 *
 * We also update the following backend-global variables:
 *		TransactionXmin: the oldest xmin of any snapshot in use in the
1118
 *			current transaction (this is the same as MyProc->xmin).
1119 1120 1121
 *		RecentXmin: the xmin computed for the most recent snapshot.  XIDs
 *			older than this are known not running any more.
 *		RecentGlobalXmin: the global xmin (oldest TransactionXmin across all
1122
 *			running transactions, except those running LAZY VACUUM).  This is
T
Tom Lane 已提交
1123
 *			the same computation done by GetOldestXmin(true, true).
1124 1125 1126
 *
 * Note: this function should probably not be called with an argument that's
 * not statically allocated (see xip allocation below).
1127 1128
 */
Snapshot
1129
GetSnapshotData(Snapshot snapshot)
1130 1131 1132 1133 1134 1135 1136
{
	ProcArrayStruct *arrayP = procArray;
	TransactionId xmin;
	TransactionId xmax;
	TransactionId globalxmin;
	int			index;
	int			count = 0;
1137
	int			subcount = 0;
1138
	bool		suboverflowed = false;
1139 1140 1141 1142

	Assert(snapshot != NULL);

	/*
B
Bruce Momjian 已提交
1143 1144
	 * Allocating space for maxProcs xids is usually overkill; numProcs would
	 * be sufficient.  But it seems better to do the malloc while not holding
1145 1146
	 * the lock, so we can't look at numProcs.  Likewise, we allocate much
	 * more subxip storage than is probably needed.
1147 1148
	 *
	 * This does open a possibility for avoiding repeated malloc/free: since
B
Bruce Momjian 已提交
1149
	 * maxProcs does not change at runtime, we can simply reuse the previous
1150
	 * xip arrays if any.  (This relies on the fact that all callers pass
B
Bruce Momjian 已提交
1151
	 * static SnapshotData structs.)
1152 1153 1154 1155
	 */
	if (snapshot->xip == NULL)
	{
		/*
B
Bruce Momjian 已提交
1156 1157
		 * First call for this snapshot. Snapshot is same size whether or not
		 * we are in recovery, see later comments.
1158 1159
		 */
		snapshot->xip = (TransactionId *)
1160
			malloc(arrayP->maxProcs * sizeof(TransactionId));
1161 1162 1163 1164
		if (snapshot->xip == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
1165 1166
		Assert(snapshot->subxip == NULL);
		snapshot->subxip = (TransactionId *)
1167
			malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
1168 1169 1170 1171
		if (snapshot->subxip == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
1172 1173 1174
	}

	/*
B
Bruce Momjian 已提交
1175
	 * It is sufficient to get shared lock on ProcArrayLock, even if we are
1176
	 * going to set MyProc->xmin.
1177
	 */
1178
	LWLockAcquire(ProcArrayLock, LW_SHARED);
1179

1180 1181 1182 1183
	/* xmax is always latestCompletedXid + 1 */
	xmax = ShmemVariableCache->latestCompletedXid;
	Assert(TransactionIdIsNormal(xmax));
	TransactionIdAdvance(xmax);
1184

1185 1186 1187
	/* initialize xmin calculation with xmax */
	globalxmin = xmin = xmax;

B
Bruce Momjian 已提交
1188
	/*
1189
	 * If we're in recovery then snapshot data comes from a different place,
B
Bruce Momjian 已提交
1190 1191
	 * so decide which route we take before grab the lock. It is possible for
	 * recovery to end before we finish taking snapshot, and for newly
1192 1193 1194
	 * assigned transaction ids to be added to the procarray. Xmax cannot
	 * change while we hold ProcArrayLock, so those newly added transaction
	 * ids would be filtered away, so we need not be concerned about them.
B
Bruce Momjian 已提交
1195
	 */
1196 1197
	snapshot->takenDuringRecovery = RecoveryInProgress();

1198
	if (!snapshot->takenDuringRecovery)
1199 1200
	{
		/*
B
Bruce Momjian 已提交
1201 1202
		 * Spin over procArray checking xid, xmin, and subxids.  The goal is
		 * to gather all active xids, find the lowest xmin, and try to record
1203 1204 1205 1206
		 * subxids. During recovery no xids will be assigned, so all normal
		 * backends can be ignored, nor are there any VACUUMs running. All
		 * prepared transaction xids are held in KnownAssignedXids, so these
		 * will be seen without needing to loop through procs here.
1207
		 */
1208
		for (index = 0; index < arrayP->numProcs; index++)
1209
		{
1210 1211 1212 1213 1214
			volatile PGPROC *proc = arrayP->procs[index];
			TransactionId xid;

			/* Ignore procs running LAZY VACUUM */
			if (proc->vacuumFlags & PROC_IN_VACUUM)
1215
				continue;
1216

1217
			/* Update globalxmin to be the smallest valid xmin */
B
Bruce Momjian 已提交
1218
			xid = proc->xmin;	/* fetch just once */
1219 1220 1221 1222 1223 1224 1225 1226
			if (TransactionIdIsNormal(xid) &&
				TransactionIdPrecedes(xid, globalxmin))
				globalxmin = xid;

			/* Fetch xid just once - see GetNewTransactionId */
			xid = proc->xid;

			/*
B
Bruce Momjian 已提交
1227 1228 1229 1230
			 * If the transaction has been assigned an xid < xmax we add it to
			 * the snapshot, and update xmin if necessary.	There's no need to
			 * store XIDs >= xmax, since we'll treat them as running anyway.
			 * We don't bother to examine their subxids either.
1231
			 *
B
Bruce Momjian 已提交
1232 1233
			 * We don't include our own XID (if any) in the snapshot, but we
			 * must include it into xmin.
1234 1235
			 */
			if (TransactionIdIsNormal(xid))
1236
			{
1237 1238 1239 1240 1241 1242 1243
				if (TransactionIdFollowsOrEquals(xid, xmax))
					continue;
				if (proc != MyProc)
					snapshot->xip[count++] = xid;
				if (TransactionIdPrecedes(xid, xmin))
					xmin = xid;
			}
1244

1245
			/*
B
Bruce Momjian 已提交
1246 1247 1248 1249 1250
			 * Save subtransaction XIDs if possible (if we've already
			 * overflowed, there's no point).  Note that the subxact XIDs must
			 * be later than their parent, so no need to check them against
			 * xmin.  We could filter against xmax, but it seems better not to
			 * do that much work while holding the ProcArrayLock.
1251 1252
			 *
			 * The other backend can add more subxids concurrently, but cannot
B
Bruce Momjian 已提交
1253 1254 1255 1256
			 * remove any.	Hence it's important to fetch nxids just once.
			 * Should be safe to use memcpy, though.  (We needn't worry about
			 * missing any xids added concurrently, because they must postdate
			 * xmax.)
1257 1258 1259 1260 1261 1262 1263 1264
			 *
			 * Again, our own XIDs are not included in the snapshot.
			 */
			if (!suboverflowed && proc != MyProc)
			{
				if (proc->subxids.overflowed)
					suboverflowed = true;
				else
1265
				{
1266 1267 1268 1269 1270 1271 1272 1273 1274
					int			nxids = proc->subxids.nxids;

					if (nxids > 0)
					{
						memcpy(snapshot->subxip + subcount,
							   (void *) proc->subxids.xids,
							   nxids * sizeof(TransactionId));
						subcount += nxids;
					}
1275 1276 1277
				}
			}
		}
1278
	}
1279
	else
1280 1281
	{
		/*
1282 1283
		 * We're in hot standby, so get XIDs from KnownAssignedXids.
		 *
1284 1285 1286 1287 1288
		 * We store all xids directly into subxip[]. Here's why:
		 *
		 * In recovery we don't know which xids are top-level and which are
		 * subxacts, a design choice that greatly simplifies xid processing.
		 *
B
Bruce Momjian 已提交
1289 1290 1291 1292
		 * It seems like we would want to try to put xids into xip[] only, but
		 * that is fairly small. We would either need to make that bigger or
		 * to increase the rate at which we WAL-log xid assignment; neither is
		 * an appealing choice.
1293 1294 1295 1296
		 *
		 * We could try to store xids into xip[] first and then into subxip[]
		 * if there are too many xids. That only works if the snapshot doesn't
		 * overflow because we do not search subxip[] in that case. A simpler
B
Bruce Momjian 已提交
1297 1298
		 * way is to just store all xids in the subxact array because this is
		 * by far the bigger array. We just leave the xip array empty.
1299 1300 1301 1302 1303
		 *
		 * Either way we need to change the way XidInMVCCSnapshot() works
		 * depending upon when the snapshot was taken, or change normal
		 * snapshot processing so it matches.
		 */
1304 1305
		subcount = KnownAssignedXidsGetAndSetXmin(snapshot->subxip, &xmin,
												  xmax);
1306

1307
		if (TransactionIdPrecedesOrEquals(xmin, procArray->lastOverflowedXid))
1308 1309 1310
			suboverflowed = true;
	}

1311
	if (!TransactionIdIsValid(MyProc->xmin))
1312 1313 1314 1315 1316
		MyProc->xmin = TransactionXmin = xmin;

	LWLockRelease(ProcArrayLock);

	/*
B
Bruce Momjian 已提交
1317 1318 1319
	 * Update globalxmin to include actual process xids.  This is a slightly
	 * different way of computing it than GetOldestXmin uses, but should give
	 * the same result.
1320 1321 1322 1323 1324
	 */
	if (TransactionIdPrecedes(xmin, globalxmin))
		globalxmin = xmin;

	/* Update global variables too */
1325 1326 1327
	RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
	if (!TransactionIdIsNormal(RecentGlobalXmin))
		RecentGlobalXmin = FirstNormalTransactionId;
1328 1329 1330 1331 1332
	RecentXmin = xmin;

	snapshot->xmin = xmin;
	snapshot->xmax = xmax;
	snapshot->xcnt = count;
1333
	snapshot->subxcnt = subcount;
1334
	snapshot->suboverflowed = suboverflowed;
1335

1336
	snapshot->curcid = GetCurrentCommandId(false);
1337

1338
	/*
1339 1340
	 * This is a new snapshot, so set both refcounts are zero, and mark it as
	 * not copied in persistent memory.
1341 1342 1343 1344 1345
	 */
	snapshot->active_count = 0;
	snapshot->regd_count = 0;
	snapshot->copied = false;

1346 1347 1348
	return snapshot;
}

1349 1350 1351
/*
 * GetRunningTransactionData -- returns information about running transactions.
 *
1352
 * Similar to GetSnapshotData but returns more information. We include
1353 1354
 * all PGPROCs with an assigned TransactionId, even VACUUM processes.
 *
1355 1356 1357 1358
 * We acquire XidGenLock, but the caller is responsible for releasing it.
 * This ensures that no new XIDs enter the proc array until the caller has
 * WAL-logged this snapshot, and releases the lock.
 *
1359 1360 1361
 * The returned data structure is statically allocated; caller should not
 * modify it, and must not assume it is valid past the next call.
 *
1362 1363 1364 1365 1366 1367 1368 1369 1370 1371
 * This is never executed during recovery so there is no need to look at
 * KnownAssignedXids.
 *
 * We don't worry about updating other counters, we want to keep this as
 * simple as possible and leave GetSnapshotData() as the primary code for
 * that bookkeeping.
 */
RunningTransactions
GetRunningTransactionData(void)
{
1372 1373 1374
	/* result workspace */
	static RunningTransactionsData CurrentRunningXactsData;

1375
	ProcArrayStruct *arrayP = procArray;
1376
	RunningTransactions CurrentRunningXacts = &CurrentRunningXactsData;
1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392
	TransactionId latestCompletedXid;
	TransactionId oldestRunningXid;
	TransactionId *xids;
	int			index;
	int			count;
	int			subcount;
	bool		suboverflowed;

	Assert(!RecoveryInProgress());

	/*
	 * Allocating space for maxProcs xids is usually overkill; numProcs would
	 * be sufficient.  But it seems better to do the malloc while not holding
	 * the lock, so we can't look at numProcs.  Likewise, we allocate much
	 * more subxip storage than is probably needed.
	 *
1393
	 * Should only be allocated in bgwriter, since only ever executed during
B
Bruce Momjian 已提交
1394
	 * checkpoints.
1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423
	 */
	if (CurrentRunningXacts->xids == NULL)
	{
		/*
		 * First call
		 */
		CurrentRunningXacts->xids = (TransactionId *)
			malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
		if (CurrentRunningXacts->xids == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
	}

	xids = CurrentRunningXacts->xids;

	count = subcount = 0;
	suboverflowed = false;

	/*
	 * Ensure that no xids enter or leave the procarray while we obtain
	 * snapshot.
	 */
	LWLockAcquire(ProcArrayLock, LW_SHARED);
	LWLockAcquire(XidGenLock, LW_SHARED);

	latestCompletedXid = ShmemVariableCache->latestCompletedXid;

	oldestRunningXid = ShmemVariableCache->nextXid;
B
Bruce Momjian 已提交
1424

1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449
	/*
	 * Spin over procArray collecting all xids and subxids.
	 */
	for (index = 0; index < arrayP->numProcs; index++)
	{
		volatile PGPROC *proc = arrayP->procs[index];
		TransactionId xid;
		int			nxids;

		/* Fetch xid just once - see GetNewTransactionId */
		xid = proc->xid;

		/*
		 * We don't need to store transactions that don't have a TransactionId
		 * yet because they will not show as running on a standby server.
		 */
		if (!TransactionIdIsValid(xid))
			continue;

		xids[count++] = xid;

		if (TransactionIdPrecedes(xid, oldestRunningXid))
			oldestRunningXid = xid;

		/*
B
Bruce Momjian 已提交
1450 1451
		 * Save subtransaction XIDs. Other backends can't add or remove
		 * entries while we're holding XidGenLock.
1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464
		 */
		nxids = proc->subxids.nxids;
		if (nxids > 0)
		{
			memcpy(&xids[count], (void *) proc->subxids.xids,
				   nxids * sizeof(TransactionId));
			count += nxids;
			subcount += nxids;

			if (proc->subxids.overflowed)
				suboverflowed = true;

			/*
1465
			 * Top-level XID of a transaction is always less than any of
1466 1467 1468 1469 1470 1471 1472 1473 1474 1475
			 * its subxids, so we don't need to check if any of the subxids
			 * are smaller than oldestRunningXid
			 */
		}
	}

	CurrentRunningXacts->xcnt = count;
	CurrentRunningXacts->subxid_overflow = suboverflowed;
	CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid;
	CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
1476
	CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
1477

1478
	/* We don't release XidGenLock here, the caller is responsible for that */
1479 1480
	LWLockRelease(ProcArrayLock);

1481 1482 1483 1484
	Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid));
	Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid));
	Assert(TransactionIdIsNormal(CurrentRunningXacts->latestCompletedXid));

1485 1486 1487
	return CurrentRunningXacts;
}

1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509
/*
 * GetTransactionsInCommit -- Get the XIDs of transactions that are committing
 *
 * Constructs an array of XIDs of transactions that are currently in commit
 * critical sections, as shown by having inCommit set in their PGPROC entries.
 *
 * *xids_p is set to a palloc'd array that should be freed by the caller.
 * The return value is the number of valid entries.
 *
 * Note that because backends set or clear inCommit without holding any lock,
 * the result is somewhat indeterminate, but we don't really care.  Even in
 * a multiprocessor with delayed writes to shared memory, it should be certain
 * that setting of inCommit will propagate to shared memory when the backend
 * takes the WALInsertLock, so we cannot fail to see an xact as inCommit if
 * it's already inserted its commit record.  Whether it takes a little while
 * for clearing of inCommit to propagate is unimportant for correctness.
 */
int
GetTransactionsInCommit(TransactionId **xids_p)
{
	ProcArrayStruct *arrayP = procArray;
	TransactionId *xids;
B
Bruce Momjian 已提交
1510 1511
	int			nxids;
	int			index;
1512 1513 1514 1515 1516 1517 1518 1519

	xids = (TransactionId *) palloc(arrayP->maxProcs * sizeof(TransactionId));
	nxids = 0;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
B
Bruce Momjian 已提交
1520 1521
		volatile PGPROC *proc = arrayP->procs[index];

1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546
		/* Fetch xid just once - see GetNewTransactionId */
		TransactionId pxid = proc->xid;

		if (proc->inCommit && TransactionIdIsValid(pxid))
			xids[nxids++] = pxid;
	}

	LWLockRelease(ProcArrayLock);

	*xids_p = xids;
	return nxids;
}

/*
 * HaveTransactionsInCommit -- Are any of the specified XIDs in commit?
 *
 * This is used with the results of GetTransactionsInCommit to see if any
 * of the specified XIDs are still in their commit critical sections.
 *
 * Note: this is O(N^2) in the number of xacts that are/were in commit, but
 * those numbers should be small enough for it not to be a problem.
 */
bool
HaveTransactionsInCommit(TransactionId *xids, int nxids)
{
B
Bruce Momjian 已提交
1547
	bool		result = false;
1548
	ProcArrayStruct *arrayP = procArray;
B
Bruce Momjian 已提交
1549
	int			index;
1550 1551 1552 1553 1554

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
B
Bruce Momjian 已提交
1555 1556
		volatile PGPROC *proc = arrayP->procs[index];

1557 1558 1559 1560 1561
		/* Fetch xid just once - see GetNewTransactionId */
		TransactionId pxid = proc->xid;

		if (proc->inCommit && TransactionIdIsValid(pxid))
		{
B
Bruce Momjian 已提交
1562
			int			i;
1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581

			for (i = 0; i < nxids; i++)
			{
				if (xids[i] == pxid)
				{
					result = true;
					break;
				}
			}
			if (result)
				break;
		}
	}

	LWLockRelease(ProcArrayLock);

	return result;
}

1582 1583
/*
 * BackendPidGetProc -- get a backend's PGPROC given its PID
1584 1585 1586 1587
 *
 * Returns NULL if not found.  Note that it is up to the caller to be
 * sure that the question remains meaningful for long enough for the
 * answer to be used ...
1588
 */
1589
PGPROC *
1590 1591 1592 1593 1594 1595
BackendPidGetProc(int pid)
{
	PGPROC	   *result = NULL;
	ProcArrayStruct *arrayP = procArray;
	int			index;

1596 1597 1598
	if (pid == 0)				/* never match dummy PGPROCs */
		return NULL;

1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616
	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
		PGPROC	   *proc = arrayP->procs[index];

		if (proc->pid == pid)
		{
			result = proc;
			break;
		}
	}

	LWLockRelease(ProcArrayLock);

	return result;
}

T
Tatsuo Ishii 已提交
1617 1618 1619 1620 1621 1622
/*
 * BackendXidGetPid -- get a backend's pid given its XID
 *
 * Returns 0 if not found or it's a prepared transaction.  Note that
 * it is up to the caller to be sure that the question remains
 * meaningful for long enough for the answer to be used ...
B
Bruce Momjian 已提交
1623
 *
T
Tatsuo Ishii 已提交
1624 1625
 * Only main transaction Ids are considered.  This function is mainly
 * useful for determining what backend owns a lock.
1626
 *
B
Bruce Momjian 已提交
1627
 * Beware that not every xact has an XID assigned.	However, as long as you
1628
 * only call this using an XID found on disk, you're safe.
T
Tatsuo Ishii 已提交
1629 1630 1631 1632
 */
int
BackendXidGetPid(TransactionId xid)
{
B
Bruce Momjian 已提交
1633
	int			result = 0;
T
Tatsuo Ishii 已提交
1634 1635 1636 1637 1638 1639 1640 1641 1642 1643
	ProcArrayStruct *arrayP = procArray;
	int			index;

	if (xid == InvalidTransactionId)	/* never match invalid xid */
		return 0;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
B
Bruce Momjian 已提交
1644
		volatile PGPROC *proc = arrayP->procs[index];
T
Tatsuo Ishii 已提交
1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657

		if (proc->xid == xid)
		{
			result = proc->pid;
			break;
		}
	}

	LWLockRelease(ProcArrayLock);

	return result;
}

1658 1659 1660 1661 1662 1663 1664 1665 1666
/*
 * IsBackendPid -- is a given pid a running backend
 */
bool
IsBackendPid(int pid)
{
	return (BackendPidGetProc(pid) != NULL);
}

1667 1668 1669 1670

/*
 * GetCurrentVirtualXIDs -- returns an array of currently active VXIDs.
 *
1671
 * The array is palloc'd. The number of valid entries is returned into *nvxids.
1672
 *
1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686
 * The arguments allow filtering the set of VXIDs returned.  Our own process
 * is always skipped.  In addition:
 *	If limitXmin is not InvalidTransactionId, skip processes with
 *		xmin > limitXmin.
 *	If excludeXmin0 is true, skip processes with xmin = 0.
 *	If allDbs is false, skip processes attached to other databases.
 *	If excludeVacuum isn't zero, skip processes for which
 *		(vacuumFlags & excludeVacuum) is not zero.
 *
 * Note: the purpose of the limitXmin and excludeXmin0 parameters is to
 * allow skipping backends whose oldest live snapshot is no older than
 * some snapshot we have.  Since we examine the procarray with only shared
 * lock, there are race conditions: a backend could set its xmin just after
 * we look.  Indeed, on multiprocessors with weak memory ordering, the
1687
 * other backend could have set its xmin *before* we look.	We know however
1688 1689 1690 1691 1692
 * that such a backend must have held shared ProcArrayLock overlapping our
 * own hold of ProcArrayLock, else we would see its xmin update.  Therefore,
 * any snapshot the other backend is taking concurrently with our scan cannot
 * consider any transactions as still running that we think are committed
 * (since backends must hold ProcArrayLock exclusive to commit).
1693 1694
 */
VirtualTransactionId *
1695 1696 1697
GetCurrentVirtualXIDs(TransactionId limitXmin, bool excludeXmin0,
					  bool allDbs, int excludeVacuum,
					  int *nvxids)
1698 1699 1700 1701 1702 1703
{
	VirtualTransactionId *vxids;
	ProcArrayStruct *arrayP = procArray;
	int			count = 0;
	int			index;

1704
	/* allocate what's certainly enough result space */
1705
	vxids = (VirtualTransactionId *)
1706
		palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs);
1707 1708 1709 1710 1711

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
B
Bruce Momjian 已提交
1712
		volatile PGPROC *proc = arrayP->procs[index];
1713 1714 1715 1716

		if (proc == MyProc)
			continue;

1717 1718 1719
		if (excludeVacuum & proc->vacuumFlags)
			continue;

1720
		if (allDbs || proc->databaseId == MyDatabaseId)
1721
		{
1722
			/* Fetch xmin just once - might change on us */
1723 1724
			TransactionId pxmin = proc->xmin;

1725 1726 1727
			if (excludeXmin0 && !TransactionIdIsValid(pxmin))
				continue;

1728
			/*
1729 1730
			 * InvalidTransactionId precedes all other XIDs, so a proc that
			 * hasn't set xmin yet will not be rejected by this test.
1731 1732
			 */
			if (!TransactionIdIsValid(limitXmin) ||
1733
				TransactionIdPrecedesOrEquals(pxmin, limitXmin))
1734 1735
			{
				VirtualTransactionId vxid;
1736

1737 1738 1739 1740
				GET_VXID_FROM_PGPROC(vxid, *proc);
				if (VirtualTransactionIdIsValid(vxid))
					vxids[count++] = vxid;
			}
1741 1742 1743 1744 1745
		}
	}

	LWLockRelease(ProcArrayLock);

1746
	*nvxids = count;
1747 1748 1749
	return vxids;
}

1750 1751 1752 1753 1754 1755
/*
 * GetConflictingVirtualXIDs -- returns an array of currently active VXIDs.
 *
 * Usage is limited to conflict resolution during recovery on standby servers.
 * limitXmin is supplied as either latestRemovedXid, or InvalidTransactionId
 * in cases where we cannot accurately determine a value for latestRemovedXid.
1756
 *
1757 1758 1759
 * If limitXmin is InvalidTransactionId then we want to kill everybody,
 * so we're not worried if they have a snapshot or not, nor does it really
 * matter what type of lock we hold.
1760
 *
1761 1762 1763 1764 1765 1766 1767 1768 1769 1770
 * All callers that are checking xmins always now supply a valid and useful
 * value for limitXmin. The limitXmin is always lower than the lowest
 * numbered KnownAssignedXid that is not already a FATAL error. This is
 * because we only care about cleanup records that are cleaning up tuple
 * versions from committed transactions. In that case they will only occur
 * at the point where the record is less than the lowest running xid. That
 * allows us to say that if any backend takes a snapshot concurrently with
 * us then the conflict assessment made here would never include the snapshot
 * that is being derived. So we take LW_SHARED on the ProcArray and allow
 * concurrent snapshots when limitXmin is valid. We might think about adding
B
Bruce Momjian 已提交
1771
 *	 Assert(limitXmin < lowest(KnownAssignedXids))
1772 1773
 * but that would not be true in the case of FATAL errors lagging in array,
 * but we already know those are bogus anyway, so we skip that test.
1774
 *
1775
 * If dbOid is valid we skip backends attached to other databases.
1776 1777 1778 1779 1780
 *
 * Be careful to *not* pfree the result from this function. We reuse
 * this array sufficiently often that we use malloc for the result.
 */
VirtualTransactionId *
1781
GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
1782 1783 1784 1785 1786 1787 1788 1789
{
	static VirtualTransactionId *vxids;
	ProcArrayStruct *arrayP = procArray;
	int			count = 0;
	int			index;

	/*
	 * If not first time through, get workspace to remember main XIDs in. We
B
Bruce Momjian 已提交
1790 1791
	 * malloc it permanently to avoid repeated palloc/pfree overhead. Allow
	 * result space, remembering room for a terminator.
1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802
	 */
	if (vxids == NULL)
	{
		vxids = (VirtualTransactionId *)
			malloc(sizeof(VirtualTransactionId) * (arrayP->maxProcs + 1));
		if (vxids == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
	}

1803
	LWLockAcquire(ProcArrayLock, LW_SHARED);
1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819

	for (index = 0; index < arrayP->numProcs; index++)
	{
		volatile PGPROC *proc = arrayP->procs[index];

		/* Exclude prepared transactions */
		if (proc->pid == 0)
			continue;

		if (!OidIsValid(dbOid) ||
			proc->databaseId == dbOid)
		{
			/* Fetch xmin just once - can't change on us, but good coding */
			TransactionId pxmin = proc->xmin;

			/*
B
Bruce Momjian 已提交
1820 1821 1822
			 * We ignore an invalid pxmin because this means that backend has
			 * no snapshot and cannot get another one while we hold exclusive
			 * lock.
1823
			 */
1824 1825
			if (!TransactionIdIsValid(limitXmin) ||
				(TransactionIdIsValid(pxmin) && !TransactionIdFollows(pxmin, limitXmin)))
1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850
			{
				VirtualTransactionId vxid;

				GET_VXID_FROM_PGPROC(vxid, *proc);
				if (VirtualTransactionIdIsValid(vxid))
					vxids[count++] = vxid;
			}
		}
	}

	LWLockRelease(ProcArrayLock);

	/* add the terminator */
	vxids[count].backendId = InvalidBackendId;
	vxids[count].localTransactionId = InvalidLocalTransactionId;

	return vxids;
}

/*
 * CancelVirtualTransaction - used in recovery conflict processing
 *
 * Returns pid of the process signaled, or 0 if not found.
 */
pid_t
1851
CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode)
1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868
{
	ProcArrayStruct *arrayP = procArray;
	int			index;
	pid_t		pid = 0;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
		VirtualTransactionId procvxid;
		PGPROC	   *proc = arrayP->procs[index];

		GET_VXID_FROM_PGPROC(procvxid, *proc);

		if (procvxid.backendId == vxid.backendId &&
			procvxid.localTransactionId == vxid.localTransactionId)
		{
1869
			proc->recoveryConflictPending = true;
1870
			pid = proc->pid;
1871 1872 1873
			if (pid != 0)
			{
				/*
B
Bruce Momjian 已提交
1874 1875
				 * Kill the pid if it's still here. If not, that's what we
				 * wanted so ignore any errors.
1876 1877 1878
				 */
				(void) SendProcSignal(pid, sigmode, vxid.backendId);
			}
1879 1880 1881 1882 1883 1884 1885 1886
			break;
		}
	}

	LWLockRelease(ProcArrayLock);

	return pid;
}
1887

1888
/*
1889 1890 1891
 * MinimumActiveBackends --- count backends (other than myself) that are
 *		in active transactions.  Return true if the count exceeds the
 *		minimum threshold passed.  This is used as a heuristic to decide if
1892 1893
 *		a pre-XLOG-flush delay is worthwhile during commit.
 *
1894 1895
 * Do not count backends that are blocked waiting for locks, since they are
 * not going to get to run until someone else commits.
1896
 */
1897 1898
bool
MinimumActiveBackends(int min)
1899 1900 1901 1902 1903
{
	ProcArrayStruct *arrayP = procArray;
	int			count = 0;
	int			index;

1904 1905 1906 1907
	/* Quick short-circuit if no minimum is specified */
	if (min == 0)
		return true;

1908 1909
	/*
	 * Note: for speed, we don't acquire ProcArrayLock.  This is a little bit
B
Bruce Momjian 已提交
1910 1911
	 * bogus, but since we are only testing fields for zero or nonzero, it
	 * should be OK.  The result is only used for heuristic purposes anyway...
1912 1913 1914
	 */
	for (index = 0; index < arrayP->numProcs; index++)
	{
B
Bruce Momjian 已提交
1915
		volatile PGPROC *proc = arrayP->procs[index];
1916

1917 1918 1919 1920 1921 1922 1923
		/*
		 * Since we're not holding a lock, need to check that the pointer is
		 * valid. Someone holding the lock could have incremented numProcs
		 * already, but not yet inserted a valid pointer to the array.
		 *
		 * If someone just decremented numProcs, 'proc' could also point to a
		 * PGPROC entry that's no longer in the array. It still points to a
1924 1925 1926
		 * PGPROC struct, though, because freed PGPPROC entries just go to the
		 * free list and are recycled. Its contents are nonsense in that case,
		 * but that's acceptable for this function.
1927
		 */
1928
		if (proc == NULL)
1929 1930
			continue;

1931 1932
		if (proc == MyProc)
			continue;			/* do not count myself */
1933 1934 1935
		if (proc->pid == 0)
			continue;			/* do not count prepared xacts */
		if (proc->xid == InvalidTransactionId)
1936
			continue;			/* do not count if no XID assigned */
1937 1938 1939
		if (proc->waitLock != NULL)
			continue;			/* do not count if blocked on a lock */
		count++;
1940 1941
		if (count >= min)
			break;
1942 1943
	}

1944
	return count >= min;
1945 1946
}

1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960
/*
 * CountDBBackends --- count backends that are using specified database
 */
int
CountDBBackends(Oid databaseid)
{
	ProcArrayStruct *arrayP = procArray;
	int			count = 0;
	int			index;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
B
Bruce Momjian 已提交
1961
		volatile PGPROC *proc = arrayP->procs[index];
1962 1963 1964

		if (proc->pid == 0)
			continue;			/* do not count prepared xacts */
1965 1966
		if (!OidIsValid(databaseid) ||
			proc->databaseId == databaseid)
1967 1968 1969 1970 1971 1972 1973 1974
			count++;
	}

	LWLockRelease(ProcArrayLock);

	return count;
}

1975 1976 1977 1978
/*
 * CancelDBBackends --- cancel backends that are using specified database
 */
void
1979
CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending)
1980 1981 1982
{
	ProcArrayStruct *arrayP = procArray;
	int			index;
1983
	pid_t		pid = 0;
1984 1985 1986 1987 1988 1989 1990 1991

	/* tell all backends to die */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	for (index = 0; index < arrayP->numProcs; index++)
	{
		volatile PGPROC *proc = arrayP->procs[index];

1992
		if (databaseid == InvalidOid || proc->databaseId == databaseid)
1993
		{
1994 1995 1996 1997
			VirtualTransactionId procvxid;

			GET_VXID_FROM_PGPROC(procvxid, *proc);

1998
			proc->recoveryConflictPending = conflictPending;
1999 2000 2001 2002
			pid = proc->pid;
			if (pid != 0)
			{
				/*
B
Bruce Momjian 已提交
2003 2004
				 * Kill the pid if it's still here. If not, that's what we
				 * wanted so ignore any errors.
2005
				 */
2006
				(void) SendProcSignal(pid, sigmode, procvxid.backendId);
2007
			}
2008 2009 2010 2011 2012 2013
		}
	}

	LWLockRelease(ProcArrayLock);
}

2014 2015 2016 2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027
/*
 * CountUserBackends --- count backends that are used by specified user
 */
int
CountUserBackends(Oid roleid)
{
	ProcArrayStruct *arrayP = procArray;
	int			count = 0;
	int			index;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
B
Bruce Momjian 已提交
2028
		volatile PGPROC *proc = arrayP->procs[index];
2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040

		if (proc->pid == 0)
			continue;			/* do not count prepared xacts */
		if (proc->roleId == roleid)
			count++;
	}

	LWLockRelease(ProcArrayLock);

	return count;
}

2041
/*
2042
 * CountOtherDBBackends -- check for other backends running in the given DB
2043 2044 2045
 *
 * If there are other backends in the DB, we will wait a maximum of 5 seconds
 * for them to exit.  Autovacuum backends are encouraged to exit early by
2046
 * sending them SIGTERM, but normal user backends are just waited for.
2047 2048 2049 2050 2051
 *
 * The current backend is always ignored; it is caller's responsibility to
 * check whether the current backend uses the given DB, if it's important.
 *
 * Returns TRUE if there are (still) other backends in the DB, FALSE if not.
2052 2053
 * Also, *nbackends and *nprepared are set to the number of other backends
 * and prepared transactions in the DB, respectively.
2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064
 *
 * This function is used to interlock DROP DATABASE and related commands
 * against there being any active backends in the target DB --- dropping the
 * DB while active backends remain would be a Bad Thing.  Note that we cannot
 * detect here the possibility of a newly-started backend that is trying to
 * connect to the doomed database, so additional interlocking is needed during
 * backend startup.  The caller should normally hold an exclusive lock on the
 * target DB before calling this, which is one reason we mustn't wait
 * indefinitely.
 */
bool
2065
CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
2066 2067
{
	ProcArrayStruct *arrayP = procArray;
2068 2069

#define MAXAUTOVACPIDS	10		/* max autovacs to SIGTERM per iteration */
2070
	int			autovac_pids[MAXAUTOVACPIDS];
2071 2072 2073 2074 2075
	int			tries;

	/* 50 tries with 100ms sleep between tries makes 5 sec total wait */
	for (tries = 0; tries < 50; tries++)
	{
2076
		int			nautovacs = 0;
2077 2078 2079 2080 2081
		bool		found = false;
		int			index;

		CHECK_FOR_INTERRUPTS();

2082 2083
		*nbackends = *nprepared = 0;

2084 2085 2086 2087
		LWLockAcquire(ProcArrayLock, LW_SHARED);

		for (index = 0; index < arrayP->numProcs; index++)
		{
B
Bruce Momjian 已提交
2088
			volatile PGPROC *proc = arrayP->procs[index];
2089 2090 2091 2092 2093 2094 2095 2096

			if (proc->databaseId != databaseId)
				continue;
			if (proc == MyProc)
				continue;

			found = true;

2097 2098
			if (proc->pid == 0)
				(*nprepared)++;
2099 2100
			else
			{
2101 2102 2103 2104
				(*nbackends)++;
				if ((proc->vacuumFlags & PROC_IS_AUTOVACUUM) &&
					nautovacs < MAXAUTOVACPIDS)
					autovac_pids[nautovacs++] = proc->pid;
2105 2106 2107
			}
		}

2108 2109
		LWLockRelease(ProcArrayLock);

2110
		if (!found)
B
Bruce Momjian 已提交
2111
			return false;		/* no conflicting backends, so done */
2112

2113
		/*
2114 2115 2116 2117
		 * Send SIGTERM to any conflicting autovacuums before sleeping. We
		 * postpone this step until after the loop because we don't want to
		 * hold ProcArrayLock while issuing kill(). We have no idea what might
		 * block kill() inside the kernel...
2118 2119 2120 2121 2122
		 */
		for (index = 0; index < nautovacs; index++)
			(void) kill(autovac_pids[index], SIGTERM);	/* ignore any error */

		/* sleep, then try again */
B
Bruce Momjian 已提交
2123
		pg_usleep(100 * 1000L); /* 100ms */
2124 2125
	}

B
Bruce Momjian 已提交
2126
	return true;				/* timed out, still conflicts */
2127 2128
}

2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141

#define XidCacheRemove(i) \
	do { \
		MyProc->subxids.xids[i] = MyProc->subxids.xids[MyProc->subxids.nxids - 1]; \
		MyProc->subxids.nxids--; \
	} while (0)

/*
 * XidCacheRemoveRunningXids
 *
 * Remove a bunch of TransactionIds from the list of known-running
 * subtransactions for my backend.	Both the specified xid and those in
 * the xids[] array (of length nxids) are removed from the subxids cache.
2142
 * latestXid must be the latest XID among the group.
2143 2144
 */
void
2145 2146 2147
XidCacheRemoveRunningXids(TransactionId xid,
						  int nxids, const TransactionId *xids,
						  TransactionId latestXid)
2148 2149 2150 2151
{
	int			i,
				j;

2152
	Assert(TransactionIdIsValid(xid));
2153 2154 2155

	/*
	 * We must hold ProcArrayLock exclusively in order to remove transactions
2156 2157 2158 2159
	 * from the PGPROC array.  (See src/backend/access/transam/README.)  It's
	 * possible this could be relaxed since we know this routine is only used
	 * to abort subtransactions, but pending closer analysis we'd best be
	 * conservative.
2160 2161 2162 2163
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	/*
B
Bruce Momjian 已提交
2164 2165 2166
	 * Under normal circumstances xid and xids[] will be in increasing order,
	 * as will be the entries in subxids.  Scan backwards to avoid O(N^2)
	 * behavior when removing a lot of xids.
2167 2168 2169 2170 2171 2172 2173 2174 2175 2176 2177 2178 2179
	 */
	for (i = nxids - 1; i >= 0; i--)
	{
		TransactionId anxid = xids[i];

		for (j = MyProc->subxids.nxids - 1; j >= 0; j--)
		{
			if (TransactionIdEquals(MyProc->subxids.xids[j], anxid))
			{
				XidCacheRemove(j);
				break;
			}
		}
B
Bruce Momjian 已提交
2180

2181
		/*
B
Bruce Momjian 已提交
2182 2183 2184 2185 2186
		 * Ordinarily we should have found it, unless the cache has
		 * overflowed. However it's also possible for this routine to be
		 * invoked multiple times for the same subtransaction, in case of an
		 * error during AbortSubTransaction.  So instead of Assert, emit a
		 * debug warning.
2187 2188 2189 2190 2191 2192 2193 2194 2195 2196 2197 2198 2199 2200 2201 2202 2203
		 */
		if (j < 0 && !MyProc->subxids.overflowed)
			elog(WARNING, "did not find subXID %u in MyProc", anxid);
	}

	for (j = MyProc->subxids.nxids - 1; j >= 0; j--)
	{
		if (TransactionIdEquals(MyProc->subxids.xids[j], xid))
		{
			XidCacheRemove(j);
			break;
		}
	}
	/* Ordinarily we should have found it, unless the cache has overflowed */
	if (j < 0 && !MyProc->subxids.overflowed)
		elog(WARNING, "did not find subXID %u in MyProc", xid);

2204 2205 2206 2207 2208
	/* Also advance global latestCompletedXid while holding the lock */
	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
							  latestXid))
		ShmemVariableCache->latestCompletedXid = latestXid;

2209 2210 2211 2212 2213 2214 2215 2216 2217 2218 2219 2220
	LWLockRelease(ProcArrayLock);
}

#ifdef XIDCACHE_DEBUG

/*
 * Print stats about effectiveness of XID cache
 */
static void
DisplayXidCache(void)
{
	fprintf(stderr,
2221
			"XidCache: xmin: %ld, known: %ld, myxact: %ld, latest: %ld, mainxid: %ld, childxid: %ld, knownassigned: %ld, nooflo: %ld, slow: %ld\n",
2222
			xc_by_recent_xmin,
2223
			xc_by_known_xact,
2224
			xc_by_my_xact,
2225
			xc_by_latest_xid,
2226 2227
			xc_by_main_xid,
			xc_by_child_xid,
2228
			xc_by_known_assigned,
2229
			xc_no_overflow,
2230 2231 2232
			xc_slow_answer);
}
#endif   /* XIDCACHE_DEBUG */
2233

2234

2235
/* ----------------------------------------------
B
Bruce Momjian 已提交
2236
 *		KnownAssignedTransactions sub-module
2237 2238 2239 2240 2241
 * ----------------------------------------------
 */

/*
 * In Hot Standby mode, we maintain a list of transactions that are (or were)
2242 2243 2244 2245
 * running in the master at the current point in WAL.  These XIDs must be
 * treated as running by standby transactions, even though they are not in
 * the standby server's PGPROC array.
 *
B
Bruce Momjian 已提交
2246
 * We record all XIDs that we know have been assigned.	That includes all the
2247 2248 2249 2250 2251 2252 2253 2254
 * XIDs seen in WAL records, plus all unobserved XIDs that we can deduce have
 * been assigned.  We can deduce the existence of unobserved XIDs because we
 * know XIDs are assigned in sequence, with no gaps.  The KnownAssignedXids
 * list expands as new XIDs are observed or inferred, and contracts when
 * transaction completion records arrive.
 *
 * During hot standby we do not fret too much about the distinction between
 * top-level XIDs and subtransaction XIDs. We store both together in the
B
Bruce Momjian 已提交
2255
 * KnownAssignedXids list.	In backends, this is copied into snapshots in
2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285
 * GetSnapshotData(), taking advantage of the fact that XidInMVCCSnapshot()
 * doesn't care about the distinction either.  Subtransaction XIDs are
 * effectively treated as top-level XIDs and in the typical case pg_subtrans
 * links are *not* maintained (which does not affect visibility).
 *
 * We have room in KnownAssignedXids and in snapshots to hold maxProcs *
 * (1 + PGPROC_MAX_CACHED_SUBXIDS) XIDs, so every master transaction must
 * report its subtransaction XIDs in a WAL XLOG_XACT_ASSIGNMENT record at
 * least every PGPROC_MAX_CACHED_SUBXIDS.  When we receive one of these
 * records, we mark the subXIDs as children of the top XID in pg_subtrans,
 * and then remove them from KnownAssignedXids.  This prevents overflow of
 * KnownAssignedXids and snapshots, at the cost that status checks for these
 * subXIDs will take a slower path through TransactionIdIsInProgress().
 * This means that KnownAssignedXids is not necessarily complete for subXIDs,
 * though it should be complete for top-level XIDs; this is the same situation
 * that holds with respect to the PGPROC entries in normal running.
 *
 * When we throw away subXIDs from KnownAssignedXids, we need to keep track of
 * that, similarly to tracking overflow of a PGPROC's subxids array.  We do
 * that by remembering the lastOverflowedXID, ie the last thrown-away subXID.
 * As long as that is within the range of interesting XIDs, we have to assume
 * that subXIDs are missing from snapshots.  (Note that subXID overflow occurs
 * on primary when 65th subXID arrives, whereas on standby it occurs when 64th
 * subXID arrives - that is not an error.)
 *
 * Should a backend on primary somehow disappear before it can write an abort
 * record, then we just leave those XIDs in KnownAssignedXids. They actually
 * aborted but we think they were running; the distinction is irrelevant
 * because either way any changes done by the transaction are not visible to
 * backends in the standby.  We prune KnownAssignedXids when
2286
 * XLOG_RUNNING_XACTS arrives, to forestall possible overflow of the
2287 2288 2289 2290 2291 2292 2293
 * array due to such dead XIDs.
 */

/*
 * RecordKnownAssignedTransactionIds
 *		Record the given XID in KnownAssignedXids, as well as any preceding
 *		unobserved XIDs.
2294 2295
 *
 * RecordKnownAssignedTransactionIds() should be run for *every* WAL record
2296 2297
 * associated with a transaction. Must be called for each record after we
 * have executed StartupCLOG() et al, since we must ExtendCLOG() etc..
2298
 *
2299
 * Called during recovery in analogy with and in place of GetNewTransactionId()
2300 2301 2302 2303
 */
void
RecordKnownAssignedTransactionIds(TransactionId xid)
{
2304
	Assert(standbyState >= STANDBY_INITIALIZED);
2305
	Assert(TransactionIdIsValid(xid));
2306

2307
	elog(trace_recovery(DEBUG4), "record known xact %u latestObservedXid %u",
B
Bruce Momjian 已提交
2308
		 xid, latestObservedXid);
2309

2310 2311 2312 2313 2314 2315 2316 2317
	/*
	 * If the KnownAssignedXids machinery isn't up yet, do nothing.
	 */
	if (standbyState <= STANDBY_INITIALIZED)
		return;

	Assert(TransactionIdIsValid(latestObservedXid));

2318
	/*
B
Bruce Momjian 已提交
2319 2320 2321
	 * When a newly observed xid arrives, it is frequently the case that it is
	 * *not* the next xid in sequence. When this occurs, we must treat the
	 * intervening xids as running also.
2322 2323 2324
	 */
	if (TransactionIdFollows(xid, latestObservedXid))
	{
2325
		TransactionId next_expected_xid;
2326 2327

		/*
B
Bruce Momjian 已提交
2328 2329 2330
		 * Extend clog and subtrans like we do in GetNewTransactionId() during
		 * normal operation using individual extend steps. Typical case
		 * requires almost no activity.
2331
		 */
2332 2333
		next_expected_xid = latestObservedXid;
		TransactionIdAdvance(next_expected_xid);
2334 2335 2336 2337 2338 2339 2340 2341
		while (TransactionIdPrecedesOrEquals(next_expected_xid, xid))
		{
			ExtendCLOG(next_expected_xid);
			ExtendSUBTRANS(next_expected_xid);

			TransactionIdAdvance(next_expected_xid);
		}

2342 2343 2344 2345 2346 2347
		/*
		 * Add the new xids onto the KnownAssignedXids array.
		 */
		next_expected_xid = latestObservedXid;
		TransactionIdAdvance(next_expected_xid);
		KnownAssignedXidsAdd(next_expected_xid, xid, false);
2348

2349 2350 2351
		/*
		 * Now we can advance latestObservedXid
		 */
2352 2353
		latestObservedXid = xid;

2354 2355 2356 2357
		/* ShmemVariableCache->nextXid must be beyond any observed xid */
		next_expected_xid = latestObservedXid;
		TransactionIdAdvance(next_expected_xid);
		ShmemVariableCache->nextXid = next_expected_xid;
2358 2359 2360
	}
}

2361 2362 2363
/*
 * ExpireTreeKnownAssignedTransactionIds
 *		Remove the given XIDs from KnownAssignedXids.
2364 2365
 *
 * Called during recovery in analogy with and in place of ProcArrayEndTransaction()
2366
 */
2367 2368
void
ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
B
Bruce Momjian 已提交
2369
							   TransactionId *subxids, TransactionId max_xid)
2370
{
2371
	Assert(standbyState >= STANDBY_INITIALIZED);
2372 2373 2374 2375 2376 2377

	/*
	 * Uses same locking as transaction commit
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

2378
	KnownAssignedXidsRemoveTree(xid, nsubxids, subxids);
2379

2380 2381 2382
	/* As in ProcArrayEndTransaction, advance latestCompletedXid */
	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
							  max_xid))
2383 2384 2385 2386 2387
		ShmemVariableCache->latestCompletedXid = max_xid;

	LWLockRelease(ProcArrayLock);
}

2388 2389 2390 2391
/*
 * ExpireAllKnownAssignedTransactionIds
 *		Remove all entries in KnownAssignedXids
 */
2392 2393 2394 2395
void
ExpireAllKnownAssignedTransactionIds(void)
{
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
2396
	KnownAssignedXidsRemovePreceding(InvalidTransactionId);
2397 2398 2399
	LWLockRelease(ProcArrayLock);
}

2400 2401 2402 2403
/*
 * ExpireOldKnownAssignedTransactionIds
 *		Remove KnownAssignedXids entries preceding the given XID
 */
2404 2405 2406 2407
void
ExpireOldKnownAssignedTransactionIds(TransactionId xid)
{
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
2408
	KnownAssignedXidsRemovePreceding(xid);
2409 2410 2411
	LWLockRelease(ProcArrayLock);
}

2412

2413 2414 2415
/*
 * Private module functions to manipulate KnownAssignedXids
 *
2416 2417 2418 2419 2420 2421 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461 2462
 * There are 5 main uses of the KnownAssignedXids data structure:
 *
 *	* backends taking snapshots - all valid XIDs need to be copied out
 *	* backends seeking to determine presence of a specific XID
 *	* startup process adding new known-assigned XIDs
 *	* startup process removing specific XIDs as transactions end
 *	* startup process pruning array when special WAL records arrive
 *
 * This data structure is known to be a hot spot during Hot Standby, so we
 * go to some lengths to make these operations as efficient and as concurrent
 * as possible.
 *
 * The XIDs are stored in an array in sorted order --- TransactionIdPrecedes
 * order, to be exact --- to allow binary search for specific XIDs.  Note:
 * in general TransactionIdPrecedes would not provide a total order, but
 * we know that the entries present at any instant should not extend across
 * a large enough fraction of XID space to wrap around (the master would
 * shut down for fear of XID wrap long before that happens).  So it's OK to
 * use TransactionIdPrecedes as a binary-search comparator.
 *
 * It's cheap to maintain the sortedness during insertions, since new known
 * XIDs are always reported in XID order; we just append them at the right.
 *
 * To keep individual deletions cheap, we need to allow gaps in the array.
 * This is implemented by marking array elements as valid or invalid using
 * the parallel boolean array KnownAssignedXidsValid[].  A deletion is done
 * by setting KnownAssignedXidsValid[i] to false, *without* clearing the
 * XID entry itself.  This preserves the property that the XID entries are
 * sorted, so we can do binary searches easily.  Periodically we compress
 * out the unused entries; that's much cheaper than having to compress the
 * array immediately on every deletion.
 *
 * The actually valid items in KnownAssignedXids[] and KnownAssignedXidsValid[]
 * are those with indexes tail <= i < head; items outside this subscript range
 * have unspecified contents.  When head reaches the end of the array, we
 * force compression of unused entries rather than wrapping around, since
 * allowing wraparound would greatly complicate the search logic.  We maintain
 * an explicit tail pointer so that pruning of old XIDs can be done without
 * immediately moving the array contents.  In most cases only a small fraction
 * of the array contains valid entries at any instant.
 *
 * Although only the startup process can ever change the KnownAssignedXids
 * data structure, we still need interlocking so that standby backends will
 * not observe invalid intermediate states.  The convention is that backends
 * must hold shared ProcArrayLock to examine the array.  To remove XIDs from
 * the array, the startup process must hold ProcArrayLock exclusively, for
 * the usual transactional reasons (compare commit/abort of a transaction
B
Bruce Momjian 已提交
2463
 * during normal running).	Compressing unused entries out of the array
2464 2465 2466 2467 2468 2469
 * likewise requires exclusive lock.  To add XIDs to the array, we just insert
 * them into slots to the right of the head pointer and then advance the head
 * pointer.  This wouldn't require any lock at all, except that on machines
 * with weak memory ordering we need to be careful that other processors
 * see the array element changes before they see the head pointer change.
 * We handle this by using a spinlock to protect reads and writes of the
B
Bruce Momjian 已提交
2470
 * head/tail pointers.	(We could dispense with the spinlock if we were to
2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494
 * create suitable memory access barrier primitives and use those instead.)
 * The spinlock must be taken to read or write the head/tail pointers unless
 * the caller holds ProcArrayLock exclusively.
 *
 * Algorithmic analysis:
 *
 * If we have a maximum of M slots, with N XIDs currently spread across
 * S elements then we have N <= S <= M always.
 *
 *	* Adding a new XID is O(1) and needs little locking (unless compression
 *		must happen)
 *	* Compressing the array is O(S) and requires exclusive lock
 *	* Removing an XID is O(logS) and requires exclusive lock
 *	* Taking a snapshot is O(S) and requires shared lock
 *	* Checking for an XID is O(logS) and requires shared lock
 *
 * In comparison, using a hash table for KnownAssignedXids would mean that
 * taking snapshots would be O(M). If we can maintain S << M then the
 * sorted array technique will deliver significantly faster snapshots.
 * If we try to keep S too small then we will spend too much time compressing,
 * so there is an optimal point for any workload mix. We use a heuristic to
 * decide when to compress the array, though trimming also helps reduce
 * frequency of compressing. The heuristic requires us to track the number of
 * currently valid XIDs in the array.
2495 2496
 */

2497

2498
/*
2499 2500 2501 2502 2503
 * Compress KnownAssignedXids by shifting valid data down to the start of the
 * array, removing any gaps.
 *
 * A compression step is forced if "force" is true, otherwise we do it
 * only if a heuristic indicates it's a good time to do it.
2504
 *
2505
 * Caller must hold ProcArrayLock in exclusive mode.
2506 2507
 */
static void
2508
KnownAssignedXidsCompress(bool force)
2509
{
2510 2511
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
B
Bruce Momjian 已提交
2512 2513 2514 2515
	int			head,
				tail;
	int			compress_index;
	int			i;
2516

2517 2518 2519 2520 2521
	/* no spinlock required since we hold ProcArrayLock exclusively */
	head = pArray->headKnownAssignedXids;
	tail = pArray->tailKnownAssignedXids;

	if (!force)
2522
	{
2523
		/*
B
Bruce Momjian 已提交
2524 2525
		 * If we can choose how much to compress, use a heuristic to avoid
		 * compressing too often or not often enough.
2526
		 *
B
Bruce Momjian 已提交
2527 2528 2529 2530 2531
		 * Heuristic is if we have a large enough current spread and less than
		 * 50% of the elements are currently in use, then compress. This
		 * should ensure we compress fairly infrequently. We could compress
		 * less often though the virtual array would spread out more and
		 * snapshots would become more expensive.
2532
		 */
B
Bruce Momjian 已提交
2533
		int			nelements = head - tail;
2534

2535 2536 2537 2538
		if (nelements < 4 * PROCARRAY_MAXPROCS ||
			nelements < 2 * pArray->numKnownAssignedXids)
			return;
	}
2539

2540
	/*
B
Bruce Momjian 已提交
2541 2542
	 * We compress the array by reading the valid values from tail to head,
	 * re-aligning data to 0th element.
2543 2544 2545 2546 2547
	 */
	compress_index = 0;
	for (i = tail; i < head; i++)
	{
		if (KnownAssignedXidsValid[i])
2548
		{
2549 2550 2551
			KnownAssignedXids[compress_index] = KnownAssignedXids[i];
			KnownAssignedXidsValid[compress_index] = true;
			compress_index++;
2552
		}
2553
	}
2554

2555 2556 2557
	pArray->tailKnownAssignedXids = 0;
	pArray->headKnownAssignedXids = compress_index;
}
2558

2559 2560 2561 2562 2563 2564 2565 2566
/*
 * Add xids into KnownAssignedXids at the head of the array.
 *
 * xids from from_xid to to_xid, inclusive, are added to the array.
 *
 * If exclusive_lock is true then caller already holds ProcArrayLock in
 * exclusive mode, so we need no extra locking here.  Else caller holds no
 * lock, so we need to be sure we maintain sufficient interlocks against
B
Bruce Momjian 已提交
2567
 * concurrent readers.	(Only the startup process ever calls this, so no need
2568 2569 2570 2571 2572 2573 2574 2575
 * to worry about concurrent writers.)
 */
static void
KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
					 bool exclusive_lock)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
B
Bruce Momjian 已提交
2576 2577 2578
	TransactionId next_xid;
	int			head,
				tail;
2579 2580 2581 2582 2583 2584
	int			nxids;
	int			i;

	Assert(TransactionIdPrecedesOrEquals(from_xid, to_xid));

	/*
B
Bruce Momjian 已提交
2585 2586 2587
	 * Calculate how many array slots we'll need.  Normally this is cheap; in
	 * the unusual case where the XIDs cross the wrap point, we do it the hard
	 * way.
2588 2589 2590 2591 2592 2593 2594 2595
	 */
	if (to_xid >= from_xid)
		nxids = to_xid - from_xid + 1;
	else
	{
		nxids = 1;
		next_xid = from_xid;
		while (TransactionIdPrecedes(next_xid, to_xid))
2596
		{
2597 2598 2599 2600 2601 2602
			nxids++;
			TransactionIdAdvance(next_xid);
		}
	}

	/*
B
Bruce Momjian 已提交
2603 2604
	 * Since only the startup process modifies the head/tail pointers, we
	 * don't need a lock to read them here.
2605 2606 2607 2608 2609 2610 2611 2612
	 */
	head = pArray->headKnownAssignedXids;
	tail = pArray->tailKnownAssignedXids;

	Assert(head >= 0 && head <= pArray->maxKnownAssignedXids);
	Assert(tail >= 0 && tail < pArray->maxKnownAssignedXids);

	/*
B
Bruce Momjian 已提交
2613 2614 2615
	 * Verify that insertions occur in TransactionId sequence.	Note that even
	 * if the last existing element is marked invalid, it must still have a
	 * correctly sequenced XID value.
2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633 2634 2635 2636 2637 2638
	 */
	if (head > tail &&
		TransactionIdFollowsOrEquals(KnownAssignedXids[head - 1], from_xid))
	{
		KnownAssignedXidsDisplay(LOG);
		elog(ERROR, "out-of-order XID insertion in KnownAssignedXids");
	}

	/*
	 * If our xids won't fit in the remaining space, compress out free space
	 */
	if (head + nxids > pArray->maxKnownAssignedXids)
	{
		/* must hold lock to compress */
		if (!exclusive_lock)
			LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

		KnownAssignedXidsCompress(true);

		head = pArray->headKnownAssignedXids;
		/* note: we no longer care about the tail pointer */

		if (!exclusive_lock)
2639
			LWLockRelease(ProcArrayLock);
2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666

		/*
		 * If it still won't fit then we're out of memory
		 */
		if (head + nxids > pArray->maxKnownAssignedXids)
			elog(ERROR, "too many KnownAssignedXids");
	}

	/* Now we can insert the xids into the space starting at head */
	next_xid = from_xid;
	for (i = 0; i < nxids; i++)
	{
		KnownAssignedXids[head] = next_xid;
		KnownAssignedXidsValid[head] = true;
		TransactionIdAdvance(next_xid);
		head++;
	}

	/* Adjust count of number of valid entries */
	pArray->numKnownAssignedXids += nxids;

	/*
	 * Now update the head pointer.  We use a spinlock to protect this
	 * pointer, not because the update is likely to be non-atomic, but to
	 * ensure that other processors see the above array updates before they
	 * see the head pointer change.
	 *
B
Bruce Momjian 已提交
2667 2668
	 * If we're holding ProcArrayLock exclusively, there's no need to take the
	 * spinlock.
2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693
	 */
	if (exclusive_lock)
		pArray->headKnownAssignedXids = head;
	else
	{
		SpinLockAcquire(&pArray->known_assigned_xids_lck);
		pArray->headKnownAssignedXids = head;
		SpinLockRelease(&pArray->known_assigned_xids_lck);
	}
}

/*
 * KnownAssignedXidsSearch
 *
 * Searches KnownAssignedXids for a specific xid and optionally removes it.
 * Returns true if it was found, false if not.
 *
 * Caller must hold ProcArrayLock in shared or exclusive mode.
 * Exclusive lock must be held for remove = true.
 */
static bool
KnownAssignedXidsSearch(TransactionId xid, bool remove)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
B
Bruce Momjian 已提交
2694 2695 2696 2697 2698
	int			first,
				last;
	int			head;
	int			tail;
	int			result_index = -1;
2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713 2714 2715

	if (remove)
	{
		/* we hold ProcArrayLock exclusively, so no need for spinlock */
		tail = pArray->tailKnownAssignedXids;
		head = pArray->headKnownAssignedXids;
	}
	else
	{
		/* take spinlock to ensure we see up-to-date array contents */
		SpinLockAcquire(&pArray->known_assigned_xids_lck);
		tail = pArray->tailKnownAssignedXids;
		head = pArray->headKnownAssignedXids;
		SpinLockRelease(&pArray->known_assigned_xids_lck);
	}

	/*
B
Bruce Momjian 已提交
2716
	 * Standard binary search.	Note we can ignore the KnownAssignedXidsValid
2717 2718 2719 2720 2721 2722
	 * array here, since even invalid entries will contain sorted XIDs.
	 */
	first = tail;
	last = head - 1;
	while (first <= last)
	{
B
Bruce Momjian 已提交
2723 2724
		int			mid_index;
		TransactionId mid_xid;
2725 2726 2727 2728 2729 2730 2731 2732

		mid_index = (first + last) / 2;
		mid_xid = KnownAssignedXids[mid_index];

		if (xid == mid_xid)
		{
			result_index = mid_index;
			break;
2733
		}
2734 2735 2736 2737 2738 2739 2740 2741 2742 2743 2744 2745 2746 2747 2748
		else if (TransactionIdPrecedes(xid, mid_xid))
			last = mid_index - 1;
		else
			first = mid_index + 1;
	}

	if (result_index < 0)
		return false;			/* not in array */

	if (!KnownAssignedXidsValid[result_index])
		return false;			/* in array, but invalid */

	if (remove)
	{
		KnownAssignedXidsValid[result_index] = false;
2749

2750 2751 2752 2753 2754 2755 2756 2757
		pArray->numKnownAssignedXids--;
		Assert(pArray->numKnownAssignedXids >= 0);

		/*
		 * If we're removing the tail element then advance tail pointer over
		 * any invalid elements.  This will speed future searches.
		 */
		if (result_index == tail)
2758
		{
2759 2760 2761 2762 2763 2764 2765 2766 2767 2768 2769 2770 2771
			tail++;
			while (tail < head && !KnownAssignedXidsValid[tail])
				tail++;
			if (tail >= head)
			{
				/* Array is empty, so we can reset both pointers */
				pArray->headKnownAssignedXids = 0;
				pArray->tailKnownAssignedXids = 0;
			}
			else
			{
				pArray->tailKnownAssignedXids = tail;
			}
2772 2773
		}
	}
2774 2775

	return true;
2776 2777 2778
}

/*
2779
 * Is the specified XID present in KnownAssignedXids[]?
2780
 *
2781
 * Caller must hold ProcArrayLock in shared or exclusive mode.
2782 2783
 */
static bool
2784
KnownAssignedXidExists(TransactionId xid)
2785
{
2786
	Assert(TransactionIdIsValid(xid));
B
Bruce Momjian 已提交
2787

2788
	return KnownAssignedXidsSearch(xid, false);
2789 2790 2791
}

/*
2792
 * Remove the specified XID from KnownAssignedXids[].
2793
 *
2794
 * Caller must hold ProcArrayLock in exclusive mode.
2795 2796 2797 2798 2799 2800 2801 2802 2803
 */
static void
KnownAssignedXidsRemove(TransactionId xid)
{
	Assert(TransactionIdIsValid(xid));

	elog(trace_recovery(DEBUG4), "remove KnownAssignedXid %u", xid);

	/*
2804 2805
	 * Note: we cannot consider it an error to remove an XID that's not
	 * present.  We intentionally remove subxact IDs while processing
B
Bruce Momjian 已提交
2806 2807
	 * XLOG_XACT_ASSIGNMENT, to avoid array overflow.  Then those XIDs will be
	 * removed again when the top-level xact commits or aborts.
2808
	 *
B
Bruce Momjian 已提交
2809 2810 2811
	 * It might be possible to track such XIDs to distinguish this case from
	 * actual errors, but it would be complicated and probably not worth it.
	 * So, just ignore the search result.
2812
	 */
2813
	(void) KnownAssignedXidsSearch(xid, true);
2814 2815 2816
}

/*
2817 2818
 * KnownAssignedXidsRemoveTree
 *		Remove xid (if it's not InvalidTransactionId) and all the subxids.
2819
 *
2820
 * Caller must hold ProcArrayLock in exclusive mode.
2821
 */
2822 2823 2824
static void
KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
							TransactionId *subxids)
2825
{
B
Bruce Momjian 已提交
2826
	int			i;
2827

2828 2829 2830 2831 2832 2833 2834 2835
	if (TransactionIdIsValid(xid))
		KnownAssignedXidsRemove(xid);

	for (i = 0; i < nsubxids; i++)
		KnownAssignedXidsRemove(subxids[i]);

	/* Opportunistically compress the array */
	KnownAssignedXidsCompress(false);
2836 2837 2838
}

/*
2839 2840
 * Prune KnownAssignedXids up to, but *not* including xid. If xid is invalid
 * then clear the whole table.
2841
 *
2842
 * Caller must hold ProcArrayLock in exclusive mode.
2843
 */
2844 2845
static void
KnownAssignedXidsRemovePreceding(TransactionId removeXid)
2846
{
2847 2848
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
B
Bruce Momjian 已提交
2849 2850 2851 2852
	int			count = 0;
	int			head,
				tail,
				i;
2853

2854
	if (!TransactionIdIsValid(removeXid))
2855
	{
2856 2857 2858 2859 2860
		elog(trace_recovery(DEBUG4), "removing all KnownAssignedXids");
		pArray->numKnownAssignedXids = 0;
		pArray->headKnownAssignedXids = pArray->tailKnownAssignedXids = 0;
		return;
	}
2861

2862
	elog(trace_recovery(DEBUG4), "prune KnownAssignedXids to %u", removeXid);
2863

2864
	/*
B
Bruce Momjian 已提交
2865 2866
	 * Mark entries invalid starting at the tail.  Since array is sorted, we
	 * can stop as soon as we reach a entry >= removeXid.
2867 2868 2869 2870 2871 2872 2873 2874
	 */
	tail = pArray->tailKnownAssignedXids;
	head = pArray->headKnownAssignedXids;

	for (i = tail; i < head; i++)
	{
		if (KnownAssignedXidsValid[i])
		{
B
Bruce Momjian 已提交
2875
			TransactionId knownXid = KnownAssignedXids[i];
2876 2877 2878 2879 2880 2881 2882 2883 2884 2885

			if (TransactionIdFollowsOrEquals(knownXid, removeXid))
				break;

			if (!StandbyTransactionIdIsPrepared(knownXid))
			{
				KnownAssignedXidsValid[i] = false;
				count++;
			}
		}
2886 2887
	}

2888 2889 2890 2891 2892 2893 2894 2895 2896 2897 2898 2899 2900 2901 2902 2903 2904 2905 2906 2907 2908 2909 2910 2911
	pArray->numKnownAssignedXids -= count;
	Assert(pArray->numKnownAssignedXids >= 0);

	/*
	 * Advance the tail pointer if we've marked the tail item invalid.
	 */
	for (i = tail; i < head; i++)
	{
		if (KnownAssignedXidsValid[i])
			break;
	}
	if (i >= head)
	{
		/* Array is empty, so we can reset both pointers */
		pArray->headKnownAssignedXids = 0;
		pArray->tailKnownAssignedXids = 0;
	}
	else
	{
		pArray->tailKnownAssignedXids = i;
	}

	/* Opportunistically compress the array */
	KnownAssignedXidsCompress(false);
2912 2913 2914
}

/*
2915 2916 2917 2918 2919
 * KnownAssignedXidsGet - Get an array of xids by scanning KnownAssignedXids.
 * We filter out anything >= xmax.
 *
 * Returns the number of XIDs stored into xarray[].  Caller is responsible
 * that array is large enough.
2920
 *
2921
 * Caller must hold ProcArrayLock in (at least) shared mode.
2922
 */
2923 2924
static int
KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax)
2925
{
2926
	TransactionId xtmp = InvalidTransactionId;
2927

2928 2929
	return KnownAssignedXidsGetAndSetXmin(xarray, &xtmp, xmax);
}
2930

2931 2932 2933 2934 2935 2936 2937 2938 2939 2940 2941 2942 2943
/*
 * KnownAssignedXidsGetAndSetXmin - as KnownAssignedXidsGet, plus
 * we reduce *xmin to the lowest xid value seen if not already lower.
 *
 * Caller must hold ProcArrayLock in (at least) shared mode.
 */
static int
KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
							   TransactionId xmax)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
	int			count = 0;
B
Bruce Momjian 已提交
2944 2945
	int			head,
				tail;
2946 2947 2948
	int			i;

	/*
B
Bruce Momjian 已提交
2949 2950 2951 2952 2953
	 * Fetch head just once, since it may change while we loop. We can stop
	 * once we reach the initially seen head, since we are certain that an xid
	 * cannot enter and then leave the array while we hold ProcArrayLock.  We
	 * might miss newly-added xids, but they should be >= xmax so irrelevant
	 * anyway.
2954 2955 2956 2957
	 *
	 * Must take spinlock to ensure we see up-to-date array contents.
	 */
	SpinLockAcquire(&pArray->known_assigned_xids_lck);
2958 2959
	tail = pArray->tailKnownAssignedXids;
	head = pArray->headKnownAssignedXids;
2960 2961 2962 2963 2964 2965
	SpinLockRelease(&pArray->known_assigned_xids_lck);

	for (i = tail; i < head; i++)
	{
		/* Skip any gaps in the array */
		if (KnownAssignedXidsValid[i])
2966
		{
2967 2968 2969 2970 2971 2972 2973 2974 2975 2976 2977 2978 2979 2980
			TransactionId knownXid = KnownAssignedXids[i];

			/*
			 * Update xmin if required.  Only the first XID need be checked,
			 * since the array is sorted.
			 */
			if (count == 0 &&
				TransactionIdPrecedes(knownXid, *xmin))
				*xmin = knownXid;

			/*
			 * Filter out anything >= xmax, again relying on sorted property
			 * of array.
			 */
2981 2982
			if (TransactionIdIsValid(xmax) &&
				TransactionIdFollowsOrEquals(knownXid, xmax))
2983 2984 2985 2986
				break;

			/* Add knownXid into output array */
			xarray[count++] = knownXid;
2987 2988
		}
	}
2989 2990

	return count;
2991 2992
}

2993 2994 2995 2996 2997
/*
 * Get oldest XID in the KnownAssignedXids array, or InvalidTransactionId
 * if nothing there.
 */
static TransactionId
2998 2999 3000 3001 3002 3003 3004 3005 3006 3007 3008 3009 3010 3011 3012 3013 3014 3015 3016 3017 3018 3019 3020 3021 3022 3023
KnownAssignedXidsGetOldestXmin(void)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
	int			head,
				tail;
	int			i;

	/*
	 * Fetch head just once, since it may change while we loop.
	 */
	SpinLockAcquire(&pArray->known_assigned_xids_lck);
	tail = pArray->tailKnownAssignedXids;
	head = pArray->headKnownAssignedXids;
	SpinLockRelease(&pArray->known_assigned_xids_lck);

	for (i = tail; i < head; i++)
	{
		/* Skip any gaps in the array */
		if (KnownAssignedXidsValid[i])
			return KnownAssignedXids[i];
	}

	return InvalidTransactionId;
}

3024 3025 3026
/*
 * Display KnownAssignedXids to provide debug trail
 *
3027 3028 3029 3030 3031 3032
 * Currently this is only called within startup process, so we need no
 * special locking.
 *
 * Note this is pretty expensive, and much of the expense will be incurred
 * even if the elog message will get discarded.  It's not currently called
 * in any performance-critical places, however, so no need to be tenser.
3033
 */
T
Tom Lane 已提交
3034
static void
3035 3036
KnownAssignedXidsDisplay(int trace_level)
{
3037 3038
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
B
Bruce Momjian 已提交
3039 3040 3041 3042 3043
	StringInfoData buf;
	int			head,
				tail,
				i;
	int			nxids = 0;
3044

3045 3046
	tail = pArray->tailKnownAssignedXids;
	head = pArray->headKnownAssignedXids;
3047 3048 3049

	initStringInfo(&buf);

3050 3051 3052 3053 3054
	for (i = tail; i < head; i++)
	{
		if (KnownAssignedXidsValid[i])
		{
			nxids++;
3055
			appendStringInfo(&buf, "[%d]=%u ", i, KnownAssignedXids[i]);
3056 3057
		}
	}
3058

3059
	elog(trace_level, "%d KnownAssignedXids (num=%d tail=%d head=%d) %s",
3060 3061 3062 3063 3064
		 nxids,
		 pArray->numKnownAssignedXids,
		 pArray->tailKnownAssignedXids,
		 pArray->headKnownAssignedXids,
		 buf.data);
3065 3066 3067

	pfree(buf.data);
}