procarray.c 111.9 KB
Newer Older
1 2 3 4 5 6
/*-------------------------------------------------------------------------
 *
 * procarray.c
 *	  POSTGRES process array code.
 *
 *
7
 * This module maintains arrays of the PGPROC and PGXACT structures for all
8 9 10 11
 * active backends.  Although there are several uses for this, the principal
 * one is as a means of determining the set of currently running transactions.
 *
 * Because of various subtle race conditions it is critical that a backend
12
 * hold the correct locks while setting or clearing its MyPgXact->xid field.
13
 * See notes in src/backend/access/transam/README.
14
 *
15 16 17 18
 * The process arrays now also include structures representing prepared
 * transactions.  The xid and subxids fields of these are valid, as are the
 * myProcLocks lists.  They can be distinguished from regular backend PGPROCs
 * at need by checking for pid == 0.
B
Bruce Momjian 已提交
19
 *
20 21
 * During hot standby, we also keep a list of XIDs representing transactions
 * that are known to be running in the master (or more precisely, were running
B
Bruce Momjian 已提交
22
 * as of the current point in the WAL stream).  This list is kept in the
23 24 25
 * KnownAssignedXids array, and is updated by watching the sequence of
 * arriving XIDs.  This is necessary because if we leave those XIDs out of
 * snapshots taken for standby queries, then they will appear to be already
B
Bruce Momjian 已提交
26
 * complete, leading to MVCC failures.  Note that in hot standby, the PGPROC
27 28 29 30 31 32 33
 * array represents standby processes, which by definition are not running
 * transactions that have XIDs.
 *
 * It is perhaps possible for a backend on the master to terminate without
 * writing an abort record for its transaction.  While that shouldn't really
 * happen, it would tie up KnownAssignedXids indefinitely, so we protect
 * ourselves by pruning the array when a valid list of running XIDs arrives.
34
 *
B
Bruce Momjian 已提交
35
 * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
36 37 38 39
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
40
 *	  src/backend/storage/ipc/procarray.c
41 42 43 44 45
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

46 47
#include <signal.h>

48
#include "access/clog.h"
49
#include "access/subtrans.h"
50
#include "access/transam.h"
51
#include "access/twophase.h"
52 53
#include "access/xact.h"
#include "access/xlog.h"
R
Robert Haas 已提交
54
#include "catalog/catalog.h"
55
#include "miscadmin.h"
56
#include "storage/proc.h"
57
#include "storage/procarray.h"
T
Tom Lane 已提交
58
#include "storage/spin.h"
59
#include "utils/builtins.h"
R
Robert Haas 已提交
60
#include "utils/rel.h"
61
#include "utils/snapmgr.h"
62 63 64 65 66 67 68 69


/* Our shared memory area */
typedef struct ProcArrayStruct
{
	int			numProcs;		/* number of valid procs entries */
	int			maxProcs;		/* allocated size of procs array */

70 71 72 73 74 75 76
	/*
	 * Known assigned XIDs handling
	 */
	int			maxKnownAssignedXids;	/* allocated size of array */
	int			numKnownAssignedXids;	/* currrent # of valid entries */
	int			tailKnownAssignedXids;	/* index of oldest valid element */
	int			headKnownAssignedXids;	/* index of newest element, + 1 */
B
Bruce Momjian 已提交
77
	slock_t		known_assigned_xids_lck;		/* protects head/tail pointers */
B
Bruce Momjian 已提交
78

79
	/*
80 81
	 * Highest subxid that has been removed from KnownAssignedXids array to
	 * prevent overflow; or InvalidTransactionId if none.  We track this for
82
	 * similar reasons to tracking overflowing cached subxids in PGXACT
83 84
	 * entries.  Must hold exclusive ProcArrayLock to change this, and shared
	 * lock to read it.
85
	 */
B
Bruce Momjian 已提交
86
	TransactionId lastOverflowedXid;
87

R
Robert Haas 已提交
88 89
	/* oldest xmin of any replication slot */
	TransactionId replication_slot_xmin;
R
Robert Haas 已提交
90 91
	/* oldest catalog xmin of any replication slot */
	TransactionId replication_slot_catalog_xmin;
R
Robert Haas 已提交
92

93 94
	/* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */
	int			pgprocnos[FLEXIBLE_ARRAY_MEMBER];
95 96 97 98
} ProcArrayStruct;

static ProcArrayStruct *procArray;

99 100 101
static PGPROC *allProcs;
static PGXACT *allPgXact;

102 103 104
/*
 * Bookkeeping for tracking emulated transactions in recovery
 */
105 106
static TransactionId *KnownAssignedXids;
static bool *KnownAssignedXidsValid;
B
Bruce Momjian 已提交
107
static TransactionId latestObservedXid = InvalidTransactionId;
108 109 110 111 112 113 114 115

/*
 * If we're in STANDBY_SNAPSHOT_PENDING state, standbySnapshotPendingXmin is
 * the highest xid that might still be running that we don't have in
 * KnownAssignedXids.
 */
static TransactionId standbySnapshotPendingXmin;

116 117 118 119
#ifdef XIDCACHE_DEBUG

/* counters for XidCache measurement */
static long xc_by_recent_xmin = 0;
120
static long xc_by_known_xact = 0;
121
static long xc_by_my_xact = 0;
122
static long xc_by_latest_xid = 0;
123 124
static long xc_by_main_xid = 0;
static long xc_by_child_xid = 0;
125
static long xc_by_known_assigned = 0;
126
static long xc_no_overflow = 0;
127 128 129
static long xc_slow_answer = 0;

#define xc_by_recent_xmin_inc()		(xc_by_recent_xmin++)
130
#define xc_by_known_xact_inc()		(xc_by_known_xact++)
131
#define xc_by_my_xact_inc()			(xc_by_my_xact++)
132
#define xc_by_latest_xid_inc()		(xc_by_latest_xid++)
133 134
#define xc_by_main_xid_inc()		(xc_by_main_xid++)
#define xc_by_child_xid_inc()		(xc_by_child_xid++)
135
#define xc_by_known_assigned_inc()	(xc_by_known_assigned++)
136
#define xc_no_overflow_inc()		(xc_no_overflow++)
137 138 139 140 141 142
#define xc_slow_answer_inc()		(xc_slow_answer++)

static void DisplayXidCache(void);
#else							/* !XIDCACHE_DEBUG */

#define xc_by_recent_xmin_inc()		((void) 0)
143
#define xc_by_known_xact_inc()		((void) 0)
144
#define xc_by_my_xact_inc()			((void) 0)
145
#define xc_by_latest_xid_inc()		((void) 0)
146 147
#define xc_by_main_xid_inc()		((void) 0)
#define xc_by_child_xid_inc()		((void) 0)
148
#define xc_by_known_assigned_inc()	((void) 0)
149
#define xc_no_overflow_inc()		((void) 0)
150 151 152
#define xc_slow_answer_inc()		((void) 0)
#endif   /* XIDCACHE_DEBUG */

153
/* Primitives for KnownAssignedXids array handling for standby */
154 155
static void KnownAssignedXidsCompress(bool force);
static void KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
B
Bruce Momjian 已提交
156
					 bool exclusive_lock);
157 158
static bool KnownAssignedXidsSearch(TransactionId xid, bool remove);
static bool KnownAssignedXidExists(TransactionId xid);
159
static void KnownAssignedXidsRemove(TransactionId xid);
160
static void KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
B
Bruce Momjian 已提交
161
							TransactionId *subxids);
162
static void KnownAssignedXidsRemovePreceding(TransactionId xid);
B
Bruce Momjian 已提交
163
static int	KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
164
static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray,
B
Bruce Momjian 已提交
165 166
							   TransactionId *xmin,
							   TransactionId xmax);
167
static TransactionId KnownAssignedXidsGetOldestXmin(void);
168
static void KnownAssignedXidsDisplay(int trace_level);
169
static void KnownAssignedXidsReset(void);
170 171 172 173

/*
 * Report shared-memory space needed by CreateSharedProcArray.
 */
174
Size
175
ProcArrayShmemSize(void)
176
{
177 178
	Size		size;

179 180
	/* Size of the ProcArray structure itself */
#define PROCARRAY_MAXPROCS	(MaxBackends + max_prepared_xacts)
181

182 183
	size = offsetof(ProcArrayStruct, pgprocnos);
	size = add_size(size, mul_size(sizeof(int), PROCARRAY_MAXPROCS));
184 185

	/*
186
	 * During Hot Standby processing we have a data structure called
187 188 189 190 191 192
	 * KnownAssignedXids, created in shared memory. Local data structures are
	 * also created in various backends during GetSnapshotData(),
	 * TransactionIdIsInProgress() and GetRunningTransactionData(). All of the
	 * main structures created in those functions must be identically sized,
	 * since we may at times copy the whole of the data structures around. We
	 * refer to this size as TOTAL_MAX_CACHED_SUBXIDS.
193
	 *
B
Bruce Momjian 已提交
194 195 196
	 * Ideally we'd only create this structure if we were actually doing hot
	 * standby in the current run, but we don't know that yet at the time
	 * shared memory is being set up.
197
	 */
198 199 200
#define TOTAL_MAX_CACHED_SUBXIDS \
	((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)

201
	if (EnableHotStandby)
202 203 204 205
	{
		size = add_size(size,
						mul_size(sizeof(TransactionId),
								 TOTAL_MAX_CACHED_SUBXIDS));
206
		size = add_size(size,
207 208
						mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS));
	}
209 210

	return size;
211 212 213 214 215 216
}

/*
 * Initialize the shared PGPROC array during postmaster startup.
 */
void
217
CreateSharedProcArray(void)
218 219 220 221 222
{
	bool		found;

	/* Create or attach to the ProcArray shared structure */
	procArray = (ProcArrayStruct *)
223
		ShmemInitStruct("Proc Array",
224 225
						add_size(offsetof(ProcArrayStruct, pgprocnos),
								 mul_size(sizeof(int),
226
										  PROCARRAY_MAXPROCS)),
227
						&found);
228 229 230 231 232 233 234

	if (!found)
	{
		/*
		 * We're the first - initialize.
		 */
		procArray->numProcs = 0;
235
		procArray->maxProcs = PROCARRAY_MAXPROCS;
R
Robert Haas 已提交
236
		procArray->replication_slot_xmin = InvalidTransactionId;
237
		procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
238 239 240 241
		procArray->numKnownAssignedXids = 0;
		procArray->tailKnownAssignedXids = 0;
		procArray->headKnownAssignedXids = 0;
		SpinLockInit(&procArray->known_assigned_xids_lck);
242 243
		procArray->lastOverflowedXid = InvalidTransactionId;
	}
244

245 246 247
	allProcs = ProcGlobal->allProcs;
	allPgXact = ProcGlobal->allPgXact;

248
	/* Create or attach to the KnownAssignedXids arrays too, if needed */
249
	if (EnableHotStandby)
250
	{
251 252 253 254 255 256 257 258 259
		KnownAssignedXids = (TransactionId *)
			ShmemInitStruct("KnownAssignedXids",
							mul_size(sizeof(TransactionId),
									 TOTAL_MAX_CACHED_SUBXIDS),
							&found);
		KnownAssignedXidsValid = (bool *)
			ShmemInitStruct("KnownAssignedXidsValid",
							mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS),
							&found);
260 261 262 263
	}
}

/*
264
 * Add the specified PGPROC to the shared array.
265 266
 */
void
267
ProcArrayAdd(PGPROC *proc)
268 269
{
	ProcArrayStruct *arrayP = procArray;
270
	int			index;
271 272 273 274 275 276

	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	if (arrayP->numProcs >= arrayP->maxProcs)
	{
		/*
B
Bruce Momjian 已提交
277
		 * Ooops, no room.  (This really shouldn't happen, since there is a
B
Bruce Momjian 已提交
278 279
		 * fixed supply of PGPROC structs too, and so we should have failed
		 * earlier.)
280 281 282 283 284 285 286
		 */
		LWLockRelease(ProcArrayLock);
		ereport(FATAL,
				(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
				 errmsg("sorry, too many clients already")));
	}

287 288 289
	/*
	 * Keep the procs array sorted by (PGPROC *) so that we can utilize
	 * locality of references much better. This is useful while traversing the
R
Robert Haas 已提交
290
	 * ProcArray because there is a increased likelihood of finding the next
291
	 * PGPROC structure in the cache.
292
	 *
R
Robert Haas 已提交
293
	 * Since the occurrence of adding/removing a proc is much lower than the
294 295 296 297 298
	 * access to the ProcArray itself, the overhead should be marginal
	 */
	for (index = 0; index < arrayP->numProcs; index++)
	{
		/*
299 300
		 * If we are the first PGPROC or if we have found our right position
		 * in the array, break
301 302 303 304 305 306
		 */
		if ((arrayP->pgprocnos[index] == -1) || (arrayP->pgprocnos[index] > proc->pgprocno))
			break;
	}

	memmove(&arrayP->pgprocnos[index + 1], &arrayP->pgprocnos[index],
307
			(arrayP->numProcs - index) * sizeof(int));
308
	arrayP->pgprocnos[index] = proc->pgprocno;
309 310 311 312 313 314
	arrayP->numProcs++;

	LWLockRelease(ProcArrayLock);
}

/*
315
 * Remove the specified PGPROC from the shared array.
316 317 318 319 320 321 322
 *
 * When latestXid is a valid XID, we are removing a live 2PC gxact from the
 * array, and thus causing it to appear as "not running" anymore.  In this
 * case we must advance latestCompletedXid.  (This is essentially the same
 * as ProcArrayEndTransaction followed by removal of the PGPROC, but we take
 * the ProcArrayLock only once, and don't damage the content of the PGPROC;
 * twophase.c depends on the latter.)
323 324
 */
void
325
ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
326 327 328 329 330
{
	ProcArrayStruct *arrayP = procArray;
	int			index;

#ifdef XIDCACHE_DEBUG
331 332 333
	/* dump stats at backend shutdown, but not prepared-xact end */
	if (proc->pid != 0)
		DisplayXidCache();
334 335 336 337
#endif

	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

338 339
	if (TransactionIdIsValid(latestXid))
	{
340
		Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
341 342 343 344 345 346 347 348 349

		/* Advance global latestCompletedXid while holding the lock */
		if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
								  latestXid))
			ShmemVariableCache->latestCompletedXid = latestXid;
	}
	else
	{
		/* Shouldn't be trying to remove a live transaction here */
350
		Assert(!TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
351 352
	}

353 354
	for (index = 0; index < arrayP->numProcs; index++)
	{
355
		if (arrayP->pgprocnos[index] == proc->pgprocno)
356
		{
357 358
			/* Keep the PGPROC array sorted. See notes above */
			memmove(&arrayP->pgprocnos[index], &arrayP->pgprocnos[index + 1],
359 360
					(arrayP->numProcs - index - 1) * sizeof(int));
			arrayP->pgprocnos[arrayP->numProcs - 1] = -1;		/* for debugging */
361 362 363 364 365 366 367 368 369
			arrayP->numProcs--;
			LWLockRelease(ProcArrayLock);
			return;
		}
	}

	/* Ooops */
	LWLockRelease(ProcArrayLock);

370
	elog(LOG, "failed to find proc %p in ProcArray", proc);
371 372 373
}


374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389
/*
 * ProcArrayEndTransaction -- mark a transaction as no longer running
 *
 * This is used interchangeably for commit and abort cases.  The transaction
 * commit/abort must already be reported to WAL and pg_clog.
 *
 * proc is currently always MyProc, but we pass it explicitly for flexibility.
 * latestXid is the latest Xid among the transaction's main XID and
 * subtransactions, or InvalidTransactionId if it has no XID.  (We must ask
 * the caller to pass latestXid, instead of computing it from the PGPROC's
 * contents, because the subxid information in the PGPROC might be
 * incomplete.)
 */
void
ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
{
390
	PGXACT	   *pgxact = &allPgXact[proc->pgprocno];
391

392 393 394
	if (TransactionIdIsValid(latestXid))
	{
		/*
395 396 397
		 * We must lock ProcArrayLock while clearing our advertised XID, so
		 * that we do not exit the set of "running" transactions while someone
		 * else is taking a snapshot.  See discussion in
398 399
		 * src/backend/access/transam/README.
		 */
400
		Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
401 402 403

		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

404
		pgxact->xid = InvalidTransactionId;
405
		proc->lxid = InvalidLocalTransactionId;
406
		pgxact->xmin = InvalidTransactionId;
407
		/* must be cleared with xid/xmin: */
408
		pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
B
Bruce Momjian 已提交
409
		pgxact->delayChkpt = false;		/* be sure this is cleared in abort */
410
		proc->recoveryConflictPending = false;
411 412

		/* Clear the subtransaction-XID cache too while holding the lock */
413 414
		pgxact->nxids = 0;
		pgxact->overflowed = false;
415 416 417 418 419 420 421 422 423 424 425

		/* Also advance global latestCompletedXid while holding the lock */
		if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
								  latestXid))
			ShmemVariableCache->latestCompletedXid = latestXid;

		LWLockRelease(ProcArrayLock);
	}
	else
	{
		/*
B
Bruce Momjian 已提交
426 427 428
		 * If we have no XID, we don't need to lock, since we won't affect
		 * anyone else's calculation of a snapshot.  We might change their
		 * estimate of global xmin, but that's OK.
429
		 */
430
		Assert(!TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
431 432

		proc->lxid = InvalidLocalTransactionId;
433
		pgxact->xmin = InvalidTransactionId;
434
		/* must be cleared with xid/xmin: */
435
		pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
B
Bruce Momjian 已提交
436
		pgxact->delayChkpt = false;		/* be sure this is cleared in abort */
437
		proc->recoveryConflictPending = false;
438

439 440
		Assert(pgxact->nxids == 0);
		Assert(pgxact->overflowed == false);
441 442 443 444 445 446 447 448 449 450
	}
}


/*
 * ProcArrayClearTransaction -- clear the transaction fields
 *
 * This is used after successfully preparing a 2-phase transaction.  We are
 * not actually reporting the transaction's XID as no longer running --- it
 * will still appear as running because the 2PC's gxact is in the ProcArray
451
 * too.  We just have to clear out our own PGXACT.
452 453 454 455
 */
void
ProcArrayClearTransaction(PGPROC *proc)
{
456
	PGXACT	   *pgxact = &allPgXact[proc->pgprocno];
457

458 459
	/*
	 * We can skip locking ProcArrayLock here, because this action does not
B
Bruce Momjian 已提交
460 461
	 * actually change anyone's view of the set of running XIDs: our entry is
	 * duplicate with the gxact that has already been inserted into the
462 463
	 * ProcArray.
	 */
464
	pgxact->xid = InvalidTransactionId;
465
	proc->lxid = InvalidLocalTransactionId;
466
	pgxact->xmin = InvalidTransactionId;
467
	proc->recoveryConflictPending = false;
468 469

	/* redundant, but just in case */
470
	pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
471
	pgxact->delayChkpt = false;
472 473

	/* Clear the subtransaction-XID cache too */
474 475
	pgxact->nxids = 0;
	pgxact->overflowed = false;
476 477
}

478 479 480 481
/*
 * ProcArrayInitRecovery -- initialize recovery xid mgmt environment
 *
 * Remember up to where the startup process initialized the CLOG and subtrans
482
 * so we can ensure it's initialized gaplessly up to the point where necessary
483 484 485 486 487 488 489 490 491
 * while in recovery.
 */
void
ProcArrayInitRecovery(TransactionId initializedUptoXID)
{
	Assert(standbyState == STANDBY_INITIALIZED);
	Assert(TransactionIdIsNormal(initializedUptoXID));

	/*
492 493 494 495
	 * we set latestObservedXid to the xid SUBTRANS has been initialized upto,
	 * so we can extend it from that point onwards in
	 * RecordKnownAssignedTransactionIds, and when we get consistent in
	 * ProcArrayApplyRecoveryInfo().
496 497 498 499 500
	 */
	latestObservedXid = initializedUptoXID;
	TransactionIdRetreat(latestObservedXid);
}

501 502 503
/*
 * ProcArrayApplyRecoveryInfo -- apply recovery info about xids
 *
504
 * Takes us through 3 states: Initialized, Pending and Ready.
505 506 507 508
 * Normal case is to go all the way to Ready straight away, though there
 * are atypical cases where we need to take it in steps.
 *
 * Use the data about running transactions on master to create the initial
509
 * state of KnownAssignedXids. We also use these records to regularly prune
510
 * KnownAssignedXids because we know it is possible that some transactions
511
 * with FATAL errors fail to write abort records, which could cause eventual
512 513
 * overflow.
 *
514
 * See comments for LogStandbySnapshot().
515 516 517 518
 */
void
ProcArrayApplyRecoveryInfo(RunningTransactions running)
{
B
Bruce Momjian 已提交
519
	TransactionId *xids;
B
Bruce Momjian 已提交
520
	int			nxids;
521
	TransactionId nextXid;
B
Bruce Momjian 已提交
522
	int			i;
523 524

	Assert(standbyState >= STANDBY_INITIALIZED);
525 526 527
	Assert(TransactionIdIsValid(running->nextXid));
	Assert(TransactionIdIsValid(running->oldestRunningXid));
	Assert(TransactionIdIsNormal(running->latestCompletedXid));
528 529 530 531 532

	/*
	 * Remove stale transactions, if any.
	 */
	ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid);
533 534 535 536 537 538 539

	/*
	 * Remove stale locks, if any.
	 *
	 * Locks are always assigned to the toplevel xid so we don't need to care
	 * about subxcnt/subxids (and by extension not about ->suboverflowed).
	 */
540
	StandbyReleaseOldLocks(running->xcnt, running->xids);
541 542 543 544 545 546 547 548

	/*
	 * If our snapshot is already valid, nothing else to do...
	 */
	if (standbyState == STANDBY_SNAPSHOT_READY)
		return;

	/*
549
	 * If our initial RunningTransactionsData had an overflowed snapshot then
550
	 * we knew we were missing some subxids from our snapshot. If we continue
551 552 553
	 * to see overflowed snapshots then we might never be able to start up, so
	 * we make another test to see if our snapshot is now valid. We know that
	 * the missing subxids are equal to or earlier than nextXid. After we
B
Bruce Momjian 已提交
554 555 556 557
	 * initialise we continue to apply changes during recovery, so once the
	 * oldestRunningXid is later than the nextXid from the initial snapshot we
	 * know that we no longer have missing information and can mark the
	 * snapshot as valid.
558 559 560
	 */
	if (standbyState == STANDBY_SNAPSHOT_PENDING)
	{
561
		/*
562 563
		 * If the snapshot isn't overflowed or if its empty we can reset our
		 * pending state and use this snapshot instead.
564 565
		 */
		if (!running->subxid_overflow || running->xcnt == 0)
566
		{
567 568 569 570 571
			/*
			 * If we have already collected known assigned xids, we need to
			 * throw them away before we apply the recovery snapshot.
			 */
			KnownAssignedXidsReset();
572
			standbyState = STANDBY_INITIALIZED;
573
		}
574
		else
575 576 577 578 579 580 581 582 583 584
		{
			if (TransactionIdPrecedes(standbySnapshotPendingXmin,
									  running->oldestRunningXid))
			{
				standbyState = STANDBY_SNAPSHOT_READY;
				elog(trace_recovery(DEBUG1),
					 "recovery snapshots are now enabled");
			}
			else
				elog(trace_recovery(DEBUG1),
585 586
				  "recovery snapshot waiting for non-overflowed snapshot or "
				"until oldest active xid on standby is at least %u (now %u)",
587 588 589 590
					 standbySnapshotPendingXmin,
					 running->oldestRunningXid);
			return;
		}
591 592
	}

593 594
	Assert(standbyState == STANDBY_INITIALIZED);

595
	/*
596 597 598 599
	 * OK, we need to initialise from the RunningTransactionsData record.
	 *
	 * NB: this can be reached at least twice, so make sure new code can deal
	 * with that.
600 601
	 */

602 603 604 605
	/*
	 * Nobody else is running yet, but take locks anyhow
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
606 607

	/*
608 609
	 * KnownAssignedXids is sorted so we cannot just add the xids, we have to
	 * sort them first.
610
	 *
B
Bruce Momjian 已提交
611 612 613 614 615 616
	 * Some of the new xids are top-level xids and some are subtransactions.
	 * We don't call SubtransSetParent because it doesn't matter yet. If we
	 * aren't overflowed then all xids will fit in snapshot and so we don't
	 * need subtrans. If we later overflow, an xid assignment record will add
	 * xids to subtrans. If RunningXacts is overflowed then we don't have
	 * enough information to correctly update subtrans anyway.
617 618 619
	 */

	/*
620 621
	 * Allocate a temporary array to avoid modifying the array passed as
	 * argument.
622
	 */
623
	xids = palloc(sizeof(TransactionId) * (running->xcnt + running->subxcnt));
624 625

	/*
626
	 * Add to the temp array any xids which have not already completed.
627
	 */
628
	nxids = 0;
629
	for (i = 0; i < running->xcnt + running->subxcnt; i++)
630
	{
631
		TransactionId xid = running->xids[i];
632 633

		/*
634 635 636 637
		 * The running-xacts snapshot can contain xids that were still visible
		 * in the procarray when the snapshot was taken, but were already
		 * WAL-logged as completed. They're not running anymore, so ignore
		 * them.
638 639 640 641
		 */
		if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
			continue;

642
		xids[nxids++] = xid;
643 644
	}

645 646
	if (nxids > 0)
	{
647 648 649 650 651 652
		if (procArray->numKnownAssignedXids != 0)
		{
			LWLockRelease(ProcArrayLock);
			elog(ERROR, "KnownAssignedXids is not empty");
		}

653
		/*
B
Bruce Momjian 已提交
654 655
		 * Sort the array so that we can add them safely into
		 * KnownAssignedXids.
656 657 658 659 660 661 662
		 */
		qsort(xids, nxids, sizeof(TransactionId), xidComparator);

		/*
		 * Add the sorted snapshot into KnownAssignedXids
		 */
		for (i = 0; i < nxids; i++)
663
			KnownAssignedXidsAdd(xids[i], xids[i], true);
664 665 666 667 668

		KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
	}

	pfree(xids);
669 670

	/*
671
	 * latestObservedXid is at least set to the point where SUBTRANS was
672 673 674 675 676 677 678
	 * started up to (c.f. ProcArrayInitRecovery()) or to the biggest xid
	 * RecordKnownAssignedTransactionIds() was called for.  Initialize
	 * subtrans from thereon, up to nextXid - 1.
	 *
	 * We need to duplicate parts of RecordKnownAssignedTransactionId() here,
	 * because we've just added xids to the known assigned xids machinery that
	 * haven't gone through RecordKnownAssignedTransactionId().
679 680
	 */
	Assert(TransactionIdIsNormal(latestObservedXid));
681
	TransactionIdAdvance(latestObservedXid);
682 683 684 685 686
	while (TransactionIdPrecedes(latestObservedXid, running->nextXid))
	{
		ExtendSUBTRANS(latestObservedXid);
		TransactionIdAdvance(latestObservedXid);
	}
B
Bruce Momjian 已提交
687
	TransactionIdRetreat(latestObservedXid);	/* = running->nextXid - 1 */
688 689

	/* ----------
690 691
	 * Now we've got the running xids we need to set the global values that
	 * are used to track snapshots as they evolve further.
692
	 *
693 694 695
	 * - latestCompletedXid which will be the xmax for snapshots
	 * - lastOverflowedXid which shows whether snapshots overflow
	 * - nextXid
696 697 698
	 *
	 * If the snapshot overflowed, then we still initialise with what we know,
	 * but the recovery snapshot isn't fully valid yet because we know there
B
Bruce Momjian 已提交
699 700
	 * are some subxids missing. We don't know the specific subxids that are
	 * missing, so conservatively assume the last one is latestObservedXid.
701
	 * ----------
702
	 */
703 704
	if (running->subxid_overflow)
	{
705 706 707
		standbyState = STANDBY_SNAPSHOT_PENDING;

		standbySnapshotPendingXmin = latestObservedXid;
708
		procArray->lastOverflowedXid = latestObservedXid;
709 710 711 712 713 714 715 716 717
	}
	else
	{
		standbyState = STANDBY_SNAPSHOT_READY;

		standbySnapshotPendingXmin = InvalidTransactionId;
	}

	/*
718
	 * If a transaction wrote a commit record in the gap between taking and
719 720
	 * logging the snapshot then latestCompletedXid may already be higher than
	 * the value from the snapshot, so check before we use the incoming value.
721 722 723 724
	 */
	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
							  running->latestCompletedXid))
		ShmemVariableCache->latestCompletedXid = running->latestCompletedXid;
725

726 727 728 729 730 731 732 733
	Assert(TransactionIdIsNormal(ShmemVariableCache->latestCompletedXid));

	LWLockRelease(ProcArrayLock);

	/*
	 * ShmemVariableCache->nextXid must be beyond any observed xid.
	 *
	 * We don't expect anyone else to modify nextXid, hence we don't need to
B
Bruce Momjian 已提交
734
	 * hold a lock while examining it.  We still acquire the lock to modify
735 736
	 * it, though.
	 */
737 738 739
	nextXid = latestObservedXid;
	TransactionIdAdvance(nextXid);
	if (TransactionIdFollows(nextXid, ShmemVariableCache->nextXid))
740 741
	{
		LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
742
		ShmemVariableCache->nextXid = nextXid;
743 744
		LWLockRelease(XidGenLock);
	}
745

746 747
	Assert(TransactionIdIsValid(ShmemVariableCache->nextXid));

748
	KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
749
	if (standbyState == STANDBY_SNAPSHOT_READY)
750
		elog(trace_recovery(DEBUG1), "recovery snapshots are now enabled");
751
	else
752 753 754 755 756
		elog(trace_recovery(DEBUG1),
			 "recovery snapshot waiting for non-overflowed snapshot or "
			 "until oldest active xid on standby is at least %u (now %u)",
			 standbySnapshotPendingXmin,
			 running->oldestRunningXid);
757 758
}

759 760 761 762
/*
 * ProcArrayApplyXidAssignment
 *		Process an XLOG_XACT_ASSIGNMENT WAL record
 */
763 764 765 766 767
void
ProcArrayApplyXidAssignment(TransactionId topxid,
							int nsubxids, TransactionId *subxids)
{
	TransactionId max_xid;
B
Bruce Momjian 已提交
768
	int			i;
769

770
	Assert(standbyState >= STANDBY_INITIALIZED);
771 772 773 774 775 776 777 778 779 780 781 782 783 784

	max_xid = TransactionIdLatest(topxid, nsubxids, subxids);

	/*
	 * Mark all the subtransactions as observed.
	 *
	 * NOTE: This will fail if the subxid contains too many previously
	 * unobserved xids to fit into known-assigned-xids. That shouldn't happen
	 * as the code stands, because xid-assignment records should never contain
	 * more than PGPROC_MAX_CACHED_SUBXIDS entries.
	 */
	RecordKnownAssignedTransactionIds(max_xid);

	/*
B
Bruce Momjian 已提交
785 786 787 788 789 790 791 792 793
	 * Notice that we update pg_subtrans with the top-level xid, rather than
	 * the parent xid. This is a difference between normal processing and
	 * recovery, yet is still correct in all cases. The reason is that
	 * subtransaction commit is not marked in clog until commit processing, so
	 * all aborted subtransactions have already been clearly marked in clog.
	 * As a result we are able to refer directly to the top-level
	 * transaction's state rather than skipping through all the intermediate
	 * states in the subtransaction tree. This should be the first time we
	 * have attempted to SubTransSetParent().
794 795 796 797
	 */
	for (i = 0; i < nsubxids; i++)
		SubTransSetParent(subxids[i], topxid, false);

798 799 800 801
	/* KnownAssignedXids isn't maintained yet, so we're done for now */
	if (standbyState == STANDBY_INITIALIZED)
		return;

802 803 804 805 806 807
	/*
	 * Uses same locking as transaction commit
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	/*
808
	 * Remove subxids from known-assigned-xacts.
809
	 */
810
	KnownAssignedXidsRemoveTree(InvalidTransactionId, nsubxids, subxids);
811 812

	/*
813
	 * Advance lastOverflowedXid to be at least the last of these subxids.
814 815 816 817 818 819
	 */
	if (TransactionIdPrecedes(procArray->lastOverflowedXid, max_xid))
		procArray->lastOverflowedXid = max_xid;

	LWLockRelease(ProcArrayLock);
}
820

821 822 823
/*
 * TransactionIdIsInProgress -- is given transaction running in some backend
 *
824
 * Aside from some shortcuts such as checking RecentXmin and our own Xid,
825
 * there are four possibilities for finding a running transaction:
826
 *
827
 * 1. The given Xid is a main transaction Id.  We will find this out cheaply
828
 * by looking at the PGXACT struct for each backend.
829
 *
830
 * 2. The given Xid is one of the cached subxact Xids in the PGPROC array.
831 832
 * We can find this out cheaply too.
 *
833 834 835
 * 3. In Hot Standby mode, we must search the KnownAssignedXids list to see
 * if the Xid is running on the master.
 *
836 837 838
 * 4. Search the SubTrans tree to find the Xid's topmost parent, and then see
 * if that is running according to PGXACT or KnownAssignedXids.  This is the
 * slowest way, but sadly it has to be done always if the others failed,
839 840
 * unless we see that the cached subxact sets are complete (none have
 * overflowed).
841
 *
842 843 844
 * ProcArrayLock has to be held while we do 1, 2, 3.  If we save the top Xids
 * while doing 1 and 3, we can release the ProcArrayLock while we do 4.
 * This buys back some concurrency (and we can't retrieve the main Xids from
845
 * PGXACT again anyway; see GetNewTransactionId).
846 847 848 849
 */
bool
TransactionIdIsInProgress(TransactionId xid)
{
850 851
	static TransactionId *xids = NULL;
	int			nxids = 0;
852
	ProcArrayStruct *arrayP = procArray;
853
	TransactionId topxid;
854 855 856 857
	int			i,
				j;

	/*
B
Bruce Momjian 已提交
858
	 * Don't bother checking a transaction older than RecentXmin; it could not
B
Bruce Momjian 已提交
859 860 861
	 * possibly still be running.  (Note: in particular, this guarantees that
	 * we reject InvalidTransactionId, FrozenTransactionId, etc as not
	 * running.)
862 863 864 865 866 867 868
	 */
	if (TransactionIdPrecedes(xid, RecentXmin))
	{
		xc_by_recent_xmin_inc();
		return false;
	}

869 870 871 872 873 874 875 876 877 878 879
	/*
	 * We may have just checked the status of this transaction, so if it is
	 * already known to be completed, we can fall out without any access to
	 * shared memory.
	 */
	if (TransactionIdIsKnownCompleted(xid))
	{
		xc_by_known_xact_inc();
		return false;
	}

880 881 882 883 884 885 886 887 888 889 890
	/*
	 * Also, we can handle our own transaction (and subtransactions) without
	 * any access to shared memory.
	 */
	if (TransactionIdIsCurrentTransactionId(xid))
	{
		xc_by_my_xact_inc();
		return true;
	}

	/*
891
	 * If first time through, get workspace to remember main XIDs in. We
B
Bruce Momjian 已提交
892
	 * malloc it permanently to avoid repeated palloc/pfree overhead.
893 894 895
	 */
	if (xids == NULL)
	{
896
		/*
B
Bruce Momjian 已提交
897 898 899
		 * In hot standby mode, reserve enough space to hold all xids in the
		 * known-assigned list. If we later finish recovery, we no longer need
		 * the bigger array, but we don't bother to shrink it.
900
		 */
901
		int			maxxids = RecoveryInProgress() ? TOTAL_MAX_CACHED_SUBXIDS : arrayP->maxProcs;
902 903

		xids = (TransactionId *) malloc(maxxids * sizeof(TransactionId));
904 905 906 907 908
		if (xids == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
	}
909 910 911

	LWLockAcquire(ProcArrayLock, LW_SHARED);

912 913 914 915 916 917 918 919 920 921 922 923
	/*
	 * Now that we have the lock, we can check latestCompletedXid; if the
	 * target Xid is after that, it's surely still running.
	 */
	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, xid))
	{
		LWLockRelease(ProcArrayLock);
		xc_by_latest_xid_inc();
		return true;
	}

	/* No shortcuts, gotta grovel through the array */
924 925
	for (i = 0; i < arrayP->numProcs; i++)
	{
926 927 928 929
		int			pgprocno = arrayP->pgprocnos[i];
		volatile PGPROC *proc = &allProcs[pgprocno];
		volatile PGXACT *pgxact = &allPgXact[pgprocno];
		TransactionId pxid;
930 931 932 933

		/* Ignore my own proc --- dealt with it above */
		if (proc == MyProc)
			continue;
934 935

		/* Fetch xid just once - see GetNewTransactionId */
936
		pxid = pgxact->xid;
937 938 939 940 941 942 943 944 945

		if (!TransactionIdIsValid(pxid))
			continue;

		/*
		 * Step 1: check the main Xid
		 */
		if (TransactionIdEquals(pxid, xid))
		{
946
			LWLockRelease(ProcArrayLock);
947
			xc_by_main_xid_inc();
948
			return true;
949 950 951
		}

		/*
B
Bruce Momjian 已提交
952 953
		 * We can ignore main Xids that are younger than the target Xid, since
		 * the target could not possibly be their child.
954 955 956 957 958 959 960
		 */
		if (TransactionIdPrecedes(xid, pxid))
			continue;

		/*
		 * Step 2: check the cached child-Xids arrays
		 */
961
		for (j = pgxact->nxids - 1; j >= 0; j--)
962 963 964 965 966 967
		{
			/* Fetch xid just once - see GetNewTransactionId */
			TransactionId cxid = proc->subxids.xids[j];

			if (TransactionIdEquals(cxid, xid))
			{
968
				LWLockRelease(ProcArrayLock);
969
				xc_by_child_xid_inc();
970
				return true;
971 972 973 974
			}
		}

		/*
975
		 * Save the main Xid for step 4.  We only need to remember main Xids
B
Bruce Momjian 已提交
976 977 978 979
		 * that have uncached children.  (Note: there is no race condition
		 * here because the overflowed flag cannot be cleared, only set, while
		 * we hold ProcArrayLock.  So we can't miss an Xid that we need to
		 * worry about.)
980
		 */
981
		if (pgxact->overflowed)
982 983 984
			xids[nxids++] = pxid;
	}

985 986 987 988
	/*
	 * Step 3: in hot standby mode, check the known-assigned-xids list.  XIDs
	 * in the list must be treated as running.
	 */
989 990
	if (RecoveryInProgress())
	{
991
		/* none of the PGXACT entries should have XIDs in hot standby mode */
992 993
		Assert(nxids == 0);

994
		if (KnownAssignedXidExists(xid))
995 996
		{
			LWLockRelease(ProcArrayLock);
997
			xc_by_known_assigned_inc();
998 999 1000 1001
			return true;
		}

		/*
B
Bruce Momjian 已提交
1002
		 * If the KnownAssignedXids overflowed, we have to check pg_subtrans
B
Bruce Momjian 已提交
1003 1004 1005 1006
		 * too.  Fetch all xids from KnownAssignedXids that are lower than
		 * xid, since if xid is a subtransaction its parent will always have a
		 * lower value.  Note we will collect both main and subXIDs here, but
		 * there's no help for it.
1007 1008 1009 1010 1011
		 */
		if (TransactionIdPrecedesOrEquals(xid, procArray->lastOverflowedXid))
			nxids = KnownAssignedXidsGet(xids, xid);
	}

1012 1013 1014 1015
	LWLockRelease(ProcArrayLock);

	/*
	 * If none of the relevant caches overflowed, we know the Xid is not
1016
	 * running without even looking at pg_subtrans.
1017 1018
	 */
	if (nxids == 0)
1019 1020
	{
		xc_no_overflow_inc();
1021
		return false;
1022
	}
1023 1024

	/*
1025
	 * Step 4: have to check pg_subtrans.
1026
	 *
1027 1028 1029 1030
	 * At this point, we know it's either a subtransaction of one of the Xids
	 * in xids[], or it's not running.  If it's an already-failed
	 * subtransaction, we want to say "not running" even though its parent may
	 * still be running.  So first, check pg_clog to see if it's been aborted.
1031 1032 1033 1034
	 */
	xc_slow_answer_inc();

	if (TransactionIdDidAbort(xid))
1035
		return false;
1036 1037

	/*
B
Bruce Momjian 已提交
1038
	 * It isn't aborted, so check whether the transaction tree it belongs to
B
Bruce Momjian 已提交
1039 1040
	 * is still running (or, more precisely, whether it was running when we
	 * held ProcArrayLock).
1041 1042 1043 1044 1045 1046 1047 1048
	 */
	topxid = SubTransGetTopmostTransaction(xid);
	Assert(TransactionIdIsValid(topxid));
	if (!TransactionIdEquals(topxid, xid))
	{
		for (i = 0; i < nxids; i++)
		{
			if (TransactionIdEquals(xids[i], topxid))
1049
				return true;
1050 1051 1052
		}
	}

1053
	return false;
1054 1055
}

1056 1057 1058 1059
/*
 * TransactionIdIsActive -- is xid the top-level XID of an active backend?
 *
 * This differs from TransactionIdIsInProgress in that it ignores prepared
1060 1061
 * transactions, as well as transactions running on the master if we're in
 * hot standby.  Also, we ignore subtransactions since that's not needed
1062 1063 1064 1065 1066 1067 1068 1069 1070 1071
 * for current uses.
 */
bool
TransactionIdIsActive(TransactionId xid)
{
	bool		result = false;
	ProcArrayStruct *arrayP = procArray;
	int			i;

	/*
B
Bruce Momjian 已提交
1072 1073
	 * Don't bother checking a transaction older than RecentXmin; it could not
	 * possibly still be running.
1074 1075 1076 1077 1078 1079 1080 1081
	 */
	if (TransactionIdPrecedes(xid, RecentXmin))
		return false;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (i = 0; i < arrayP->numProcs; i++)
	{
1082 1083 1084 1085
		int			pgprocno = arrayP->pgprocnos[i];
		volatile PGPROC *proc = &allProcs[pgprocno];
		volatile PGXACT *pgxact = &allPgXact[pgprocno];
		TransactionId pxid;
1086 1087

		/* Fetch xid just once - see GetNewTransactionId */
1088
		pxid = pgxact->xid;
1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108

		if (!TransactionIdIsValid(pxid))
			continue;

		if (proc->pid == 0)
			continue;			/* ignore prepared transactions */

		if (TransactionIdEquals(pxid, xid))
		{
			result = true;
			break;
		}
	}

	LWLockRelease(ProcArrayLock);

	return result;
}


1109 1110 1111 1112
/*
 * GetOldestXmin -- returns oldest transaction that was running
 *					when any current transaction was started.
 *
R
Robert Haas 已提交
1113 1114
 * If rel is NULL or a shared relation, all backends are considered, otherwise
 * only backends running in this database are considered.
1115
 *
1116 1117
 * If ignoreVacuum is TRUE then backends with the PROC_IN_VACUUM flag set are
 * ignored.
1118
 *
R
Robert Haas 已提交
1119 1120 1121 1122 1123 1124
 * This is used by VACUUM to decide which deleted tuples must be preserved in
 * the passed in table. For shared relations backends in all databases must be
 * considered, but for non-shared relations that's not required, since only
 * backends in my own database could ever see the tuples in them. Also, we can
 * ignore concurrently running lazy VACUUMs because (a) they must be working
 * on other tables, and (b) they don't need to do snapshot-based lookups.
1125
 *
R
Robert Haas 已提交
1126 1127 1128
 * This is also used to determine where to truncate pg_subtrans.  For that
 * backends in all databases have to be considered, so rel = NULL has to be
 * passed in.
1129
 *
1130
 * Note: we include all currently running xids in the set of considered xids.
1131 1132
 * This ensures that if a just-started xact has not yet set its snapshot,
 * when it does set the snapshot it cannot set xmin less than what we compute.
1133
 * See notes in src/backend/access/transam/README.
1134 1135 1136 1137 1138
 *
 * Note: despite the above, it's possible for the calculated value to move
 * backwards on repeated calls. The calculated value is conservative, so that
 * anything older is definitely not considered as running by anyone anymore,
 * but the exact value calculated depends on a number of things. For example,
R
Robert Haas 已提交
1139
 * if rel = NULL and there are no transactions running in the current
1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155
 * database, GetOldestXmin() returns latestCompletedXid. If a transaction
 * begins after that, its xmin will include in-progress transactions in other
 * databases that started earlier, so another call will return a lower value.
 * Nonetheless it is safe to vacuum a table in the current database with the
 * first result.  There are also replication-related effects: a walsender
 * process can set its xmin based on transactions that are no longer running
 * in the master but are still being replayed on the standby, thus possibly
 * making the GetOldestXmin reading go backwards.  In this case there is a
 * possibility that we lose data that the standby would like to have, but
 * there is little we can do about that --- data is only protected if the
 * walsender runs continuously while queries are executed on the standby.
 * (The Hot Standby code deals with such cases by failing standby queries
 * that needed to access already-removed data, so there's no integrity bug.)
 * The return value is also adjusted with vacuum_defer_cleanup_age, so
 * increasing that setting on the fly is another easy way to make
 * GetOldestXmin() move backwards, with no consequences for data integrity.
1156 1157
 */
TransactionId
R
Robert Haas 已提交
1158
GetOldestXmin(Relation rel, bool ignoreVacuum)
1159 1160 1161 1162
{
	ProcArrayStruct *arrayP = procArray;
	TransactionId result;
	int			index;
R
Robert Haas 已提交
1163 1164
	bool		allDbs;

R
Robert Haas 已提交
1165
	volatile TransactionId replication_slot_xmin = InvalidTransactionId;
R
Robert Haas 已提交
1166 1167 1168 1169 1170 1171 1172 1173
	volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;

	/*
	 * If we're not computing a relation specific limit, or if a shared
	 * relation has been passed in, backends in all databases have to be
	 * considered.
	 */
	allDbs = rel == NULL || rel->rd_rel->relisshared;
1174

1175 1176 1177
	/* Cannot look for individual databases during recovery */
	Assert(allDbs || !RecoveryInProgress());

1178 1179
	LWLockAcquire(ProcArrayLock, LW_SHARED);

1180
	/*
B
Bruce Momjian 已提交
1181 1182 1183 1184
	 * We initialize the MIN() calculation with latestCompletedXid + 1. This
	 * is a lower bound for the XIDs that might appear in the ProcArray later,
	 * and so protects us against overestimating the result due to future
	 * additions.
1185
	 */
1186 1187 1188
	result = ShmemVariableCache->latestCompletedXid;
	Assert(TransactionIdIsNormal(result));
	TransactionIdAdvance(result);
1189 1190 1191

	for (index = 0; index < arrayP->numProcs; index++)
	{
1192 1193 1194
		int			pgprocno = arrayP->pgprocnos[index];
		volatile PGPROC *proc = &allProcs[pgprocno];
		volatile PGXACT *pgxact = &allPgXact[pgprocno];
1195

R
Robert Haas 已提交
1196 1197 1198 1199 1200 1201 1202
		/*
		 * Backend is doing logical decoding which manages xmin separately,
		 * check below.
		 */
		if (pgxact->vacuumFlags & PROC_IN_LOGICAL_DECODING)
			continue;

1203
		if (ignoreVacuum && (pgxact->vacuumFlags & PROC_IN_VACUUM))
1204 1205
			continue;

1206 1207
		if (allDbs ||
			proc->databaseId == MyDatabaseId ||
1208
			proc->databaseId == 0)		/* always include WalSender */
1209 1210
		{
			/* Fetch xid just once - see GetNewTransactionId */
1211
			TransactionId xid = pgxact->xid;
1212

1213 1214 1215 1216 1217 1218 1219 1220 1221
			/* First consider the transaction's own Xid, if any */
			if (TransactionIdIsNormal(xid) &&
				TransactionIdPrecedes(xid, result))
				result = xid;

			/*
			 * Also consider the transaction's Xmin, if set.
			 *
			 * We must check both Xid and Xmin because a transaction might
B
Bruce Momjian 已提交
1222 1223
			 * have an Xmin but not (yet) an Xid; conversely, if it has an
			 * Xid, that could determine some not-yet-set Xmin.
1224
			 */
1225
			xid = pgxact->xmin; /* Fetch just once */
1226 1227 1228
			if (TransactionIdIsNormal(xid) &&
				TransactionIdPrecedes(xid, result))
				result = xid;
1229 1230 1231
		}
	}

R
Robert Haas 已提交
1232 1233
	/* fetch into volatile var while ProcArrayLock is held */
	replication_slot_xmin = procArray->replication_slot_xmin;
R
Robert Haas 已提交
1234
	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
R
Robert Haas 已提交
1235

1236 1237 1238
	if (RecoveryInProgress())
	{
		/*
1239 1240
		 * Check to see whether KnownAssignedXids contains an xid value older
		 * than the main procarray.
1241 1242
		 */
		TransactionId kaxmin = KnownAssignedXidsGetOldestXmin();
1243

1244 1245
		LWLockRelease(ProcArrayLock);

1246 1247
		if (TransactionIdIsNormal(kaxmin) &&
			TransactionIdPrecedes(kaxmin, result))
1248
			result = kaxmin;
1249
	}
1250 1251 1252 1253 1254 1255
	else
	{
		/*
		 * No other information needed, so release the lock immediately.
		 */
		LWLockRelease(ProcArrayLock);
1256

1257
		/*
1258 1259
		 * Compute the cutoff XID by subtracting vacuum_defer_cleanup_age,
		 * being careful not to generate a "permanent" XID.
1260 1261 1262 1263
		 *
		 * vacuum_defer_cleanup_age provides some additional "slop" for the
		 * benefit of hot standby queries on slave servers.  This is quick and
		 * dirty, and perhaps not all that useful unless the master has a
1264 1265 1266 1267 1268 1269
		 * predictable transaction rate, but it offers some protection when
		 * there's no walsender connection.  Note that we are assuming
		 * vacuum_defer_cleanup_age isn't large enough to cause wraparound ---
		 * so guc.c should limit it to no more than the xidStopLimit threshold
		 * in varsup.c.  Also note that we intentionally don't apply
		 * vacuum_defer_cleanup_age on standby servers.
1270 1271 1272 1273 1274
		 */
		result -= vacuum_defer_cleanup_age;
		if (!TransactionIdIsNormal(result))
			result = FirstNormalTransactionId;
	}
1275

R
Robert Haas 已提交
1276 1277 1278 1279 1280 1281 1282
	/*
	 * Check whether there are replication slots requiring an older xmin.
	 */
	if (TransactionIdIsValid(replication_slot_xmin) &&
		NormalTransactionIdPrecedes(replication_slot_xmin, result))
		result = replication_slot_xmin;

R
Robert Haas 已提交
1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294
	/*
	 * After locks have been released and defer_cleanup_age has been applied,
	 * check whether we need to back up further to make logical decoding
	 * possible. We need to do so if we're computing the global limit (rel =
	 * NULL) or if the passed relation is a catalog relation of some kind.
	 */
	if ((rel == NULL ||
		 RelationIsAccessibleInLogicalDecoding(rel)) &&
		TransactionIdIsValid(replication_slot_catalog_xmin) &&
		NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
		result = replication_slot_catalog_xmin;

1295 1296 1297
	return result;
}

1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319
/*
 * GetMaxSnapshotXidCount -- get max size for snapshot XID array
 *
 * We have to export this for use by snapmgr.c.
 */
int
GetMaxSnapshotXidCount(void)
{
	return procArray->maxProcs;
}

/*
 * GetMaxSnapshotSubxidCount -- get max size for snapshot sub-XID array
 *
 * We have to export this for use by snapmgr.c.
 */
int
GetMaxSnapshotSubxidCount(void)
{
	return TOTAL_MAX_CACHED_SUBXIDS;
}

1320
/*
1321 1322 1323
 * GetSnapshotData -- returns information about running transactions.
 *
 * The returned snapshot includes xmin (lowest still-running xact ID),
1324
 * xmax (highest completed xact ID + 1), and a list of running xact IDs
1325 1326 1327 1328 1329 1330 1331 1332
 * in the range xmin <= xid < xmax.  It is used as follows:
 *		All xact IDs < xmin are considered finished.
 *		All xact IDs >= xmax are considered still running.
 *		For an xact ID xmin <= xid < xmax, consult list to see whether
 *		it is considered running or not.
 * This ensures that the set of transactions seen as "running" by the
 * current xact will not change after it takes the snapshot.
 *
1333 1334 1335 1336 1337
 * All running top-level XIDs are included in the snapshot, except for lazy
 * VACUUM processes.  We also try to include running subtransaction XIDs,
 * but since PGPROC has only a limited cache area for subxact XIDs, full
 * information may not be available.  If we find any overflowed subxid arrays,
 * we have to mark the snapshot's subxid data as overflowed, and extra work
1338
 * *may* need to be done to determine what's running (see XidInMVCCSnapshot()
1339
 * in tqual.c).
1340 1341 1342
 *
 * We also update the following backend-global variables:
 *		TransactionXmin: the oldest xmin of any snapshot in use in the
1343
 *			current transaction (this is the same as MyPgXact->xmin).
1344 1345 1346
 *		RecentXmin: the xmin computed for the most recent snapshot.  XIDs
 *			older than this are known not running any more.
 *		RecentGlobalXmin: the global xmin (oldest TransactionXmin across all
1347
 *			running transactions, except those running LAZY VACUUM).  This is
T
Tom Lane 已提交
1348
 *			the same computation done by GetOldestXmin(true, true).
R
Robert Haas 已提交
1349 1350
 *		RecentGlobalDataXmin: the global xmin for non-catalog tables
 *			>= RecentGlobalXmin
1351 1352 1353
 *
 * Note: this function should probably not be called with an argument that's
 * not statically allocated (see xip allocation below).
1354 1355
 */
Snapshot
1356
GetSnapshotData(Snapshot snapshot)
1357 1358 1359 1360 1361 1362 1363
{
	ProcArrayStruct *arrayP = procArray;
	TransactionId xmin;
	TransactionId xmax;
	TransactionId globalxmin;
	int			index;
	int			count = 0;
1364
	int			subcount = 0;
1365
	bool		suboverflowed = false;
R
Robert Haas 已提交
1366
	volatile TransactionId replication_slot_xmin = InvalidTransactionId;
R
Robert Haas 已提交
1367
	volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
1368 1369 1370 1371

	Assert(snapshot != NULL);

	/*
B
Bruce Momjian 已提交
1372 1373
	 * Allocating space for maxProcs xids is usually overkill; numProcs would
	 * be sufficient.  But it seems better to do the malloc while not holding
1374 1375
	 * the lock, so we can't look at numProcs.  Likewise, we allocate much
	 * more subxip storage than is probably needed.
1376 1377
	 *
	 * This does open a possibility for avoiding repeated malloc/free: since
B
Bruce Momjian 已提交
1378
	 * maxProcs does not change at runtime, we can simply reuse the previous
1379
	 * xip arrays if any.  (This relies on the fact that all callers pass
B
Bruce Momjian 已提交
1380
	 * static SnapshotData structs.)
1381 1382 1383 1384
	 */
	if (snapshot->xip == NULL)
	{
		/*
B
Bruce Momjian 已提交
1385 1386
		 * First call for this snapshot. Snapshot is same size whether or not
		 * we are in recovery, see later comments.
1387 1388
		 */
		snapshot->xip = (TransactionId *)
1389
			malloc(GetMaxSnapshotXidCount() * sizeof(TransactionId));
1390 1391 1392 1393
		if (snapshot->xip == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
1394 1395
		Assert(snapshot->subxip == NULL);
		snapshot->subxip = (TransactionId *)
1396
			malloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId));
1397 1398 1399 1400
		if (snapshot->subxip == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
1401 1402 1403
	}

	/*
B
Bruce Momjian 已提交
1404
	 * It is sufficient to get shared lock on ProcArrayLock, even if we are
1405
	 * going to set MyPgXact->xmin.
1406
	 */
1407
	LWLockAcquire(ProcArrayLock, LW_SHARED);
1408

1409 1410 1411 1412
	/* xmax is always latestCompletedXid + 1 */
	xmax = ShmemVariableCache->latestCompletedXid;
	Assert(TransactionIdIsNormal(xmax));
	TransactionIdAdvance(xmax);
1413

1414 1415 1416
	/* initialize xmin calculation with xmax */
	globalxmin = xmin = xmax;

1417 1418
	snapshot->takenDuringRecovery = RecoveryInProgress();

1419
	if (!snapshot->takenDuringRecovery)
1420
	{
1421
		int		   *pgprocnos = arrayP->pgprocnos;
1422 1423
		int			numProcs;

1424
		/*
B
Bruce Momjian 已提交
1425 1426
		 * Spin over procArray checking xid, xmin, and subxids.  The goal is
		 * to gather all active xids, find the lowest xmin, and try to record
1427
		 * subxids.
1428
		 */
1429 1430
		numProcs = arrayP->numProcs;
		for (index = 0; index < numProcs; index++)
1431
		{
1432 1433 1434
			int			pgprocno = pgprocnos[index];
			volatile PGXACT *pgxact = &allPgXact[pgprocno];
			TransactionId xid;
1435

R
Robert Haas 已提交
1436 1437 1438 1439 1440 1441 1442
			/*
			 * Backend is doing logical decoding which manages xmin
			 * separately, check below.
			 */
			if (pgxact->vacuumFlags & PROC_IN_LOGICAL_DECODING)
				continue;

1443
			/* Ignore procs running LAZY VACUUM */
1444
			if (pgxact->vacuumFlags & PROC_IN_VACUUM)
1445
				continue;
1446

1447
			/* Update globalxmin to be the smallest valid xmin */
1448
			xid = pgxact->xmin; /* fetch just once */
1449
			if (TransactionIdIsNormal(xid) &&
1450
				NormalTransactionIdPrecedes(xid, globalxmin))
1451
				globalxmin = xid;
1452 1453

			/* Fetch xid just once - see GetNewTransactionId */
1454
			xid = pgxact->xid;
1455 1456

			/*
1457 1458 1459 1460
			 * If the transaction has no XID assigned, we can skip it; it
			 * won't have sub-XIDs either.  If the XID is >= xmax, we can also
			 * skip it; such transactions will be treated as running anyway
			 * (and any sub-XIDs will also be >= xmax).
1461
			 */
1462 1463
			if (!TransactionIdIsNormal(xid)
				|| !NormalTransactionIdPrecedes(xid, xmax))
1464
				continue;
1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476

			/*
			 * We don't include our own XIDs (if any) in the snapshot, but we
			 * must include them in xmin.
			 */
			if (NormalTransactionIdPrecedes(xid, xmin))
				xmin = xid;
			if (pgxact == MyPgXact)
				continue;

			/* Add XID to snapshot. */
			snapshot->xip[count++] = xid;
1477

1478
			/*
B
Bruce Momjian 已提交
1479 1480 1481 1482 1483
			 * Save subtransaction XIDs if possible (if we've already
			 * overflowed, there's no point).  Note that the subxact XIDs must
			 * be later than their parent, so no need to check them against
			 * xmin.  We could filter against xmax, but it seems better not to
			 * do that much work while holding the ProcArrayLock.
1484 1485
			 *
			 * The other backend can add more subxids concurrently, but cannot
B
Bruce Momjian 已提交
1486
			 * remove any.  Hence it's important to fetch nxids just once.
B
Bruce Momjian 已提交
1487 1488 1489
			 * Should be safe to use memcpy, though.  (We needn't worry about
			 * missing any xids added concurrently, because they must postdate
			 * xmax.)
1490 1491 1492
			 *
			 * Again, our own XIDs are not included in the snapshot.
			 */
1493
			if (!suboverflowed)
1494
			{
1495
				if (pgxact->overflowed)
1496 1497
					suboverflowed = true;
				else
1498
				{
1499
					int			nxids = pgxact->nxids;
1500 1501 1502

					if (nxids > 0)
					{
1503
						volatile PGPROC *proc = &allProcs[pgprocno];
1504

1505 1506 1507 1508 1509
						memcpy(snapshot->subxip + subcount,
							   (void *) proc->subxids.xids,
							   nxids * sizeof(TransactionId));
						subcount += nxids;
					}
1510 1511 1512
				}
			}
		}
1513
	}
1514
	else
1515 1516
	{
		/*
1517 1518
		 * We're in hot standby, so get XIDs from KnownAssignedXids.
		 *
1519 1520 1521 1522 1523
		 * We store all xids directly into subxip[]. Here's why:
		 *
		 * In recovery we don't know which xids are top-level and which are
		 * subxacts, a design choice that greatly simplifies xid processing.
		 *
B
Bruce Momjian 已提交
1524 1525 1526 1527
		 * It seems like we would want to try to put xids into xip[] only, but
		 * that is fairly small. We would either need to make that bigger or
		 * to increase the rate at which we WAL-log xid assignment; neither is
		 * an appealing choice.
1528 1529 1530 1531
		 *
		 * We could try to store xids into xip[] first and then into subxip[]
		 * if there are too many xids. That only works if the snapshot doesn't
		 * overflow because we do not search subxip[] in that case. A simpler
B
Bruce Momjian 已提交
1532 1533
		 * way is to just store all xids in the subxact array because this is
		 * by far the bigger array. We just leave the xip array empty.
1534 1535 1536 1537
		 *
		 * Either way we need to change the way XidInMVCCSnapshot() works
		 * depending upon when the snapshot was taken, or change normal
		 * snapshot processing so it matches.
1538
		 *
B
Bruce Momjian 已提交
1539 1540 1541 1542 1543
		 * Note: It is possible for recovery to end before we finish taking
		 * the snapshot, and for newly assigned transaction ids to be added to
		 * the ProcArray.  xmax cannot change while we hold ProcArrayLock, so
		 * those newly added transaction ids would be filtered away, so we
		 * need not be concerned about them.
1544
		 */
1545 1546
		subcount = KnownAssignedXidsGetAndSetXmin(snapshot->subxip, &xmin,
												  xmax);
1547

1548
		if (TransactionIdPrecedesOrEquals(xmin, procArray->lastOverflowedXid))
1549 1550 1551
			suboverflowed = true;
	}

R
Robert Haas 已提交
1552 1553 1554

	/* fetch into volatile var while ProcArrayLock is held */
	replication_slot_xmin = procArray->replication_slot_xmin;
R
Robert Haas 已提交
1555
	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
R
Robert Haas 已提交
1556

1557 1558
	if (!TransactionIdIsValid(MyPgXact->xmin))
		MyPgXact->xmin = TransactionXmin = xmin;
R
Robert Haas 已提交
1559

1560 1561 1562
	LWLockRelease(ProcArrayLock);

	/*
B
Bruce Momjian 已提交
1563 1564 1565
	 * Update globalxmin to include actual process xids.  This is a slightly
	 * different way of computing it than GetOldestXmin uses, but should give
	 * the same result.
1566 1567 1568 1569 1570
	 */
	if (TransactionIdPrecedes(xmin, globalxmin))
		globalxmin = xmin;

	/* Update global variables too */
1571 1572 1573
	RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
	if (!TransactionIdIsNormal(RecentGlobalXmin))
		RecentGlobalXmin = FirstNormalTransactionId;
R
Robert Haas 已提交
1574 1575 1576 1577 1578 1579

	/* Check whether there's a replication slot requiring an older xmin. */
	if (TransactionIdIsValid(replication_slot_xmin) &&
		NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
		RecentGlobalXmin = replication_slot_xmin;

R
Robert Haas 已提交
1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590
	/* Non-catalog tables can be vacuumed if older than this xid */
	RecentGlobalDataXmin = RecentGlobalXmin;

	/*
	 * Check whether there's a replication slot requiring an older catalog
	 * xmin.
	 */
	if (TransactionIdIsNormal(replication_slot_catalog_xmin) &&
		NormalTransactionIdPrecedes(replication_slot_catalog_xmin, RecentGlobalXmin))
		RecentGlobalXmin = replication_slot_catalog_xmin;

1591 1592 1593 1594 1595
	RecentXmin = xmin;

	snapshot->xmin = xmin;
	snapshot->xmax = xmax;
	snapshot->xcnt = count;
1596
	snapshot->subxcnt = subcount;
1597
	snapshot->suboverflowed = suboverflowed;
1598

1599
	snapshot->curcid = GetCurrentCommandId(false);
1600

1601
	/*
1602 1603
	 * This is a new snapshot, so set both refcounts are zero, and mark it as
	 * not copied in persistent memory.
1604 1605 1606 1607 1608
	 */
	snapshot->active_count = 0;
	snapshot->regd_count = 0;
	snapshot->copied = false;

1609 1610 1611
	return snapshot;
}

1612
/*
1613
 * ProcArrayInstallImportedXmin -- install imported xmin into MyPgXact->xmin
1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637
 *
 * This is called when installing a snapshot imported from another
 * transaction.  To ensure that OldestXmin doesn't go backwards, we must
 * check that the source transaction is still running, and we'd better do
 * that atomically with installing the new xmin.
 *
 * Returns TRUE if successful, FALSE if source xact is no longer running.
 */
bool
ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid)
{
	bool		result = false;
	ProcArrayStruct *arrayP = procArray;
	int			index;

	Assert(TransactionIdIsNormal(xmin));
	if (!TransactionIdIsNormal(sourcexid))
		return false;

	/* Get lock so source xact can't end while we're doing this */
	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
1638 1639 1640 1641
		int			pgprocno = arrayP->pgprocnos[index];
		volatile PGPROC *proc = &allProcs[pgprocno];
		volatile PGXACT *pgxact = &allPgXact[pgprocno];
		TransactionId xid;
1642 1643

		/* Ignore procs running LAZY VACUUM */
1644
		if (pgxact->vacuumFlags & PROC_IN_VACUUM)
1645 1646
			continue;

1647
		xid = pgxact->xid;		/* fetch just once */
1648 1649 1650 1651
		if (xid != sourcexid)
			continue;

		/*
1652 1653 1654
		 * We check the transaction's database ID for paranoia's sake: if it's
		 * in another DB then its xmin does not cover us.  Caller should have
		 * detected this already, so we just treat any funny cases as
1655 1656 1657 1658 1659 1660 1661 1662
		 * "transaction not found".
		 */
		if (proc->databaseId != MyDatabaseId)
			continue;

		/*
		 * Likewise, let's just make real sure its xmin does cover us.
		 */
1663
		xid = pgxact->xmin;		/* fetch just once */
1664 1665 1666 1667 1668 1669 1670
		if (!TransactionIdIsNormal(xid) ||
			!TransactionIdPrecedesOrEquals(xid, xmin))
			continue;

		/*
		 * We're good.  Install the new xmin.  As in GetSnapshotData, set
		 * TransactionXmin too.  (Note that because snapmgr.c called
1671 1672
		 * GetSnapshotData first, we'll be overwriting a valid xmin here, so
		 * we don't check that.)
1673
		 */
1674
		MyPgXact->xmin = TransactionXmin = xmin;
1675 1676 1677 1678 1679 1680 1681 1682 1683 1684

		result = true;
		break;
	}

	LWLockRelease(ProcArrayLock);

	return result;
}

1685 1686 1687
/*
 * GetRunningTransactionData -- returns information about running transactions.
 *
1688
 * Similar to GetSnapshotData but returns more information. We include
1689
 * all PGXACTs with an assigned TransactionId, even VACUUM processes.
1690
 *
R
Robert Haas 已提交
1691 1692 1693 1694 1695
 * We acquire XidGenLock and ProcArrayLock, but the caller is responsible for
 * releasing them. Acquiring XidGenLock ensures that no new XIDs enter the proc
 * array until the caller has WAL-logged this snapshot, and releases the
 * lock. Acquiring ProcArrayLock ensures that no transactions commit until the
 * lock is released.
1696
 *
1697 1698 1699
 * The returned data structure is statically allocated; caller should not
 * modify it, and must not assume it is valid past the next call.
 *
1700 1701 1702 1703 1704 1705
 * This is never executed during recovery so there is no need to look at
 * KnownAssignedXids.
 *
 * We don't worry about updating other counters, we want to keep this as
 * simple as possible and leave GetSnapshotData() as the primary code for
 * that bookkeeping.
1706 1707 1708 1709 1710
 *
 * Note that if any transaction has overflowed its cached subtransactions
 * then there is no real need include any subtransactions. That isn't a
 * common enough case to worry about optimising the size of the WAL record,
 * and we may wish to see that data for diagnostic purposes anyway.
1711 1712 1713 1714
 */
RunningTransactions
GetRunningTransactionData(void)
{
1715 1716 1717
	/* result workspace */
	static RunningTransactionsData CurrentRunningXactsData;

1718
	ProcArrayStruct *arrayP = procArray;
1719
	RunningTransactions CurrentRunningXacts = &CurrentRunningXactsData;
1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735
	TransactionId latestCompletedXid;
	TransactionId oldestRunningXid;
	TransactionId *xids;
	int			index;
	int			count;
	int			subcount;
	bool		suboverflowed;

	Assert(!RecoveryInProgress());

	/*
	 * Allocating space for maxProcs xids is usually overkill; numProcs would
	 * be sufficient.  But it seems better to do the malloc while not holding
	 * the lock, so we can't look at numProcs.  Likewise, we allocate much
	 * more subxip storage than is probably needed.
	 *
1736
	 * Should only be allocated in bgwriter, since only ever executed during
B
Bruce Momjian 已提交
1737
	 * checkpoints.
1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766
	 */
	if (CurrentRunningXacts->xids == NULL)
	{
		/*
		 * First call
		 */
		CurrentRunningXacts->xids = (TransactionId *)
			malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
		if (CurrentRunningXacts->xids == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
	}

	xids = CurrentRunningXacts->xids;

	count = subcount = 0;
	suboverflowed = false;

	/*
	 * Ensure that no xids enter or leave the procarray while we obtain
	 * snapshot.
	 */
	LWLockAcquire(ProcArrayLock, LW_SHARED);
	LWLockAcquire(XidGenLock, LW_SHARED);

	latestCompletedXid = ShmemVariableCache->latestCompletedXid;

	oldestRunningXid = ShmemVariableCache->nextXid;
B
Bruce Momjian 已提交
1767

1768
	/*
1769
	 * Spin over procArray collecting all xids
1770 1771 1772
	 */
	for (index = 0; index < arrayP->numProcs; index++)
	{
1773
		int			pgprocno = arrayP->pgprocnos[index];
1774
		volatile PGXACT *pgxact = &allPgXact[pgprocno];
1775 1776 1777
		TransactionId xid;

		/* Fetch xid just once - see GetNewTransactionId */
1778
		xid = pgxact->xid;
1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791

		/*
		 * We don't need to store transactions that don't have a TransactionId
		 * yet because they will not show as running on a standby server.
		 */
		if (!TransactionIdIsValid(xid))
			continue;

		xids[count++] = xid;

		if (TransactionIdPrecedes(xid, oldestRunningXid))
			oldestRunningXid = xid;

1792 1793 1794
		if (pgxact->overflowed)
			suboverflowed = true;
	}
1795

1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807
	/*
	 * Spin over procArray collecting all subxids, but only if there hasn't
	 * been a suboverflow.
	 */
	if (!suboverflowed)
	{
		for (index = 0; index < arrayP->numProcs; index++)
		{
			int			pgprocno = arrayP->pgprocnos[index];
			volatile PGPROC *proc = &allProcs[pgprocno];
			volatile PGXACT *pgxact = &allPgXact[pgprocno];
			int			nxids;
1808 1809

			/*
1810 1811
			 * Save subtransaction XIDs. Other backends can't add or remove
			 * entries while we're holding XidGenLock.
1812
			 */
1813 1814 1815 1816 1817 1818 1819 1820 1821 1822
			nxids = pgxact->nxids;
			if (nxids > 0)
			{
				memcpy(&xids[count], (void *) proc->subxids.xids,
					   nxids * sizeof(TransactionId));
				count += nxids;
				subcount += nxids;

				/*
				 * Top-level XID of a transaction is always less than any of
B
Bruce Momjian 已提交
1823 1824
				 * its subxids, so we don't need to check if any of the
				 * subxids are smaller than oldestRunningXid
1825 1826
				 */
			}
1827 1828 1829
		}
	}

R
Robert Haas 已提交
1830 1831 1832 1833 1834 1835 1836 1837 1838
	/*
	 * It's important *not* to include the limits set by slots here because
	 * snapbuild.c uses oldestRunningXid to manage its xmin horizon. If those
	 * were to be included here the initial value could never increase because
	 * of a circular dependency where slots only increase their limits when
	 * running xacts increases oldestRunningXid and running xacts only
	 * increases if slots do.
	 */

1839 1840
	CurrentRunningXacts->xcnt = count - subcount;
	CurrentRunningXacts->subxcnt = subcount;
1841 1842 1843
	CurrentRunningXacts->subxid_overflow = suboverflowed;
	CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid;
	CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
1844
	CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
1845

1846 1847 1848 1849
	Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid));
	Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid));
	Assert(TransactionIdIsNormal(CurrentRunningXacts->latestCompletedXid));

R
Robert Haas 已提交
1850 1851
	/* We don't release the locks here, the caller is responsible for that */

1852 1853 1854
	return CurrentRunningXacts;
}

1855 1856 1857 1858
/*
 * GetOldestActiveTransactionId()
 *
 * Similar to GetSnapshotData but returns just oldestActiveXid. We include
1859
 * all PGXACTs with an assigned TransactionId, even VACUUM processes.
1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880
 * We look at all databases, though there is no need to include WALSender
 * since this has no effect on hot standby conflicts.
 *
 * This is never executed during recovery so there is no need to look at
 * KnownAssignedXids.
 *
 * We don't worry about updating other counters, we want to keep this as
 * simple as possible and leave GetSnapshotData() as the primary code for
 * that bookkeeping.
 */
TransactionId
GetOldestActiveTransactionId(void)
{
	ProcArrayStruct *arrayP = procArray;
	TransactionId oldestRunningXid;
	int			index;

	Assert(!RecoveryInProgress());

	LWLockAcquire(ProcArrayLock, LW_SHARED);

1881 1882 1883 1884 1885 1886 1887
	/*
	 * It's okay to read nextXid without acquiring XidGenLock because (1) we
	 * assume TransactionIds can be read atomically and (2) we don't care if
	 * we get a slightly stale value.  It can't be very stale anyway, because
	 * the LWLockAcquire above will have done any necessary memory
	 * interlocking.
	 */
1888 1889 1890 1891 1892 1893 1894
	oldestRunningXid = ShmemVariableCache->nextXid;

	/*
	 * Spin over procArray collecting all xids and subxids.
	 */
	for (index = 0; index < arrayP->numProcs; index++)
	{
1895
		int			pgprocno = arrayP->pgprocnos[index];
1896
		volatile PGXACT *pgxact = &allPgXact[pgprocno];
1897 1898 1899
		TransactionId xid;

		/* Fetch xid just once - see GetNewTransactionId */
1900
		xid = pgxact->xid;
1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919

		if (!TransactionIdIsNormal(xid))
			continue;

		if (TransactionIdPrecedes(xid, oldestRunningXid))
			oldestRunningXid = xid;

		/*
		 * Top-level XID of a transaction is always less than any of its
		 * subxids, so we don't need to check if any of the subxids are
		 * smaller than oldestRunningXid
		 */
	}

	LWLockRelease(ProcArrayLock);

	return oldestRunningXid;
}

R
Robert Haas 已提交
1920 1921 1922 1923 1924 1925 1926 1927 1928
/*
 * GetOldestSafeDecodingTransactionId -- lowest xid not affected by vacuum
 *
 * Returns the oldest xid that we can guarantee not to have been affected by
 * vacuum, i.e. no rows >= that xid have been vacuumed away unless the
 * transaction aborted. Note that the value can (and most of the time will) be
 * much more conservative than what really has been affected by vacuum, but we
 * currently don't have better data available.
 *
H
Heikki Linnakangas 已提交
1929
 * This is useful to initialize the cutoff xid after which a new changeset
R
Robert Haas 已提交
1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948
 * extraction replication slot can start decoding changes.
 *
 * Must be called with ProcArrayLock held either shared or exclusively,
 * although most callers will want to use exclusive mode since it is expected
 * that the caller will immediately use the xid to peg the xmin horizon.
 */
TransactionId
GetOldestSafeDecodingTransactionId(void)
{
	ProcArrayStruct *arrayP = procArray;
	TransactionId oldestSafeXid;
	int			index;
	bool		recovery_in_progress = RecoveryInProgress();

	Assert(LWLockHeldByMe(ProcArrayLock));

	/*
	 * Acquire XidGenLock, so no transactions can acquire an xid while we're
	 * running. If no transaction with xid were running concurrently a new xid
1949
	 * could influence the RecentXmin et al.
R
Robert Haas 已提交
1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002 2003 2004 2005
	 *
	 * We initialize the computation to nextXid since that's guaranteed to be
	 * a safe, albeit pessimal, value.
	 */
	LWLockAcquire(XidGenLock, LW_SHARED);
	oldestSafeXid = ShmemVariableCache->nextXid;

	/*
	 * If there's already a slot pegging the xmin horizon, we can start with
	 * that value, it's guaranteed to be safe since it's computed by this
	 * routine initally and has been enforced since.
	 */
	if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
		TransactionIdPrecedes(procArray->replication_slot_catalog_xmin,
							  oldestSafeXid))
		oldestSafeXid = procArray->replication_slot_catalog_xmin;

	/*
	 * If we're not in recovery, we walk over the procarray and collect the
	 * lowest xid. Since we're called with ProcArrayLock held and have
	 * acquired XidGenLock, no entries can vanish concurrently, since
	 * PGXACT->xid is only set with XidGenLock held and only cleared with
	 * ProcArrayLock held.
	 *
	 * In recovery we can't lower the safe value besides what we've computed
	 * above, so we'll have to wait a bit longer there. We unfortunately can
	 * *not* use KnownAssignedXidsGetOldestXmin() since the KnownAssignedXids
	 * machinery can miss values and return an older value than is safe.
	 */
	if (!recovery_in_progress)
	{
		/*
		 * Spin over procArray collecting all min(PGXACT->xid)
		 */
		for (index = 0; index < arrayP->numProcs; index++)
		{
			int			pgprocno = arrayP->pgprocnos[index];
			volatile PGXACT *pgxact = &allPgXact[pgprocno];
			TransactionId xid;

			/* Fetch xid just once - see GetNewTransactionId */
			xid = pgxact->xid;

			if (!TransactionIdIsNormal(xid))
				continue;

			if (TransactionIdPrecedes(xid, oldestSafeXid))
				oldestSafeXid = xid;
		}
	}

	LWLockRelease(XidGenLock);

	return oldestSafeXid;
}

2006
/*
2007 2008
 * GetVirtualXIDsDelayingChkpt -- Get the VXIDs of transactions that are
 * delaying checkpoint because they have critical actions in progress.
2009
 *
2010 2011
 * Constructs an array of VXIDs of transactions that are currently in commit
 * critical sections, as shown by having delayChkpt set in their PGXACT.
2012
 *
2013 2014
 * Returns a palloc'd array that should be freed by the caller.
 * *nvxids is the number of valid entries.
2015
 *
2016
 * Note that because backends set or clear delayChkpt without holding any lock,
2017 2018
 * the result is somewhat indeterminate, but we don't really care.  Even in
 * a multiprocessor with delayed writes to shared memory, it should be certain
2019 2020
 * that setting of delayChkpt will propagate to shared memory when the backend
 * takes a lock, so we cannot fail to see an virtual xact as delayChkpt if
2021
 * it's already inserted its commit record.  Whether it takes a little while
2022
 * for clearing of delayChkpt to propagate is unimportant for correctness.
2023
 */
2024 2025
VirtualTransactionId *
GetVirtualXIDsDelayingChkpt(int *nvxids)
2026
{
2027
	VirtualTransactionId *vxids;
2028
	ProcArrayStruct *arrayP = procArray;
2029
	int			count = 0;
B
Bruce Momjian 已提交
2030
	int			index;
2031

2032 2033 2034
	/* allocate what's certainly enough result space */
	vxids = (VirtualTransactionId *)
		palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs);
2035 2036 2037 2038 2039

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
B
Bruce Momjian 已提交
2040 2041 2042
		int			pgprocno = arrayP->pgprocnos[index];
		volatile PGPROC *proc = &allProcs[pgprocno];
		volatile PGXACT *pgxact = &allPgXact[pgprocno];
B
Bruce Momjian 已提交
2043

2044 2045 2046
		if (pgxact->delayChkpt)
		{
			VirtualTransactionId vxid;
2047

2048 2049 2050 2051
			GET_VXID_FROM_PGPROC(vxid, *proc);
			if (VirtualTransactionIdIsValid(vxid))
				vxids[count++] = vxid;
		}
2052 2053 2054 2055
	}

	LWLockRelease(ProcArrayLock);

2056 2057
	*nvxids = count;
	return vxids;
2058 2059 2060
}

/*
2061
 * HaveVirtualXIDsDelayingChkpt -- Are any of the specified VXIDs delaying?
2062
 *
2063 2064
 * This is used with the results of GetVirtualXIDsDelayingChkpt to see if any
 * of the specified VXIDs are still in critical sections of code.
2065
 *
2066
 * Note: this is O(N^2) in the number of vxacts that are/were delaying, but
2067 2068 2069
 * those numbers should be small enough for it not to be a problem.
 */
bool
2070
HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids)
2071
{
B
Bruce Momjian 已提交
2072
	bool		result = false;
2073
	ProcArrayStruct *arrayP = procArray;
B
Bruce Momjian 已提交
2074
	int			index;
2075 2076 2077

	LWLockAcquire(ProcArrayLock, LW_SHARED);

2078
	for (index = 0; index < arrayP->numProcs; index++)
2079
	{
2080 2081 2082 2083 2084 2085 2086 2087
		int			pgprocno = arrayP->pgprocnos[index];
		volatile PGPROC *proc = &allProcs[pgprocno];
		volatile PGXACT *pgxact = &allPgXact[pgprocno];
		VirtualTransactionId vxid;

		GET_VXID_FROM_PGPROC(vxid, *proc);

		if (pgxact->delayChkpt && VirtualTransactionIdIsValid(vxid))
2088
		{
2089
			int			i;
2090

2091
			for (i = 0; i < nvxids; i++)
2092
			{
2093
				if (VirtualTransactionIdEquals(vxid, vxids[i]))
2094 2095 2096 2097 2098
				{
					result = true;
					break;
				}
			}
2099 2100
			if (result)
				break;
2101 2102 2103 2104 2105 2106 2107 2108
		}
	}

	LWLockRelease(ProcArrayLock);

	return result;
}

2109 2110
/*
 * BackendPidGetProc -- get a backend's PGPROC given its PID
2111 2112 2113 2114
 *
 * Returns NULL if not found.  Note that it is up to the caller to be
 * sure that the question remains meaningful for long enough for the
 * answer to be used ...
2115
 */
2116
PGPROC *
2117 2118 2119 2120 2121 2122
BackendPidGetProc(int pid)
{
	PGPROC	   *result = NULL;
	ProcArrayStruct *arrayP = procArray;
	int			index;

2123 2124 2125
	if (pid == 0)				/* never match dummy PGPROCs */
		return NULL;

2126 2127 2128 2129
	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
2130
		PGPROC	   *proc = &allProcs[arrayP->pgprocnos[index]];
2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141 2142 2143

		if (proc->pid == pid)
		{
			result = proc;
			break;
		}
	}

	LWLockRelease(ProcArrayLock);

	return result;
}

T
Tatsuo Ishii 已提交
2144 2145 2146 2147 2148 2149
/*
 * BackendXidGetPid -- get a backend's pid given its XID
 *
 * Returns 0 if not found or it's a prepared transaction.  Note that
 * it is up to the caller to be sure that the question remains
 * meaningful for long enough for the answer to be used ...
B
Bruce Momjian 已提交
2150
 *
T
Tatsuo Ishii 已提交
2151 2152
 * Only main transaction Ids are considered.  This function is mainly
 * useful for determining what backend owns a lock.
2153
 *
B
Bruce Momjian 已提交
2154
 * Beware that not every xact has an XID assigned.  However, as long as you
2155
 * only call this using an XID found on disk, you're safe.
T
Tatsuo Ishii 已提交
2156 2157 2158 2159
 */
int
BackendXidGetPid(TransactionId xid)
{
B
Bruce Momjian 已提交
2160
	int			result = 0;
T
Tatsuo Ishii 已提交
2161 2162 2163 2164 2165 2166 2167 2168 2169 2170
	ProcArrayStruct *arrayP = procArray;
	int			index;

	if (xid == InvalidTransactionId)	/* never match invalid xid */
		return 0;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
2171 2172 2173
		int			pgprocno = arrayP->pgprocnos[index];
		volatile PGPROC *proc = &allProcs[pgprocno];
		volatile PGXACT *pgxact = &allPgXact[pgprocno];
T
Tatsuo Ishii 已提交
2174

2175
		if (pgxact->xid == xid)
T
Tatsuo Ishii 已提交
2176 2177 2178 2179 2180 2181 2182 2183 2184 2185 2186
		{
			result = proc->pid;
			break;
		}
	}

	LWLockRelease(ProcArrayLock);

	return result;
}

2187 2188
/*
 * IsBackendPid -- is a given pid a running backend
2189 2190
 *
 * This is not called by the backend, but is called by external modules.
2191 2192 2193 2194 2195 2196 2197
 */
bool
IsBackendPid(int pid)
{
	return (BackendPidGetProc(pid) != NULL);
}

2198 2199 2200 2201

/*
 * GetCurrentVirtualXIDs -- returns an array of currently active VXIDs.
 *
2202
 * The array is palloc'd. The number of valid entries is returned into *nvxids.
2203
 *
2204 2205 2206 2207 2208 2209 2210 2211 2212 2213 2214 2215 2216 2217
 * The arguments allow filtering the set of VXIDs returned.  Our own process
 * is always skipped.  In addition:
 *	If limitXmin is not InvalidTransactionId, skip processes with
 *		xmin > limitXmin.
 *	If excludeXmin0 is true, skip processes with xmin = 0.
 *	If allDbs is false, skip processes attached to other databases.
 *	If excludeVacuum isn't zero, skip processes for which
 *		(vacuumFlags & excludeVacuum) is not zero.
 *
 * Note: the purpose of the limitXmin and excludeXmin0 parameters is to
 * allow skipping backends whose oldest live snapshot is no older than
 * some snapshot we have.  Since we examine the procarray with only shared
 * lock, there are race conditions: a backend could set its xmin just after
 * we look.  Indeed, on multiprocessors with weak memory ordering, the
B
Bruce Momjian 已提交
2218
 * other backend could have set its xmin *before* we look.  We know however
2219 2220 2221 2222 2223
 * that such a backend must have held shared ProcArrayLock overlapping our
 * own hold of ProcArrayLock, else we would see its xmin update.  Therefore,
 * any snapshot the other backend is taking concurrently with our scan cannot
 * consider any transactions as still running that we think are committed
 * (since backends must hold ProcArrayLock exclusive to commit).
2224 2225
 */
VirtualTransactionId *
2226 2227 2228
GetCurrentVirtualXIDs(TransactionId limitXmin, bool excludeXmin0,
					  bool allDbs, int excludeVacuum,
					  int *nvxids)
2229 2230 2231 2232 2233 2234
{
	VirtualTransactionId *vxids;
	ProcArrayStruct *arrayP = procArray;
	int			count = 0;
	int			index;

2235
	/* allocate what's certainly enough result space */
2236
	vxids = (VirtualTransactionId *)
2237
		palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs);
2238 2239 2240 2241 2242

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
2243 2244 2245
		int			pgprocno = arrayP->pgprocnos[index];
		volatile PGPROC *proc = &allProcs[pgprocno];
		volatile PGXACT *pgxact = &allPgXact[pgprocno];
2246 2247 2248 2249

		if (proc == MyProc)
			continue;

2250
		if (excludeVacuum & pgxact->vacuumFlags)
2251 2252
			continue;

2253
		if (allDbs || proc->databaseId == MyDatabaseId)
2254
		{
2255
			/* Fetch xmin just once - might change on us */
2256
			TransactionId pxmin = pgxact->xmin;
2257

2258 2259 2260
			if (excludeXmin0 && !TransactionIdIsValid(pxmin))
				continue;

2261
			/*
2262 2263
			 * InvalidTransactionId precedes all other XIDs, so a proc that
			 * hasn't set xmin yet will not be rejected by this test.
2264 2265
			 */
			if (!TransactionIdIsValid(limitXmin) ||
2266
				TransactionIdPrecedesOrEquals(pxmin, limitXmin))
2267 2268
			{
				VirtualTransactionId vxid;
2269

2270 2271 2272 2273
				GET_VXID_FROM_PGPROC(vxid, *proc);
				if (VirtualTransactionIdIsValid(vxid))
					vxids[count++] = vxid;
			}
2274 2275 2276 2277 2278
		}
	}

	LWLockRelease(ProcArrayLock);

2279
	*nvxids = count;
2280 2281 2282
	return vxids;
}

2283 2284 2285 2286 2287 2288
/*
 * GetConflictingVirtualXIDs -- returns an array of currently active VXIDs.
 *
 * Usage is limited to conflict resolution during recovery on standby servers.
 * limitXmin is supplied as either latestRemovedXid, or InvalidTransactionId
 * in cases where we cannot accurately determine a value for latestRemovedXid.
2289
 *
2290 2291 2292
 * If limitXmin is InvalidTransactionId then we want to kill everybody,
 * so we're not worried if they have a snapshot or not, nor does it really
 * matter what type of lock we hold.
2293
 *
2294 2295 2296 2297 2298 2299 2300 2301 2302 2303
 * All callers that are checking xmins always now supply a valid and useful
 * value for limitXmin. The limitXmin is always lower than the lowest
 * numbered KnownAssignedXid that is not already a FATAL error. This is
 * because we only care about cleanup records that are cleaning up tuple
 * versions from committed transactions. In that case they will only occur
 * at the point where the record is less than the lowest running xid. That
 * allows us to say that if any backend takes a snapshot concurrently with
 * us then the conflict assessment made here would never include the snapshot
 * that is being derived. So we take LW_SHARED on the ProcArray and allow
 * concurrent snapshots when limitXmin is valid. We might think about adding
B
Bruce Momjian 已提交
2304
 *	 Assert(limitXmin < lowest(KnownAssignedXids))
2305 2306
 * but that would not be true in the case of FATAL errors lagging in array,
 * but we already know those are bogus anyway, so we skip that test.
2307
 *
2308
 * If dbOid is valid we skip backends attached to other databases.
2309 2310 2311 2312 2313
 *
 * Be careful to *not* pfree the result from this function. We reuse
 * this array sufficiently often that we use malloc for the result.
 */
VirtualTransactionId *
2314
GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
2315 2316 2317 2318 2319 2320 2321
{
	static VirtualTransactionId *vxids;
	ProcArrayStruct *arrayP = procArray;
	int			count = 0;
	int			index;

	/*
2322
	 * If first time through, get workspace to remember main XIDs in. We
B
Bruce Momjian 已提交
2323 2324
	 * malloc it permanently to avoid repeated palloc/pfree overhead. Allow
	 * result space, remembering room for a terminator.
2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335
	 */
	if (vxids == NULL)
	{
		vxids = (VirtualTransactionId *)
			malloc(sizeof(VirtualTransactionId) * (arrayP->maxProcs + 1));
		if (vxids == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
	}

2336
	LWLockAcquire(ProcArrayLock, LW_SHARED);
2337 2338 2339

	for (index = 0; index < arrayP->numProcs; index++)
	{
2340 2341 2342
		int			pgprocno = arrayP->pgprocnos[index];
		volatile PGPROC *proc = &allProcs[pgprocno];
		volatile PGXACT *pgxact = &allPgXact[pgprocno];
2343 2344 2345 2346 2347 2348 2349 2350 2351

		/* Exclude prepared transactions */
		if (proc->pid == 0)
			continue;

		if (!OidIsValid(dbOid) ||
			proc->databaseId == dbOid)
		{
			/* Fetch xmin just once - can't change on us, but good coding */
2352
			TransactionId pxmin = pgxact->xmin;
2353 2354

			/*
B
Bruce Momjian 已提交
2355 2356 2357
			 * We ignore an invalid pxmin because this means that backend has
			 * no snapshot and cannot get another one while we hold exclusive
			 * lock.
2358
			 */
2359 2360
			if (!TransactionIdIsValid(limitXmin) ||
				(TransactionIdIsValid(pxmin) && !TransactionIdFollows(pxmin, limitXmin)))
2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385
			{
				VirtualTransactionId vxid;

				GET_VXID_FROM_PGPROC(vxid, *proc);
				if (VirtualTransactionIdIsValid(vxid))
					vxids[count++] = vxid;
			}
		}
	}

	LWLockRelease(ProcArrayLock);

	/* add the terminator */
	vxids[count].backendId = InvalidBackendId;
	vxids[count].localTransactionId = InvalidLocalTransactionId;

	return vxids;
}

/*
 * CancelVirtualTransaction - used in recovery conflict processing
 *
 * Returns pid of the process signaled, or 0 if not found.
 */
pid_t
2386
CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode)
2387 2388 2389 2390 2391 2392 2393 2394 2395
{
	ProcArrayStruct *arrayP = procArray;
	int			index;
	pid_t		pid = 0;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
2396 2397 2398
		int			pgprocno = arrayP->pgprocnos[index];
		volatile PGPROC *proc = &allProcs[pgprocno];
		VirtualTransactionId procvxid;
2399 2400 2401 2402 2403 2404

		GET_VXID_FROM_PGPROC(procvxid, *proc);

		if (procvxid.backendId == vxid.backendId &&
			procvxid.localTransactionId == vxid.localTransactionId)
		{
2405
			proc->recoveryConflictPending = true;
2406
			pid = proc->pid;
2407 2408 2409
			if (pid != 0)
			{
				/*
B
Bruce Momjian 已提交
2410 2411
				 * Kill the pid if it's still here. If not, that's what we
				 * wanted so ignore any errors.
2412 2413 2414
				 */
				(void) SendProcSignal(pid, sigmode, vxid.backendId);
			}
2415 2416 2417 2418 2419 2420 2421 2422
			break;
		}
	}

	LWLockRelease(ProcArrayLock);

	return pid;
}
2423

2424
/*
2425 2426 2427
 * MinimumActiveBackends --- count backends (other than myself) that are
 *		in active transactions.  Return true if the count exceeds the
 *		minimum threshold passed.  This is used as a heuristic to decide if
2428 2429
 *		a pre-XLOG-flush delay is worthwhile during commit.
 *
2430 2431
 * Do not count backends that are blocked waiting for locks, since they are
 * not going to get to run until someone else commits.
2432
 */
2433 2434
bool
MinimumActiveBackends(int min)
2435 2436 2437 2438 2439
{
	ProcArrayStruct *arrayP = procArray;
	int			count = 0;
	int			index;

2440 2441 2442 2443
	/* Quick short-circuit if no minimum is specified */
	if (min == 0)
		return true;

2444 2445
	/*
	 * Note: for speed, we don't acquire ProcArrayLock.  This is a little bit
B
Bruce Momjian 已提交
2446 2447
	 * bogus, but since we are only testing fields for zero or nonzero, it
	 * should be OK.  The result is only used for heuristic purposes anyway...
2448 2449 2450
	 */
	for (index = 0; index < arrayP->numProcs; index++)
	{
2451 2452 2453
		int			pgprocno = arrayP->pgprocnos[index];
		volatile PGPROC *proc = &allProcs[pgprocno];
		volatile PGXACT *pgxact = &allPgXact[pgprocno];
2454

2455
		/*
S
Stephen Frost 已提交
2456 2457 2458
		 * Since we're not holding a lock, need to be prepared to deal with
		 * garbage, as someone could have incremented numPucs but not yet
		 * filled the structure.
2459 2460 2461
		 *
		 * If someone just decremented numProcs, 'proc' could also point to a
		 * PGPROC entry that's no longer in the array. It still points to a
2462
		 * PGPROC struct, though, because freed PGPROC entries just go to the
2463 2464
		 * free list and are recycled. Its contents are nonsense in that case,
		 * but that's acceptable for this function.
2465
		 */
2466 2467
		if (pgprocno == -1)
			continue;			/* do not count deleted entries */
2468 2469
		if (proc == MyProc)
			continue;			/* do not count myself */
2470 2471
		if (pgxact->xid == InvalidTransactionId)
			continue;			/* do not count if no XID assigned */
2472 2473
		if (proc->pid == 0)
			continue;			/* do not count prepared xacts */
2474 2475 2476
		if (proc->waitLock != NULL)
			continue;			/* do not count if blocked on a lock */
		count++;
2477 2478
		if (count >= min)
			break;
2479 2480
	}

2481
	return count >= min;
2482 2483
}

2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497
/*
 * CountDBBackends --- count backends that are using specified database
 */
int
CountDBBackends(Oid databaseid)
{
	ProcArrayStruct *arrayP = procArray;
	int			count = 0;
	int			index;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
2498
		int			pgprocno = arrayP->pgprocnos[index];
2499
		volatile PGPROC *proc = &allProcs[pgprocno];
2500 2501 2502

		if (proc->pid == 0)
			continue;			/* do not count prepared xacts */
2503 2504
		if (!OidIsValid(databaseid) ||
			proc->databaseId == databaseid)
2505 2506 2507 2508 2509 2510 2511 2512
			count++;
	}

	LWLockRelease(ProcArrayLock);

	return count;
}

2513 2514 2515 2516
/*
 * CancelDBBackends --- cancel backends that are using specified database
 */
void
2517
CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending)
2518 2519 2520
{
	ProcArrayStruct *arrayP = procArray;
	int			index;
2521
	pid_t		pid = 0;
2522 2523 2524 2525 2526 2527

	/* tell all backends to die */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	for (index = 0; index < arrayP->numProcs; index++)
	{
2528
		int			pgprocno = arrayP->pgprocnos[index];
2529
		volatile PGPROC *proc = &allProcs[pgprocno];
2530

2531
		if (databaseid == InvalidOid || proc->databaseId == databaseid)
2532
		{
2533 2534 2535 2536
			VirtualTransactionId procvxid;

			GET_VXID_FROM_PGPROC(procvxid, *proc);

2537
			proc->recoveryConflictPending = conflictPending;
2538 2539 2540 2541
			pid = proc->pid;
			if (pid != 0)
			{
				/*
B
Bruce Momjian 已提交
2542 2543
				 * Kill the pid if it's still here. If not, that's what we
				 * wanted so ignore any errors.
2544
				 */
2545
				(void) SendProcSignal(pid, sigmode, procvxid.backendId);
2546
			}
2547 2548 2549 2550 2551 2552
		}
	}

	LWLockRelease(ProcArrayLock);
}

2553 2554 2555 2556 2557 2558 2559 2560 2561 2562 2563 2564 2565 2566
/*
 * CountUserBackends --- count backends that are used by specified user
 */
int
CountUserBackends(Oid roleid)
{
	ProcArrayStruct *arrayP = procArray;
	int			count = 0;
	int			index;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
2567
		int			pgprocno = arrayP->pgprocnos[index];
2568
		volatile PGPROC *proc = &allProcs[pgprocno];
2569 2570 2571 2572 2573 2574 2575 2576 2577 2578 2579 2580

		if (proc->pid == 0)
			continue;			/* do not count prepared xacts */
		if (proc->roleId == roleid)
			count++;
	}

	LWLockRelease(ProcArrayLock);

	return count;
}

2581
/*
2582
 * CountOtherDBBackends -- check for other backends running in the given DB
2583 2584 2585
 *
 * If there are other backends in the DB, we will wait a maximum of 5 seconds
 * for them to exit.  Autovacuum backends are encouraged to exit early by
2586
 * sending them SIGTERM, but normal user backends are just waited for.
2587 2588 2589 2590 2591
 *
 * The current backend is always ignored; it is caller's responsibility to
 * check whether the current backend uses the given DB, if it's important.
 *
 * Returns TRUE if there are (still) other backends in the DB, FALSE if not.
2592 2593
 * Also, *nbackends and *nprepared are set to the number of other backends
 * and prepared transactions in the DB, respectively.
2594 2595 2596 2597 2598 2599 2600 2601 2602 2603 2604
 *
 * This function is used to interlock DROP DATABASE and related commands
 * against there being any active backends in the target DB --- dropping the
 * DB while active backends remain would be a Bad Thing.  Note that we cannot
 * detect here the possibility of a newly-started backend that is trying to
 * connect to the doomed database, so additional interlocking is needed during
 * backend startup.  The caller should normally hold an exclusive lock on the
 * target DB before calling this, which is one reason we mustn't wait
 * indefinitely.
 */
bool
2605
CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
2606 2607
{
	ProcArrayStruct *arrayP = procArray;
2608 2609

#define MAXAUTOVACPIDS	10		/* max autovacs to SIGTERM per iteration */
2610
	int			autovac_pids[MAXAUTOVACPIDS];
2611 2612 2613 2614 2615
	int			tries;

	/* 50 tries with 100ms sleep between tries makes 5 sec total wait */
	for (tries = 0; tries < 50; tries++)
	{
2616
		int			nautovacs = 0;
2617 2618 2619 2620 2621
		bool		found = false;
		int			index;

		CHECK_FOR_INTERRUPTS();

2622 2623
		*nbackends = *nprepared = 0;

2624 2625 2626 2627
		LWLockAcquire(ProcArrayLock, LW_SHARED);

		for (index = 0; index < arrayP->numProcs; index++)
		{
2628
			int			pgprocno = arrayP->pgprocnos[index];
2629 2630
			volatile PGPROC *proc = &allProcs[pgprocno];
			volatile PGXACT *pgxact = &allPgXact[pgprocno];
2631 2632 2633 2634 2635 2636 2637 2638

			if (proc->databaseId != databaseId)
				continue;
			if (proc == MyProc)
				continue;

			found = true;

2639 2640
			if (proc->pid == 0)
				(*nprepared)++;
2641 2642
			else
			{
2643
				(*nbackends)++;
2644
				if ((pgxact->vacuumFlags & PROC_IS_AUTOVACUUM) &&
2645 2646
					nautovacs < MAXAUTOVACPIDS)
					autovac_pids[nautovacs++] = proc->pid;
2647 2648 2649
			}
		}

2650 2651
		LWLockRelease(ProcArrayLock);

2652
		if (!found)
B
Bruce Momjian 已提交
2653
			return false;		/* no conflicting backends, so done */
2654

2655
		/*
2656 2657 2658 2659
		 * Send SIGTERM to any conflicting autovacuums before sleeping. We
		 * postpone this step until after the loop because we don't want to
		 * hold ProcArrayLock while issuing kill(). We have no idea what might
		 * block kill() inside the kernel...
2660 2661 2662 2663 2664
		 */
		for (index = 0; index < nautovacs; index++)
			(void) kill(autovac_pids[index], SIGTERM);	/* ignore any error */

		/* sleep, then try again */
B
Bruce Momjian 已提交
2665
		pg_usleep(100 * 1000L); /* 100ms */
2666 2667
	}

B
Bruce Momjian 已提交
2668
	return true;				/* timed out, still conflicts */
2669 2670
}

R
Robert Haas 已提交
2671 2672 2673 2674 2675 2676 2677 2678
/*
 * ProcArraySetReplicationSlotXmin
 *
 * Install limits to future computations of the xmin horizon to prevent vacuum
 * and HOT pruning from removing affected rows still needed by clients with
 * replicaton slots.
 */
void
R
Robert Haas 已提交
2679 2680
ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin,
								bool already_locked)
R
Robert Haas 已提交
2681
{
R
Robert Haas 已提交
2682 2683 2684 2685 2686
	Assert(!already_locked || LWLockHeldByMe(ProcArrayLock));

	if (!already_locked)
		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

R
Robert Haas 已提交
2687
	procArray->replication_slot_xmin = xmin;
R
Robert Haas 已提交
2688 2689 2690 2691 2692 2693 2694 2695 2696 2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710 2711
	procArray->replication_slot_catalog_xmin = catalog_xmin;

	if (!already_locked)
		LWLockRelease(ProcArrayLock);
}

/*
 * ProcArrayGetReplicationSlotXmin
 *
 * Return the current slot xmin limits. That's useful to be able to remove
 * data that's older than those limits.
 */
void
ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
								TransactionId *catalog_xmin)
{
	LWLockAcquire(ProcArrayLock, LW_SHARED);

	if (xmin != NULL)
		*xmin = procArray->replication_slot_xmin;

	if (catalog_xmin != NULL)
		*catalog_xmin = procArray->replication_slot_catalog_xmin;

R
Robert Haas 已提交
2712 2713 2714
	LWLockRelease(ProcArrayLock);
}

2715 2716 2717

#define XidCacheRemove(i) \
	do { \
2718 2719
		MyProc->subxids.xids[i] = MyProc->subxids.xids[MyPgXact->nxids - 1]; \
		MyPgXact->nxids--; \
2720 2721 2722 2723 2724 2725
	} while (0)

/*
 * XidCacheRemoveRunningXids
 *
 * Remove a bunch of TransactionIds from the list of known-running
B
Bruce Momjian 已提交
2726
 * subtransactions for my backend.  Both the specified xid and those in
2727
 * the xids[] array (of length nxids) are removed from the subxids cache.
2728
 * latestXid must be the latest XID among the group.
2729 2730
 */
void
2731 2732 2733
XidCacheRemoveRunningXids(TransactionId xid,
						  int nxids, const TransactionId *xids,
						  TransactionId latestXid)
2734 2735 2736 2737
{
	int			i,
				j;

2738
	Assert(TransactionIdIsValid(xid));
2739 2740 2741

	/*
	 * We must hold ProcArrayLock exclusively in order to remove transactions
2742 2743 2744 2745
	 * from the PGPROC array.  (See src/backend/access/transam/README.)  It's
	 * possible this could be relaxed since we know this routine is only used
	 * to abort subtransactions, but pending closer analysis we'd best be
	 * conservative.
2746 2747 2748 2749
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	/*
B
Bruce Momjian 已提交
2750 2751 2752
	 * Under normal circumstances xid and xids[] will be in increasing order,
	 * as will be the entries in subxids.  Scan backwards to avoid O(N^2)
	 * behavior when removing a lot of xids.
2753 2754 2755 2756 2757
	 */
	for (i = nxids - 1; i >= 0; i--)
	{
		TransactionId anxid = xids[i];

2758
		for (j = MyPgXact->nxids - 1; j >= 0; j--)
2759 2760 2761 2762 2763 2764 2765
		{
			if (TransactionIdEquals(MyProc->subxids.xids[j], anxid))
			{
				XidCacheRemove(j);
				break;
			}
		}
B
Bruce Momjian 已提交
2766

2767
		/*
B
Bruce Momjian 已提交
2768 2769 2770 2771 2772
		 * Ordinarily we should have found it, unless the cache has
		 * overflowed. However it's also possible for this routine to be
		 * invoked multiple times for the same subtransaction, in case of an
		 * error during AbortSubTransaction.  So instead of Assert, emit a
		 * debug warning.
2773
		 */
2774
		if (j < 0 && !MyPgXact->overflowed)
2775 2776 2777
			elog(WARNING, "did not find subXID %u in MyProc", anxid);
	}

2778
	for (j = MyPgXact->nxids - 1; j >= 0; j--)
2779 2780 2781 2782 2783 2784 2785 2786
	{
		if (TransactionIdEquals(MyProc->subxids.xids[j], xid))
		{
			XidCacheRemove(j);
			break;
		}
	}
	/* Ordinarily we should have found it, unless the cache has overflowed */
2787
	if (j < 0 && !MyPgXact->overflowed)
2788 2789
		elog(WARNING, "did not find subXID %u in MyProc", xid);

2790 2791 2792 2793 2794
	/* Also advance global latestCompletedXid while holding the lock */
	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
							  latestXid))
		ShmemVariableCache->latestCompletedXid = latestXid;

2795 2796 2797 2798 2799 2800 2801 2802 2803 2804 2805 2806
	LWLockRelease(ProcArrayLock);
}

#ifdef XIDCACHE_DEBUG

/*
 * Print stats about effectiveness of XID cache
 */
static void
DisplayXidCache(void)
{
	fprintf(stderr,
2807
			"XidCache: xmin: %ld, known: %ld, myxact: %ld, latest: %ld, mainxid: %ld, childxid: %ld, knownassigned: %ld, nooflo: %ld, slow: %ld\n",
2808
			xc_by_recent_xmin,
2809
			xc_by_known_xact,
2810
			xc_by_my_xact,
2811
			xc_by_latest_xid,
2812 2813
			xc_by_main_xid,
			xc_by_child_xid,
2814
			xc_by_known_assigned,
2815
			xc_no_overflow,
2816 2817 2818
			xc_slow_answer);
}
#endif   /* XIDCACHE_DEBUG */
2819

2820

2821
/* ----------------------------------------------
B
Bruce Momjian 已提交
2822
 *		KnownAssignedTransactions sub-module
2823 2824 2825 2826 2827
 * ----------------------------------------------
 */

/*
 * In Hot Standby mode, we maintain a list of transactions that are (or were)
2828 2829
 * running in the master at the current point in WAL.  These XIDs must be
 * treated as running by standby transactions, even though they are not in
2830
 * the standby server's PGXACT array.
2831
 *
B
Bruce Momjian 已提交
2832
 * We record all XIDs that we know have been assigned.  That includes all the
2833 2834 2835 2836 2837 2838 2839 2840
 * XIDs seen in WAL records, plus all unobserved XIDs that we can deduce have
 * been assigned.  We can deduce the existence of unobserved XIDs because we
 * know XIDs are assigned in sequence, with no gaps.  The KnownAssignedXids
 * list expands as new XIDs are observed or inferred, and contracts when
 * transaction completion records arrive.
 *
 * During hot standby we do not fret too much about the distinction between
 * top-level XIDs and subtransaction XIDs. We store both together in the
B
Bruce Momjian 已提交
2841
 * KnownAssignedXids list.  In backends, this is copied into snapshots in
2842 2843 2844 2845 2846 2847 2848 2849 2850 2851 2852 2853 2854 2855 2856 2857 2858 2859 2860 2861 2862 2863 2864 2865 2866 2867 2868 2869 2870 2871
 * GetSnapshotData(), taking advantage of the fact that XidInMVCCSnapshot()
 * doesn't care about the distinction either.  Subtransaction XIDs are
 * effectively treated as top-level XIDs and in the typical case pg_subtrans
 * links are *not* maintained (which does not affect visibility).
 *
 * We have room in KnownAssignedXids and in snapshots to hold maxProcs *
 * (1 + PGPROC_MAX_CACHED_SUBXIDS) XIDs, so every master transaction must
 * report its subtransaction XIDs in a WAL XLOG_XACT_ASSIGNMENT record at
 * least every PGPROC_MAX_CACHED_SUBXIDS.  When we receive one of these
 * records, we mark the subXIDs as children of the top XID in pg_subtrans,
 * and then remove them from KnownAssignedXids.  This prevents overflow of
 * KnownAssignedXids and snapshots, at the cost that status checks for these
 * subXIDs will take a slower path through TransactionIdIsInProgress().
 * This means that KnownAssignedXids is not necessarily complete for subXIDs,
 * though it should be complete for top-level XIDs; this is the same situation
 * that holds with respect to the PGPROC entries in normal running.
 *
 * When we throw away subXIDs from KnownAssignedXids, we need to keep track of
 * that, similarly to tracking overflow of a PGPROC's subxids array.  We do
 * that by remembering the lastOverflowedXID, ie the last thrown-away subXID.
 * As long as that is within the range of interesting XIDs, we have to assume
 * that subXIDs are missing from snapshots.  (Note that subXID overflow occurs
 * on primary when 65th subXID arrives, whereas on standby it occurs when 64th
 * subXID arrives - that is not an error.)
 *
 * Should a backend on primary somehow disappear before it can write an abort
 * record, then we just leave those XIDs in KnownAssignedXids. They actually
 * aborted but we think they were running; the distinction is irrelevant
 * because either way any changes done by the transaction are not visible to
 * backends in the standby.  We prune KnownAssignedXids when
2872
 * XLOG_RUNNING_XACTS arrives, to forestall possible overflow of the
2873 2874 2875 2876 2877 2878 2879
 * array due to such dead XIDs.
 */

/*
 * RecordKnownAssignedTransactionIds
 *		Record the given XID in KnownAssignedXids, as well as any preceding
 *		unobserved XIDs.
2880 2881
 *
 * RecordKnownAssignedTransactionIds() should be run for *every* WAL record
2882 2883
 * associated with a transaction. Must be called for each record after we
 * have executed StartupCLOG() et al, since we must ExtendCLOG() etc..
2884
 *
2885
 * Called during recovery in analogy with and in place of GetNewTransactionId()
2886 2887 2888 2889
 */
void
RecordKnownAssignedTransactionIds(TransactionId xid)
{
2890
	Assert(standbyState >= STANDBY_INITIALIZED);
2891
	Assert(TransactionIdIsValid(xid));
2892
	Assert(TransactionIdIsValid(latestObservedXid));
2893

2894
	elog(trace_recovery(DEBUG4), "record known xact %u latestObservedXid %u",
B
Bruce Momjian 已提交
2895
		 xid, latestObservedXid);
2896 2897

	/*
B
Bruce Momjian 已提交
2898 2899 2900
	 * When a newly observed xid arrives, it is frequently the case that it is
	 * *not* the next xid in sequence. When this occurs, we must treat the
	 * intervening xids as running also.
2901 2902 2903
	 */
	if (TransactionIdFollows(xid, latestObservedXid))
	{
2904
		TransactionId next_expected_xid;
2905 2906

		/*
2907 2908 2909 2910 2911 2912 2913
		 * Extend subtrans like we do in GetNewTransactionId() during normal
		 * operation using individual extend steps. Note that we do not need
		 * to extend clog since its extensions are WAL logged.
		 *
		 * This part has to be done regardless of standbyState since we
		 * immediately start assigning subtransactions to their toplevel
		 * transactions.
2914
		 */
2915
		next_expected_xid = latestObservedXid;
2916
		while (TransactionIdPrecedes(next_expected_xid, xid))
2917
		{
2918
			TransactionIdAdvance(next_expected_xid);
2919
			ExtendSUBTRANS(next_expected_xid);
2920 2921
		}
		Assert(next_expected_xid == xid);
2922

2923 2924 2925 2926 2927 2928 2929 2930
		/*
		 * If the KnownAssignedXids machinery isn't up yet, there's nothing
		 * more to do since we don't track assigned xids yet.
		 */
		if (standbyState <= STANDBY_INITIALIZED)
		{
			latestObservedXid = xid;
			return;
2931 2932
		}

2933
		/*
2934
		 * Add (latestObservedXid, xid] onto the KnownAssignedXids array.
2935 2936 2937 2938
		 */
		next_expected_xid = latestObservedXid;
		TransactionIdAdvance(next_expected_xid);
		KnownAssignedXidsAdd(next_expected_xid, xid, false);
2939

2940 2941 2942
		/*
		 * Now we can advance latestObservedXid
		 */
2943 2944
		latestObservedXid = xid;

2945 2946 2947
		/* ShmemVariableCache->nextXid must be beyond any observed xid */
		next_expected_xid = latestObservedXid;
		TransactionIdAdvance(next_expected_xid);
2948
		LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
2949
		ShmemVariableCache->nextXid = next_expected_xid;
2950
		LWLockRelease(XidGenLock);
2951 2952 2953
	}
}

2954 2955 2956
/*
 * ExpireTreeKnownAssignedTransactionIds
 *		Remove the given XIDs from KnownAssignedXids.
2957 2958
 *
 * Called during recovery in analogy with and in place of ProcArrayEndTransaction()
2959
 */
2960 2961
void
ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
B
Bruce Momjian 已提交
2962
							   TransactionId *subxids, TransactionId max_xid)
2963
{
2964
	Assert(standbyState >= STANDBY_INITIALIZED);
2965 2966 2967 2968 2969 2970

	/*
	 * Uses same locking as transaction commit
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

2971
	KnownAssignedXidsRemoveTree(xid, nsubxids, subxids);
2972

2973 2974 2975
	/* As in ProcArrayEndTransaction, advance latestCompletedXid */
	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
							  max_xid))
2976 2977 2978 2979 2980
		ShmemVariableCache->latestCompletedXid = max_xid;

	LWLockRelease(ProcArrayLock);
}

2981 2982 2983 2984
/*
 * ExpireAllKnownAssignedTransactionIds
 *		Remove all entries in KnownAssignedXids
 */
2985 2986 2987 2988
void
ExpireAllKnownAssignedTransactionIds(void)
{
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
2989
	KnownAssignedXidsRemovePreceding(InvalidTransactionId);
2990 2991 2992
	LWLockRelease(ProcArrayLock);
}

2993 2994 2995 2996
/*
 * ExpireOldKnownAssignedTransactionIds
 *		Remove KnownAssignedXids entries preceding the given XID
 */
2997 2998 2999 3000
void
ExpireOldKnownAssignedTransactionIds(TransactionId xid)
{
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3001
	KnownAssignedXidsRemovePreceding(xid);
3002 3003 3004
	LWLockRelease(ProcArrayLock);
}

3005

3006 3007 3008
/*
 * Private module functions to manipulate KnownAssignedXids
 *
3009 3010 3011 3012 3013 3014 3015 3016 3017 3018 3019 3020 3021 3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034 3035 3036 3037 3038 3039 3040 3041 3042 3043 3044 3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055
 * There are 5 main uses of the KnownAssignedXids data structure:
 *
 *	* backends taking snapshots - all valid XIDs need to be copied out
 *	* backends seeking to determine presence of a specific XID
 *	* startup process adding new known-assigned XIDs
 *	* startup process removing specific XIDs as transactions end
 *	* startup process pruning array when special WAL records arrive
 *
 * This data structure is known to be a hot spot during Hot Standby, so we
 * go to some lengths to make these operations as efficient and as concurrent
 * as possible.
 *
 * The XIDs are stored in an array in sorted order --- TransactionIdPrecedes
 * order, to be exact --- to allow binary search for specific XIDs.  Note:
 * in general TransactionIdPrecedes would not provide a total order, but
 * we know that the entries present at any instant should not extend across
 * a large enough fraction of XID space to wrap around (the master would
 * shut down for fear of XID wrap long before that happens).  So it's OK to
 * use TransactionIdPrecedes as a binary-search comparator.
 *
 * It's cheap to maintain the sortedness during insertions, since new known
 * XIDs are always reported in XID order; we just append them at the right.
 *
 * To keep individual deletions cheap, we need to allow gaps in the array.
 * This is implemented by marking array elements as valid or invalid using
 * the parallel boolean array KnownAssignedXidsValid[].  A deletion is done
 * by setting KnownAssignedXidsValid[i] to false, *without* clearing the
 * XID entry itself.  This preserves the property that the XID entries are
 * sorted, so we can do binary searches easily.  Periodically we compress
 * out the unused entries; that's much cheaper than having to compress the
 * array immediately on every deletion.
 *
 * The actually valid items in KnownAssignedXids[] and KnownAssignedXidsValid[]
 * are those with indexes tail <= i < head; items outside this subscript range
 * have unspecified contents.  When head reaches the end of the array, we
 * force compression of unused entries rather than wrapping around, since
 * allowing wraparound would greatly complicate the search logic.  We maintain
 * an explicit tail pointer so that pruning of old XIDs can be done without
 * immediately moving the array contents.  In most cases only a small fraction
 * of the array contains valid entries at any instant.
 *
 * Although only the startup process can ever change the KnownAssignedXids
 * data structure, we still need interlocking so that standby backends will
 * not observe invalid intermediate states.  The convention is that backends
 * must hold shared ProcArrayLock to examine the array.  To remove XIDs from
 * the array, the startup process must hold ProcArrayLock exclusively, for
 * the usual transactional reasons (compare commit/abort of a transaction
B
Bruce Momjian 已提交
3056
 * during normal running).  Compressing unused entries out of the array
3057 3058 3059 3060 3061 3062
 * likewise requires exclusive lock.  To add XIDs to the array, we just insert
 * them into slots to the right of the head pointer and then advance the head
 * pointer.  This wouldn't require any lock at all, except that on machines
 * with weak memory ordering we need to be careful that other processors
 * see the array element changes before they see the head pointer change.
 * We handle this by using a spinlock to protect reads and writes of the
B
Bruce Momjian 已提交
3063
 * head/tail pointers.  (We could dispense with the spinlock if we were to
3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079 3080 3081 3082 3083 3084 3085 3086 3087
 * create suitable memory access barrier primitives and use those instead.)
 * The spinlock must be taken to read or write the head/tail pointers unless
 * the caller holds ProcArrayLock exclusively.
 *
 * Algorithmic analysis:
 *
 * If we have a maximum of M slots, with N XIDs currently spread across
 * S elements then we have N <= S <= M always.
 *
 *	* Adding a new XID is O(1) and needs little locking (unless compression
 *		must happen)
 *	* Compressing the array is O(S) and requires exclusive lock
 *	* Removing an XID is O(logS) and requires exclusive lock
 *	* Taking a snapshot is O(S) and requires shared lock
 *	* Checking for an XID is O(logS) and requires shared lock
 *
 * In comparison, using a hash table for KnownAssignedXids would mean that
 * taking snapshots would be O(M). If we can maintain S << M then the
 * sorted array technique will deliver significantly faster snapshots.
 * If we try to keep S too small then we will spend too much time compressing,
 * so there is an optimal point for any workload mix. We use a heuristic to
 * decide when to compress the array, though trimming also helps reduce
 * frequency of compressing. The heuristic requires us to track the number of
 * currently valid XIDs in the array.
3088 3089
 */

3090

3091
/*
3092 3093 3094 3095 3096
 * Compress KnownAssignedXids by shifting valid data down to the start of the
 * array, removing any gaps.
 *
 * A compression step is forced if "force" is true, otherwise we do it
 * only if a heuristic indicates it's a good time to do it.
3097
 *
3098
 * Caller must hold ProcArrayLock in exclusive mode.
3099 3100
 */
static void
3101
KnownAssignedXidsCompress(bool force)
3102
{
3103 3104
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
B
Bruce Momjian 已提交
3105 3106 3107 3108
	int			head,
				tail;
	int			compress_index;
	int			i;
3109

3110 3111 3112 3113 3114
	/* no spinlock required since we hold ProcArrayLock exclusively */
	head = pArray->headKnownAssignedXids;
	tail = pArray->tailKnownAssignedXids;

	if (!force)
3115
	{
3116
		/*
B
Bruce Momjian 已提交
3117 3118
		 * If we can choose how much to compress, use a heuristic to avoid
		 * compressing too often or not often enough.
3119
		 *
B
Bruce Momjian 已提交
3120 3121 3122 3123 3124
		 * Heuristic is if we have a large enough current spread and less than
		 * 50% of the elements are currently in use, then compress. This
		 * should ensure we compress fairly infrequently. We could compress
		 * less often though the virtual array would spread out more and
		 * snapshots would become more expensive.
3125
		 */
B
Bruce Momjian 已提交
3126
		int			nelements = head - tail;
3127

3128 3129 3130 3131
		if (nelements < 4 * PROCARRAY_MAXPROCS ||
			nelements < 2 * pArray->numKnownAssignedXids)
			return;
	}
3132

3133
	/*
B
Bruce Momjian 已提交
3134 3135
	 * We compress the array by reading the valid values from tail to head,
	 * re-aligning data to 0th element.
3136 3137 3138 3139 3140
	 */
	compress_index = 0;
	for (i = tail; i < head; i++)
	{
		if (KnownAssignedXidsValid[i])
3141
		{
3142 3143 3144
			KnownAssignedXids[compress_index] = KnownAssignedXids[i];
			KnownAssignedXidsValid[compress_index] = true;
			compress_index++;
3145
		}
3146
	}
3147

3148 3149 3150
	pArray->tailKnownAssignedXids = 0;
	pArray->headKnownAssignedXids = compress_index;
}
3151

3152 3153 3154 3155 3156 3157 3158 3159
/*
 * Add xids into KnownAssignedXids at the head of the array.
 *
 * xids from from_xid to to_xid, inclusive, are added to the array.
 *
 * If exclusive_lock is true then caller already holds ProcArrayLock in
 * exclusive mode, so we need no extra locking here.  Else caller holds no
 * lock, so we need to be sure we maintain sufficient interlocks against
B
Bruce Momjian 已提交
3160
 * concurrent readers.  (Only the startup process ever calls this, so no need
3161 3162 3163 3164 3165 3166 3167 3168
 * to worry about concurrent writers.)
 */
static void
KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
					 bool exclusive_lock)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
B
Bruce Momjian 已提交
3169 3170 3171
	TransactionId next_xid;
	int			head,
				tail;
3172 3173 3174 3175 3176 3177
	int			nxids;
	int			i;

	Assert(TransactionIdPrecedesOrEquals(from_xid, to_xid));

	/*
B
Bruce Momjian 已提交
3178 3179 3180
	 * Calculate how many array slots we'll need.  Normally this is cheap; in
	 * the unusual case where the XIDs cross the wrap point, we do it the hard
	 * way.
3181 3182 3183 3184 3185 3186 3187 3188
	 */
	if (to_xid >= from_xid)
		nxids = to_xid - from_xid + 1;
	else
	{
		nxids = 1;
		next_xid = from_xid;
		while (TransactionIdPrecedes(next_xid, to_xid))
3189
		{
3190 3191 3192 3193 3194 3195
			nxids++;
			TransactionIdAdvance(next_xid);
		}
	}

	/*
B
Bruce Momjian 已提交
3196 3197
	 * Since only the startup process modifies the head/tail pointers, we
	 * don't need a lock to read them here.
3198 3199 3200 3201 3202 3203 3204 3205
	 */
	head = pArray->headKnownAssignedXids;
	tail = pArray->tailKnownAssignedXids;

	Assert(head >= 0 && head <= pArray->maxKnownAssignedXids);
	Assert(tail >= 0 && tail < pArray->maxKnownAssignedXids);

	/*
B
Bruce Momjian 已提交
3206
	 * Verify that insertions occur in TransactionId sequence.  Note that even
B
Bruce Momjian 已提交
3207 3208
	 * if the last existing element is marked invalid, it must still have a
	 * correctly sequenced XID value.
3209 3210 3211 3212 3213 3214 3215 3216 3217 3218 3219 3220 3221 3222 3223 3224 3225 3226 3227 3228 3229 3230 3231
	 */
	if (head > tail &&
		TransactionIdFollowsOrEquals(KnownAssignedXids[head - 1], from_xid))
	{
		KnownAssignedXidsDisplay(LOG);
		elog(ERROR, "out-of-order XID insertion in KnownAssignedXids");
	}

	/*
	 * If our xids won't fit in the remaining space, compress out free space
	 */
	if (head + nxids > pArray->maxKnownAssignedXids)
	{
		/* must hold lock to compress */
		if (!exclusive_lock)
			LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

		KnownAssignedXidsCompress(true);

		head = pArray->headKnownAssignedXids;
		/* note: we no longer care about the tail pointer */

		if (!exclusive_lock)
3232
			LWLockRelease(ProcArrayLock);
3233 3234 3235 3236 3237 3238 3239 3240 3241 3242 3243 3244 3245 3246 3247 3248 3249 3250 3251 3252 3253 3254 3255 3256 3257 3258 3259

		/*
		 * If it still won't fit then we're out of memory
		 */
		if (head + nxids > pArray->maxKnownAssignedXids)
			elog(ERROR, "too many KnownAssignedXids");
	}

	/* Now we can insert the xids into the space starting at head */
	next_xid = from_xid;
	for (i = 0; i < nxids; i++)
	{
		KnownAssignedXids[head] = next_xid;
		KnownAssignedXidsValid[head] = true;
		TransactionIdAdvance(next_xid);
		head++;
	}

	/* Adjust count of number of valid entries */
	pArray->numKnownAssignedXids += nxids;

	/*
	 * Now update the head pointer.  We use a spinlock to protect this
	 * pointer, not because the update is likely to be non-atomic, but to
	 * ensure that other processors see the above array updates before they
	 * see the head pointer change.
	 *
B
Bruce Momjian 已提交
3260 3261
	 * If we're holding ProcArrayLock exclusively, there's no need to take the
	 * spinlock.
3262 3263 3264 3265 3266 3267 3268 3269 3270 3271 3272 3273 3274 3275 3276 3277 3278 3279 3280 3281 3282 3283 3284 3285 3286
	 */
	if (exclusive_lock)
		pArray->headKnownAssignedXids = head;
	else
	{
		SpinLockAcquire(&pArray->known_assigned_xids_lck);
		pArray->headKnownAssignedXids = head;
		SpinLockRelease(&pArray->known_assigned_xids_lck);
	}
}

/*
 * KnownAssignedXidsSearch
 *
 * Searches KnownAssignedXids for a specific xid and optionally removes it.
 * Returns true if it was found, false if not.
 *
 * Caller must hold ProcArrayLock in shared or exclusive mode.
 * Exclusive lock must be held for remove = true.
 */
static bool
KnownAssignedXidsSearch(TransactionId xid, bool remove)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
B
Bruce Momjian 已提交
3287 3288 3289 3290 3291
	int			first,
				last;
	int			head;
	int			tail;
	int			result_index = -1;
3292 3293 3294 3295 3296 3297 3298 3299 3300 3301 3302 3303 3304 3305 3306 3307 3308

	if (remove)
	{
		/* we hold ProcArrayLock exclusively, so no need for spinlock */
		tail = pArray->tailKnownAssignedXids;
		head = pArray->headKnownAssignedXids;
	}
	else
	{
		/* take spinlock to ensure we see up-to-date array contents */
		SpinLockAcquire(&pArray->known_assigned_xids_lck);
		tail = pArray->tailKnownAssignedXids;
		head = pArray->headKnownAssignedXids;
		SpinLockRelease(&pArray->known_assigned_xids_lck);
	}

	/*
B
Bruce Momjian 已提交
3309
	 * Standard binary search.  Note we can ignore the KnownAssignedXidsValid
3310 3311 3312 3313 3314 3315
	 * array here, since even invalid entries will contain sorted XIDs.
	 */
	first = tail;
	last = head - 1;
	while (first <= last)
	{
B
Bruce Momjian 已提交
3316 3317
		int			mid_index;
		TransactionId mid_xid;
3318 3319 3320 3321 3322 3323 3324 3325

		mid_index = (first + last) / 2;
		mid_xid = KnownAssignedXids[mid_index];

		if (xid == mid_xid)
		{
			result_index = mid_index;
			break;
3326
		}
3327 3328 3329 3330 3331 3332 3333 3334 3335 3336 3337 3338 3339 3340 3341
		else if (TransactionIdPrecedes(xid, mid_xid))
			last = mid_index - 1;
		else
			first = mid_index + 1;
	}

	if (result_index < 0)
		return false;			/* not in array */

	if (!KnownAssignedXidsValid[result_index])
		return false;			/* in array, but invalid */

	if (remove)
	{
		KnownAssignedXidsValid[result_index] = false;
3342

3343 3344 3345 3346 3347 3348 3349 3350
		pArray->numKnownAssignedXids--;
		Assert(pArray->numKnownAssignedXids >= 0);

		/*
		 * If we're removing the tail element then advance tail pointer over
		 * any invalid elements.  This will speed future searches.
		 */
		if (result_index == tail)
3351
		{
3352 3353 3354 3355 3356 3357 3358 3359 3360 3361 3362 3363 3364
			tail++;
			while (tail < head && !KnownAssignedXidsValid[tail])
				tail++;
			if (tail >= head)
			{
				/* Array is empty, so we can reset both pointers */
				pArray->headKnownAssignedXids = 0;
				pArray->tailKnownAssignedXids = 0;
			}
			else
			{
				pArray->tailKnownAssignedXids = tail;
			}
3365 3366
		}
	}
3367 3368

	return true;
3369 3370 3371
}

/*
3372
 * Is the specified XID present in KnownAssignedXids[]?
3373
 *
3374
 * Caller must hold ProcArrayLock in shared or exclusive mode.
3375 3376
 */
static bool
3377
KnownAssignedXidExists(TransactionId xid)
3378
{
3379
	Assert(TransactionIdIsValid(xid));
B
Bruce Momjian 已提交
3380

3381
	return KnownAssignedXidsSearch(xid, false);
3382 3383 3384
}

/*
3385
 * Remove the specified XID from KnownAssignedXids[].
3386
 *
3387
 * Caller must hold ProcArrayLock in exclusive mode.
3388 3389 3390 3391 3392 3393 3394 3395 3396
 */
static void
KnownAssignedXidsRemove(TransactionId xid)
{
	Assert(TransactionIdIsValid(xid));

	elog(trace_recovery(DEBUG4), "remove KnownAssignedXid %u", xid);

	/*
3397 3398
	 * Note: we cannot consider it an error to remove an XID that's not
	 * present.  We intentionally remove subxact IDs while processing
B
Bruce Momjian 已提交
3399 3400
	 * XLOG_XACT_ASSIGNMENT, to avoid array overflow.  Then those XIDs will be
	 * removed again when the top-level xact commits or aborts.
3401
	 *
B
Bruce Momjian 已提交
3402 3403 3404
	 * It might be possible to track such XIDs to distinguish this case from
	 * actual errors, but it would be complicated and probably not worth it.
	 * So, just ignore the search result.
3405
	 */
3406
	(void) KnownAssignedXidsSearch(xid, true);
3407 3408 3409
}

/*
3410 3411
 * KnownAssignedXidsRemoveTree
 *		Remove xid (if it's not InvalidTransactionId) and all the subxids.
3412
 *
3413
 * Caller must hold ProcArrayLock in exclusive mode.
3414
 */
3415 3416 3417
static void
KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
							TransactionId *subxids)
3418
{
B
Bruce Momjian 已提交
3419
	int			i;
3420

3421 3422 3423 3424 3425 3426 3427 3428
	if (TransactionIdIsValid(xid))
		KnownAssignedXidsRemove(xid);

	for (i = 0; i < nsubxids; i++)
		KnownAssignedXidsRemove(subxids[i]);

	/* Opportunistically compress the array */
	KnownAssignedXidsCompress(false);
3429 3430 3431
}

/*
3432 3433
 * Prune KnownAssignedXids up to, but *not* including xid. If xid is invalid
 * then clear the whole table.
3434
 *
3435
 * Caller must hold ProcArrayLock in exclusive mode.
3436
 */
3437 3438
static void
KnownAssignedXidsRemovePreceding(TransactionId removeXid)
3439
{
3440 3441
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
B
Bruce Momjian 已提交
3442 3443 3444 3445
	int			count = 0;
	int			head,
				tail,
				i;
3446

3447
	if (!TransactionIdIsValid(removeXid))
3448
	{
3449 3450 3451 3452 3453
		elog(trace_recovery(DEBUG4), "removing all KnownAssignedXids");
		pArray->numKnownAssignedXids = 0;
		pArray->headKnownAssignedXids = pArray->tailKnownAssignedXids = 0;
		return;
	}
3454

3455
	elog(trace_recovery(DEBUG4), "prune KnownAssignedXids to %u", removeXid);
3456

3457
	/*
B
Bruce Momjian 已提交
3458 3459
	 * Mark entries invalid starting at the tail.  Since array is sorted, we
	 * can stop as soon as we reach a entry >= removeXid.
3460 3461 3462 3463 3464 3465 3466 3467
	 */
	tail = pArray->tailKnownAssignedXids;
	head = pArray->headKnownAssignedXids;

	for (i = tail; i < head; i++)
	{
		if (KnownAssignedXidsValid[i])
		{
B
Bruce Momjian 已提交
3468
			TransactionId knownXid = KnownAssignedXids[i];
3469 3470 3471 3472 3473 3474 3475 3476 3477 3478

			if (TransactionIdFollowsOrEquals(knownXid, removeXid))
				break;

			if (!StandbyTransactionIdIsPrepared(knownXid))
			{
				KnownAssignedXidsValid[i] = false;
				count++;
			}
		}
3479 3480
	}

3481 3482 3483 3484 3485 3486 3487 3488 3489 3490 3491 3492 3493 3494 3495 3496 3497 3498 3499 3500 3501 3502 3503 3504
	pArray->numKnownAssignedXids -= count;
	Assert(pArray->numKnownAssignedXids >= 0);

	/*
	 * Advance the tail pointer if we've marked the tail item invalid.
	 */
	for (i = tail; i < head; i++)
	{
		if (KnownAssignedXidsValid[i])
			break;
	}
	if (i >= head)
	{
		/* Array is empty, so we can reset both pointers */
		pArray->headKnownAssignedXids = 0;
		pArray->tailKnownAssignedXids = 0;
	}
	else
	{
		pArray->tailKnownAssignedXids = i;
	}

	/* Opportunistically compress the array */
	KnownAssignedXidsCompress(false);
3505 3506 3507
}

/*
3508 3509 3510 3511 3512
 * KnownAssignedXidsGet - Get an array of xids by scanning KnownAssignedXids.
 * We filter out anything >= xmax.
 *
 * Returns the number of XIDs stored into xarray[].  Caller is responsible
 * that array is large enough.
3513
 *
3514
 * Caller must hold ProcArrayLock in (at least) shared mode.
3515
 */
3516 3517
static int
KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax)
3518
{
3519
	TransactionId xtmp = InvalidTransactionId;
3520

3521 3522
	return KnownAssignedXidsGetAndSetXmin(xarray, &xtmp, xmax);
}
3523

3524 3525 3526 3527 3528 3529 3530 3531 3532 3533 3534 3535 3536
/*
 * KnownAssignedXidsGetAndSetXmin - as KnownAssignedXidsGet, plus
 * we reduce *xmin to the lowest xid value seen if not already lower.
 *
 * Caller must hold ProcArrayLock in (at least) shared mode.
 */
static int
KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
							   TransactionId xmax)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
	int			count = 0;
B
Bruce Momjian 已提交
3537 3538
	int			head,
				tail;
3539 3540 3541
	int			i;

	/*
B
Bruce Momjian 已提交
3542 3543 3544 3545 3546
	 * Fetch head just once, since it may change while we loop. We can stop
	 * once we reach the initially seen head, since we are certain that an xid
	 * cannot enter and then leave the array while we hold ProcArrayLock.  We
	 * might miss newly-added xids, but they should be >= xmax so irrelevant
	 * anyway.
3547 3548 3549 3550
	 *
	 * Must take spinlock to ensure we see up-to-date array contents.
	 */
	SpinLockAcquire(&pArray->known_assigned_xids_lck);
3551 3552
	tail = pArray->tailKnownAssignedXids;
	head = pArray->headKnownAssignedXids;
3553 3554 3555 3556 3557 3558
	SpinLockRelease(&pArray->known_assigned_xids_lck);

	for (i = tail; i < head; i++)
	{
		/* Skip any gaps in the array */
		if (KnownAssignedXidsValid[i])
3559
		{
3560 3561 3562 3563 3564 3565 3566 3567 3568 3569 3570 3571 3572 3573
			TransactionId knownXid = KnownAssignedXids[i];

			/*
			 * Update xmin if required.  Only the first XID need be checked,
			 * since the array is sorted.
			 */
			if (count == 0 &&
				TransactionIdPrecedes(knownXid, *xmin))
				*xmin = knownXid;

			/*
			 * Filter out anything >= xmax, again relying on sorted property
			 * of array.
			 */
3574 3575
			if (TransactionIdIsValid(xmax) &&
				TransactionIdFollowsOrEquals(knownXid, xmax))
3576 3577 3578 3579
				break;

			/* Add knownXid into output array */
			xarray[count++] = knownXid;
3580 3581
		}
	}
3582 3583

	return count;
3584 3585
}

3586 3587 3588 3589 3590
/*
 * Get oldest XID in the KnownAssignedXids array, or InvalidTransactionId
 * if nothing there.
 */
static TransactionId
3591 3592 3593 3594 3595 3596 3597 3598 3599 3600 3601 3602 3603 3604 3605 3606 3607 3608 3609 3610 3611 3612 3613 3614 3615 3616
KnownAssignedXidsGetOldestXmin(void)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
	int			head,
				tail;
	int			i;

	/*
	 * Fetch head just once, since it may change while we loop.
	 */
	SpinLockAcquire(&pArray->known_assigned_xids_lck);
	tail = pArray->tailKnownAssignedXids;
	head = pArray->headKnownAssignedXids;
	SpinLockRelease(&pArray->known_assigned_xids_lck);

	for (i = tail; i < head; i++)
	{
		/* Skip any gaps in the array */
		if (KnownAssignedXidsValid[i])
			return KnownAssignedXids[i];
	}

	return InvalidTransactionId;
}

3617 3618 3619
/*
 * Display KnownAssignedXids to provide debug trail
 *
3620 3621 3622 3623 3624 3625
 * Currently this is only called within startup process, so we need no
 * special locking.
 *
 * Note this is pretty expensive, and much of the expense will be incurred
 * even if the elog message will get discarded.  It's not currently called
 * in any performance-critical places, however, so no need to be tenser.
3626
 */
T
Tom Lane 已提交
3627
static void
3628 3629
KnownAssignedXidsDisplay(int trace_level)
{
3630 3631
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
B
Bruce Momjian 已提交
3632 3633 3634 3635 3636
	StringInfoData buf;
	int			head,
				tail,
				i;
	int			nxids = 0;
3637

3638 3639
	tail = pArray->tailKnownAssignedXids;
	head = pArray->headKnownAssignedXids;
3640 3641 3642

	initStringInfo(&buf);

3643 3644 3645 3646 3647
	for (i = tail; i < head; i++)
	{
		if (KnownAssignedXidsValid[i])
		{
			nxids++;
3648
			appendStringInfo(&buf, "[%d]=%u ", i, KnownAssignedXids[i]);
3649 3650
		}
	}
3651

3652
	elog(trace_level, "%d KnownAssignedXids (num=%d tail=%d head=%d) %s",
3653 3654 3655 3656 3657
		 nxids,
		 pArray->numKnownAssignedXids,
		 pArray->tailKnownAssignedXids,
		 pArray->headKnownAssignedXids,
		 buf.data);
3658 3659 3660

	pfree(buf.data);
}
3661 3662 3663 3664 3665 3666 3667 3668 3669 3670 3671 3672 3673 3674 3675 3676 3677 3678 3679

/*
 * KnownAssignedXidsReset
 *		Resets KnownAssignedXids to be empty
 */
static void
KnownAssignedXidsReset(void)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;

	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	pArray->numKnownAssignedXids = 0;
	pArray->tailKnownAssignedXids = 0;
	pArray->headKnownAssignedXids = 0;

	LWLockRelease(ProcArrayLock);
}