procarray.c 100.8 KB
Newer Older
1 2 3 4 5 6
/*-------------------------------------------------------------------------
 *
 * procarray.c
 *	  POSTGRES process array code.
 *
 *
7
 * This module maintains arrays of the PGPROC and PGXACT structures for all
8 9 10 11
 * active backends.  Although there are several uses for this, the principal
 * one is as a means of determining the set of currently running transactions.
 *
 * Because of various subtle race conditions it is critical that a backend
12
 * hold the correct locks while setting or clearing its MyPgXact->xid field.
13
 * See notes in src/backend/access/transam/README.
14
 *
15 16 17 18
 * The process arrays now also include structures representing prepared
 * transactions.  The xid and subxids fields of these are valid, as are the
 * myProcLocks lists.  They can be distinguished from regular backend PGPROCs
 * at need by checking for pid == 0.
B
Bruce Momjian 已提交
19
 *
20 21
 * During hot standby, we also keep a list of XIDs representing transactions
 * that are known to be running in the master (or more precisely, were running
B
Bruce Momjian 已提交
22
 * as of the current point in the WAL stream).	This list is kept in the
23 24 25
 * KnownAssignedXids array, and is updated by watching the sequence of
 * arriving XIDs.  This is necessary because if we leave those XIDs out of
 * snapshots taken for standby queries, then they will appear to be already
B
Bruce Momjian 已提交
26
 * complete, leading to MVCC failures.	Note that in hot standby, the PGPROC
27 28 29 30 31 32 33
 * array represents standby processes, which by definition are not running
 * transactions that have XIDs.
 *
 * It is perhaps possible for a backend on the master to terminate without
 * writing an abort record for its transaction.  While that shouldn't really
 * happen, it would tie up KnownAssignedXids indefinitely, so we protect
 * ourselves by pruning the array when a valid list of running XIDs arrives.
34
 *
35
 * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
36 37 38 39
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
40
 *	  src/backend/storage/ipc/procarray.c
41 42 43 44 45
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

46 47
#include <signal.h>

48
#include "access/clog.h"
49
#include "access/subtrans.h"
50 51
#include "access/transam.h"
#include "access/xact.h"
52
#include "access/twophase.h"
53 54
#include "miscadmin.h"
#include "storage/procarray.h"
T
Tom Lane 已提交
55
#include "storage/spin.h"
56
#include "utils/builtins.h"
57
#include "utils/snapmgr.h"
58 59 60 61 62 63 64 65


/* Our shared memory area */
typedef struct ProcArrayStruct
{
	int			numProcs;		/* number of valid procs entries */
	int			maxProcs;		/* allocated size of procs array */

66 67 68 69 70 71 72
	/*
	 * Known assigned XIDs handling
	 */
	int			maxKnownAssignedXids;	/* allocated size of array */
	int			numKnownAssignedXids;	/* currrent # of valid entries */
	int			tailKnownAssignedXids;	/* index of oldest valid element */
	int			headKnownAssignedXids;	/* index of newest element, + 1 */
B
Bruce Momjian 已提交
73
	slock_t		known_assigned_xids_lck;		/* protects head/tail pointers */
B
Bruce Momjian 已提交
74

75
	/*
76 77
	 * Highest subxid that has been removed from KnownAssignedXids array to
	 * prevent overflow; or InvalidTransactionId if none.  We track this for
78
	 * similar reasons to tracking overflowing cached subxids in PGXACT
79 80
	 * entries.  Must hold exclusive ProcArrayLock to change this, and shared
	 * lock to read it.
81
	 */
B
Bruce Momjian 已提交
82
	TransactionId lastOverflowedXid;
83

84
	/*
85
	 * We declare pgprocnos[] as 1 entry because C wants a fixed-size array, but
B
Bruce Momjian 已提交
86
	 * actually it is maxProcs entries long.
87
	 */
88
	int			pgprocnos[1];		/* VARIABLE LENGTH ARRAY */
89 90 91 92
} ProcArrayStruct;

static ProcArrayStruct *procArray;

93 94 95
static PGPROC *allProcs;
static PGXACT *allPgXact;

96 97 98
/*
 * Bookkeeping for tracking emulated transactions in recovery
 */
99 100
static TransactionId *KnownAssignedXids;
static bool *KnownAssignedXidsValid;
B
Bruce Momjian 已提交
101
static TransactionId latestObservedXid = InvalidTransactionId;
102 103 104 105 106 107 108 109

/*
 * If we're in STANDBY_SNAPSHOT_PENDING state, standbySnapshotPendingXmin is
 * the highest xid that might still be running that we don't have in
 * KnownAssignedXids.
 */
static TransactionId standbySnapshotPendingXmin;

110 111 112 113
#ifdef XIDCACHE_DEBUG

/* counters for XidCache measurement */
static long xc_by_recent_xmin = 0;
114
static long xc_by_known_xact = 0;
115
static long xc_by_my_xact = 0;
116
static long xc_by_latest_xid = 0;
117 118
static long xc_by_main_xid = 0;
static long xc_by_child_xid = 0;
119
static long xc_by_known_assigned = 0;
120
static long xc_no_overflow = 0;
121 122 123
static long xc_slow_answer = 0;

#define xc_by_recent_xmin_inc()		(xc_by_recent_xmin++)
124
#define xc_by_known_xact_inc()		(xc_by_known_xact++)
125
#define xc_by_my_xact_inc()			(xc_by_my_xact++)
126
#define xc_by_latest_xid_inc()		(xc_by_latest_xid++)
127 128
#define xc_by_main_xid_inc()		(xc_by_main_xid++)
#define xc_by_child_xid_inc()		(xc_by_child_xid++)
129
#define xc_by_known_assigned_inc()	(xc_by_known_assigned++)
130
#define xc_no_overflow_inc()		(xc_no_overflow++)
131 132 133 134 135 136
#define xc_slow_answer_inc()		(xc_slow_answer++)

static void DisplayXidCache(void);
#else							/* !XIDCACHE_DEBUG */

#define xc_by_recent_xmin_inc()		((void) 0)
137
#define xc_by_known_xact_inc()		((void) 0)
138
#define xc_by_my_xact_inc()			((void) 0)
139
#define xc_by_latest_xid_inc()		((void) 0)
140 141
#define xc_by_main_xid_inc()		((void) 0)
#define xc_by_child_xid_inc()		((void) 0)
142
#define xc_by_known_assigned_inc()	((void) 0)
143
#define xc_no_overflow_inc()		((void) 0)
144 145 146
#define xc_slow_answer_inc()		((void) 0)
#endif   /* XIDCACHE_DEBUG */

147
/* Primitives for KnownAssignedXids array handling for standby */
148 149
static void KnownAssignedXidsCompress(bool force);
static void KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
B
Bruce Momjian 已提交
150
					 bool exclusive_lock);
151 152
static bool KnownAssignedXidsSearch(TransactionId xid, bool remove);
static bool KnownAssignedXidExists(TransactionId xid);
153
static void KnownAssignedXidsRemove(TransactionId xid);
154
static void KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
B
Bruce Momjian 已提交
155
							TransactionId *subxids);
156
static void KnownAssignedXidsRemovePreceding(TransactionId xid);
B
Bruce Momjian 已提交
157
static int	KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
158
static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray,
B
Bruce Momjian 已提交
159 160
							   TransactionId *xmin,
							   TransactionId xmax);
161
static TransactionId KnownAssignedXidsGetOldestXmin(void);
162
static void KnownAssignedXidsDisplay(int trace_level);
163 164 165 166

/*
 * Report shared-memory space needed by CreateSharedProcArray.
 */
167
Size
168
ProcArrayShmemSize(void)
169
{
170 171
	Size		size;

172 173
	/* Size of the ProcArray structure itself */
#define PROCARRAY_MAXPROCS	(MaxBackends + max_prepared_xacts)
174

175 176
	size = offsetof(ProcArrayStruct, pgprocnos);
	size = add_size(size, mul_size(sizeof(int), PROCARRAY_MAXPROCS));
177 178

	/*
179
	 * During Hot Standby processing we have a data structure called
180 181 182 183 184 185
	 * KnownAssignedXids, created in shared memory. Local data structures are
	 * also created in various backends during GetSnapshotData(),
	 * TransactionIdIsInProgress() and GetRunningTransactionData(). All of the
	 * main structures created in those functions must be identically sized,
	 * since we may at times copy the whole of the data structures around. We
	 * refer to this size as TOTAL_MAX_CACHED_SUBXIDS.
186
	 *
B
Bruce Momjian 已提交
187 188 189
	 * Ideally we'd only create this structure if we were actually doing hot
	 * standby in the current run, but we don't know that yet at the time
	 * shared memory is being set up.
190
	 */
191 192 193
#define TOTAL_MAX_CACHED_SUBXIDS \
	((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)

194
	if (EnableHotStandby)
195 196 197 198
	{
		size = add_size(size,
						mul_size(sizeof(TransactionId),
								 TOTAL_MAX_CACHED_SUBXIDS));
199
		size = add_size(size,
200 201
						mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS));
	}
202 203

	return size;
204 205 206 207 208 209
}

/*
 * Initialize the shared PGPROC array during postmaster startup.
 */
void
210
CreateSharedProcArray(void)
211 212 213 214 215
{
	bool		found;

	/* Create or attach to the ProcArray shared structure */
	procArray = (ProcArrayStruct *)
216
		ShmemInitStruct("Proc Array",
217 218
						add_size(offsetof(ProcArrayStruct, pgprocnos),
								 mul_size(sizeof(int),
219
										  PROCARRAY_MAXPROCS)),
220
						&found);
221 222 223 224 225 226 227

	if (!found)
	{
		/*
		 * We're the first - initialize.
		 */
		procArray->numProcs = 0;
228
		procArray->maxProcs = PROCARRAY_MAXPROCS;
229
		procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
230 231 232 233
		procArray->numKnownAssignedXids = 0;
		procArray->tailKnownAssignedXids = 0;
		procArray->headKnownAssignedXids = 0;
		SpinLockInit(&procArray->known_assigned_xids_lck);
234 235
		procArray->lastOverflowedXid = InvalidTransactionId;
	}
236

237 238 239
	allProcs = ProcGlobal->allProcs;
	allPgXact = ProcGlobal->allPgXact;

240
	/* Create or attach to the KnownAssignedXids arrays too, if needed */
241
	if (EnableHotStandby)
242
	{
243 244 245 246 247 248 249 250 251
		KnownAssignedXids = (TransactionId *)
			ShmemInitStruct("KnownAssignedXids",
							mul_size(sizeof(TransactionId),
									 TOTAL_MAX_CACHED_SUBXIDS),
							&found);
		KnownAssignedXidsValid = (bool *)
			ShmemInitStruct("KnownAssignedXidsValid",
							mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS),
							&found);
252 253 254 255
	}
}

/*
256
 * Add the specified PGPROC to the shared array.
257 258
 */
void
259
ProcArrayAdd(PGPROC *proc)
260 261
{
	ProcArrayStruct *arrayP = procArray;
262
	int			index;
263 264 265 266 267 268

	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	if (arrayP->numProcs >= arrayP->maxProcs)
	{
		/*
B
Bruce Momjian 已提交
269 270 271
		 * Ooops, no room.	(This really shouldn't happen, since there is a
		 * fixed supply of PGPROC structs too, and so we should have failed
		 * earlier.)
272 273 274 275 276 277 278
		 */
		LWLockRelease(ProcArrayLock);
		ereport(FATAL,
				(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
				 errmsg("sorry, too many clients already")));
	}

279 280 281
	/*
	 * Keep the procs array sorted by (PGPROC *) so that we can utilize
	 * locality of references much better. This is useful while traversing the
R
Robert Haas 已提交
282
	 * ProcArray because there is a increased likelihood of finding the next
283 284
	 * PGPROC structure in the cache.
	 * 
R
Robert Haas 已提交
285
	 * Since the occurrence of adding/removing a proc is much lower than the
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300
	 * access to the ProcArray itself, the overhead should be marginal
	 */
	for (index = 0; index < arrayP->numProcs; index++)
	{
		/*
		 * If we are the first PGPROC or if we have found our right position in
		 * the array, break
		 */
		if ((arrayP->pgprocnos[index] == -1) || (arrayP->pgprocnos[index] > proc->pgprocno))
			break;
	}

	memmove(&arrayP->pgprocnos[index + 1], &arrayP->pgprocnos[index],
			(arrayP->numProcs - index) * sizeof (int));
	arrayP->pgprocnos[index] = proc->pgprocno;
301 302 303 304 305 306
	arrayP->numProcs++;

	LWLockRelease(ProcArrayLock);
}

/*
307
 * Remove the specified PGPROC from the shared array.
308 309 310 311 312 313 314
 *
 * When latestXid is a valid XID, we are removing a live 2PC gxact from the
 * array, and thus causing it to appear as "not running" anymore.  In this
 * case we must advance latestCompletedXid.  (This is essentially the same
 * as ProcArrayEndTransaction followed by removal of the PGPROC, but we take
 * the ProcArrayLock only once, and don't damage the content of the PGPROC;
 * twophase.c depends on the latter.)
315 316
 */
void
317
ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
318 319 320 321 322
{
	ProcArrayStruct *arrayP = procArray;
	int			index;

#ifdef XIDCACHE_DEBUG
323 324 325
	/* dump stats at backend shutdown, but not prepared-xact end */
	if (proc->pid != 0)
		DisplayXidCache();
326 327 328 329
#endif

	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

330 331
	if (TransactionIdIsValid(latestXid))
	{
332
		Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
333 334 335 336 337 338 339 340 341

		/* Advance global latestCompletedXid while holding the lock */
		if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
								  latestXid))
			ShmemVariableCache->latestCompletedXid = latestXid;
	}
	else
	{
		/* Shouldn't be trying to remove a live transaction here */
342
		Assert(!TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
343 344
	}

345 346
	for (index = 0; index < arrayP->numProcs; index++)
	{
347
		if (arrayP->pgprocnos[index] == proc->pgprocno)
348
		{
349 350 351 352
			/* Keep the PGPROC array sorted. See notes above */
			memmove(&arrayP->pgprocnos[index], &arrayP->pgprocnos[index + 1],
					(arrayP->numProcs - index - 1) * sizeof (int));
			arrayP->pgprocnos[arrayP->numProcs - 1] = -1; /* for debugging */
353 354 355 356 357 358 359 360 361
			arrayP->numProcs--;
			LWLockRelease(ProcArrayLock);
			return;
		}
	}

	/* Ooops */
	LWLockRelease(ProcArrayLock);

362
	elog(LOG, "failed to find proc %p in ProcArray", proc);
363 364 365
}


366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381
/*
 * ProcArrayEndTransaction -- mark a transaction as no longer running
 *
 * This is used interchangeably for commit and abort cases.  The transaction
 * commit/abort must already be reported to WAL and pg_clog.
 *
 * proc is currently always MyProc, but we pass it explicitly for flexibility.
 * latestXid is the latest Xid among the transaction's main XID and
 * subtransactions, or InvalidTransactionId if it has no XID.  (We must ask
 * the caller to pass latestXid, instead of computing it from the PGPROC's
 * contents, because the subxid information in the PGPROC might be
 * incomplete.)
 */
void
ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
{
382 383
	PGXACT *pgxact = &allPgXact[proc->pgprocno];

384 385 386
	if (TransactionIdIsValid(latestXid))
	{
		/*
387 388 389
		 * We must lock ProcArrayLock while clearing our advertised XID, so
		 * that we do not exit the set of "running" transactions while someone
		 * else is taking a snapshot.  See discussion in
390 391
		 * src/backend/access/transam/README.
		 */
392
		Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
393 394 395

		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

396
		pgxact->xid = InvalidTransactionId;
397
		proc->lxid = InvalidLocalTransactionId;
398
		pgxact->xmin = InvalidTransactionId;
399
		/* must be cleared with xid/xmin: */
400 401
		pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
		pgxact->inCommit = false; /* be sure this is cleared in abort */
402
		proc->recoveryConflictPending = false;
403 404

		/* Clear the subtransaction-XID cache too while holding the lock */
405 406
		pgxact->nxids = 0;
		pgxact->overflowed = false;
407 408 409 410 411 412 413 414 415 416 417

		/* Also advance global latestCompletedXid while holding the lock */
		if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
								  latestXid))
			ShmemVariableCache->latestCompletedXid = latestXid;

		LWLockRelease(ProcArrayLock);
	}
	else
	{
		/*
B
Bruce Momjian 已提交
418 419 420
		 * If we have no XID, we don't need to lock, since we won't affect
		 * anyone else's calculation of a snapshot.  We might change their
		 * estimate of global xmin, but that's OK.
421
		 */
422
		Assert(!TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
423 424

		proc->lxid = InvalidLocalTransactionId;
425
		pgxact->xmin = InvalidTransactionId;
426
		/* must be cleared with xid/xmin: */
427 428
		pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
		pgxact->inCommit = false; /* be sure this is cleared in abort */
429
		proc->recoveryConflictPending = false;
430

431 432
		Assert(pgxact->nxids == 0);
		Assert(pgxact->overflowed == false);
433 434 435 436 437 438 439 440 441 442
	}
}


/*
 * ProcArrayClearTransaction -- clear the transaction fields
 *
 * This is used after successfully preparing a 2-phase transaction.  We are
 * not actually reporting the transaction's XID as no longer running --- it
 * will still appear as running because the 2PC's gxact is in the ProcArray
443
 * too.  We just have to clear out our own PGXACT.
444 445 446 447
 */
void
ProcArrayClearTransaction(PGPROC *proc)
{
448 449
	PGXACT *pgxact = &allPgXact[proc->pgprocno];

450 451
	/*
	 * We can skip locking ProcArrayLock here, because this action does not
B
Bruce Momjian 已提交
452 453
	 * actually change anyone's view of the set of running XIDs: our entry is
	 * duplicate with the gxact that has already been inserted into the
454 455
	 * ProcArray.
	 */
456
	pgxact->xid = InvalidTransactionId;
457
	proc->lxid = InvalidLocalTransactionId;
458
	pgxact->xmin = InvalidTransactionId;
459
	proc->recoveryConflictPending = false;
460 461

	/* redundant, but just in case */
462 463
	pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
	pgxact->inCommit = false;
464 465

	/* Clear the subtransaction-XID cache too */
466 467
	pgxact->nxids = 0;
	pgxact->overflowed = false;
468 469
}

470 471 472
/*
 * ProcArrayApplyRecoveryInfo -- apply recovery info about xids
 *
473
 * Takes us through 3 states: Initialized, Pending and Ready.
474 475 476 477
 * Normal case is to go all the way to Ready straight away, though there
 * are atypical cases where we need to take it in steps.
 *
 * Use the data about running transactions on master to create the initial
478
 * state of KnownAssignedXids. We also use these records to regularly prune
479
 * KnownAssignedXids because we know it is possible that some transactions
480
 * with FATAL errors fail to write abort records, which could cause eventual
481 482
 * overflow.
 *
483
 * See comments for LogStandbySnapshot().
484 485 486 487
 */
void
ProcArrayApplyRecoveryInfo(RunningTransactions running)
{
B
Bruce Momjian 已提交
488
	TransactionId *xids;
B
Bruce Momjian 已提交
489
	int			nxids;
490
	TransactionId nextXid;
B
Bruce Momjian 已提交
491
	int			i;
492 493

	Assert(standbyState >= STANDBY_INITIALIZED);
494 495 496
	Assert(TransactionIdIsValid(running->nextXid));
	Assert(TransactionIdIsValid(running->oldestRunningXid));
	Assert(TransactionIdIsNormal(running->latestCompletedXid));
497 498 499 500 501

	/*
	 * Remove stale transactions, if any.
	 */
	ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid);
502
	StandbyReleaseOldLocks(running->xcnt, running->xids);
503 504 505 506 507 508 509 510

	/*
	 * If our snapshot is already valid, nothing else to do...
	 */
	if (standbyState == STANDBY_SNAPSHOT_READY)
		return;

	/*
511
	 * If our initial RunningTransactionsData had an overflowed snapshot then
512 513 514
	 * we knew we were missing some subxids from our snapshot. If we continue
	 * to see overflowed snapshots then we might never be able to start up,
	 * so we make another test to see if our snapshot is now valid. We know
515
	 * that the missing subxids are equal to or earlier than nextXid. After we
B
Bruce Momjian 已提交
516 517 518 519
	 * initialise we continue to apply changes during recovery, so once the
	 * oldestRunningXid is later than the nextXid from the initial snapshot we
	 * know that we no longer have missing information and can mark the
	 * snapshot as valid.
520 521 522
	 */
	if (standbyState == STANDBY_SNAPSHOT_PENDING)
	{
523 524 525 526 527
		/*
		 * If the snapshot isn't overflowed or if its empty we can
		 * reset our pending state and use this snapshot instead.
		 */
		if (!running->subxid_overflow || running->xcnt == 0)
528
		{
529
			standbyState = STANDBY_INITIALIZED;
530
		}
531
		else
532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547
		{
			if (TransactionIdPrecedes(standbySnapshotPendingXmin,
									  running->oldestRunningXid))
			{
				standbyState = STANDBY_SNAPSHOT_READY;
				elog(trace_recovery(DEBUG1),
					 "recovery snapshots are now enabled");
			}
			else
				elog(trace_recovery(DEBUG1),
					 "recovery snapshot waiting for non-overflowed snapshot or "
					 "until oldest active xid on standby is at least %u (now %u)",
					 standbySnapshotPendingXmin,
					 running->oldestRunningXid);
			return;
		}
548 549
	}

550 551
	Assert(standbyState == STANDBY_INITIALIZED);

552
	/*
553
	 * OK, we need to initialise from the RunningTransactionsData record
554 555
	 */

556 557 558 559
	/*
	 * Nobody else is running yet, but take locks anyhow
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
560 561

	/*
562 563
	 * KnownAssignedXids is sorted so we cannot just add the xids, we have to
	 * sort them first.
564
	 *
B
Bruce Momjian 已提交
565 566 567 568 569 570
	 * Some of the new xids are top-level xids and some are subtransactions.
	 * We don't call SubtransSetParent because it doesn't matter yet. If we
	 * aren't overflowed then all xids will fit in snapshot and so we don't
	 * need subtrans. If we later overflow, an xid assignment record will add
	 * xids to subtrans. If RunningXacts is overflowed then we don't have
	 * enough information to correctly update subtrans anyway.
571
	 */
572
	Assert(procArray->numKnownAssignedXids == 0);
573 574

	/*
575 576
	 * Allocate a temporary array to avoid modifying the array passed as
	 * argument.
577
	 */
578
	xids = palloc(sizeof(TransactionId) * running->xcnt);
579 580

	/*
581
	 * Add to the temp array any xids which have not already completed.
582
	 */
583
	nxids = 0;
584
	for (i = 0; i < running->xcnt; i++)
585
	{
586
		TransactionId xid = running->xids[i];
587 588

		/*
589 590 591 592
		 * The running-xacts snapshot can contain xids that were still visible
		 * in the procarray when the snapshot was taken, but were already
		 * WAL-logged as completed. They're not running anymore, so ignore
		 * them.
593 594 595 596
		 */
		if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
			continue;

597
		xids[nxids++] = xid;
598 599
	}

600 601 602
	if (nxids > 0)
	{
		/*
B
Bruce Momjian 已提交
603 604
		 * Sort the array so that we can add them safely into
		 * KnownAssignedXids.
605 606 607 608 609 610 611
		 */
		qsort(xids, nxids, sizeof(TransactionId), xidComparator);

		/*
		 * Add the sorted snapshot into KnownAssignedXids
		 */
		for (i = 0; i < nxids; i++)
612
			KnownAssignedXidsAdd(xids[i], xids[i], true);
613 614 615 616 617

		KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
	}

	pfree(xids);
618 619

	/*
620 621
	 * Now we've got the running xids we need to set the global values that
	 * are used to track snapshots as they evolve further.
622
	 *
623 624
	 * - latestCompletedXid which will be the xmax for snapshots -
	 * lastOverflowedXid which shows whether snapshots overflow - nextXid
625 626 627
	 *
	 * If the snapshot overflowed, then we still initialise with what we know,
	 * but the recovery snapshot isn't fully valid yet because we know there
B
Bruce Momjian 已提交
628 629
	 * are some subxids missing. We don't know the specific subxids that are
	 * missing, so conservatively assume the last one is latestObservedXid.
630
	 */
631 632 633
	latestObservedXid = running->nextXid;
	TransactionIdRetreat(latestObservedXid);

634 635
	if (running->subxid_overflow)
	{
636 637 638
		standbyState = STANDBY_SNAPSHOT_PENDING;

		standbySnapshotPendingXmin = latestObservedXid;
639
		procArray->lastOverflowedXid = latestObservedXid;
640 641 642 643 644 645 646 647 648
	}
	else
	{
		standbyState = STANDBY_SNAPSHOT_READY;

		standbySnapshotPendingXmin = InvalidTransactionId;
	}

	/*
649
	 * If a transaction wrote a commit record in the gap between taking and
650 651
	 * logging the snapshot then latestCompletedXid may already be higher than
	 * the value from the snapshot, so check before we use the incoming value.
652 653 654 655
	 */
	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
							  running->latestCompletedXid))
		ShmemVariableCache->latestCompletedXid = running->latestCompletedXid;
656

657 658 659 660 661 662 663 664 665 666 667
	Assert(TransactionIdIsNormal(ShmemVariableCache->latestCompletedXid));

	LWLockRelease(ProcArrayLock);

	/*
	 * ShmemVariableCache->nextXid must be beyond any observed xid.
	 *
	 * We don't expect anyone else to modify nextXid, hence we don't need to
	 * hold a lock while examining it.  We still acquire the lock to modify
	 * it, though.
	 */
668 669 670
	nextXid = latestObservedXid;
	TransactionIdAdvance(nextXid);
	if (TransactionIdFollows(nextXid, ShmemVariableCache->nextXid))
671 672
	{
		LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
673
		ShmemVariableCache->nextXid = nextXid;
674 675
		LWLockRelease(XidGenLock);
	}
676

677 678
	Assert(TransactionIdIsValid(ShmemVariableCache->nextXid));

679
	KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
680
	if (standbyState == STANDBY_SNAPSHOT_READY)
681
		elog(trace_recovery(DEBUG1), "recovery snapshots are now enabled");
682
	else
683 684 685 686 687
		elog(trace_recovery(DEBUG1),
			 "recovery snapshot waiting for non-overflowed snapshot or "
			 "until oldest active xid on standby is at least %u (now %u)",
			 standbySnapshotPendingXmin,
			 running->oldestRunningXid);
688 689
}

690 691 692 693
/*
 * ProcArrayApplyXidAssignment
 *		Process an XLOG_XACT_ASSIGNMENT WAL record
 */
694 695 696 697 698
void
ProcArrayApplyXidAssignment(TransactionId topxid,
							int nsubxids, TransactionId *subxids)
{
	TransactionId max_xid;
B
Bruce Momjian 已提交
699
	int			i;
700

701
	Assert(standbyState >= STANDBY_INITIALIZED);
702 703 704 705 706 707 708 709 710 711 712 713 714 715

	max_xid = TransactionIdLatest(topxid, nsubxids, subxids);

	/*
	 * Mark all the subtransactions as observed.
	 *
	 * NOTE: This will fail if the subxid contains too many previously
	 * unobserved xids to fit into known-assigned-xids. That shouldn't happen
	 * as the code stands, because xid-assignment records should never contain
	 * more than PGPROC_MAX_CACHED_SUBXIDS entries.
	 */
	RecordKnownAssignedTransactionIds(max_xid);

	/*
B
Bruce Momjian 已提交
716 717 718 719 720 721 722 723 724
	 * Notice that we update pg_subtrans with the top-level xid, rather than
	 * the parent xid. This is a difference between normal processing and
	 * recovery, yet is still correct in all cases. The reason is that
	 * subtransaction commit is not marked in clog until commit processing, so
	 * all aborted subtransactions have already been clearly marked in clog.
	 * As a result we are able to refer directly to the top-level
	 * transaction's state rather than skipping through all the intermediate
	 * states in the subtransaction tree. This should be the first time we
	 * have attempted to SubTransSetParent().
725 726 727 728 729 730 731 732 733 734
	 */
	for (i = 0; i < nsubxids; i++)
		SubTransSetParent(subxids[i], topxid, false);

	/*
	 * Uses same locking as transaction commit
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	/*
735
	 * Remove subxids from known-assigned-xacts.
736
	 */
737
	KnownAssignedXidsRemoveTree(InvalidTransactionId, nsubxids, subxids);
738 739

	/*
740
	 * Advance lastOverflowedXid to be at least the last of these subxids.
741 742 743 744 745 746
	 */
	if (TransactionIdPrecedes(procArray->lastOverflowedXid, max_xid))
		procArray->lastOverflowedXid = max_xid;

	LWLockRelease(ProcArrayLock);
}
747

748 749 750
/*
 * TransactionIdIsInProgress -- is given transaction running in some backend
 *
751
 * Aside from some shortcuts such as checking RecentXmin and our own Xid,
752
 * there are four possibilities for finding a running transaction:
753
 *
754
 * 1. The given Xid is a main transaction Id.  We will find this out cheaply
755
 * by looking at the PGXACT struct for each backend.
756
 *
757
 * 2. The given Xid is one of the cached subxact Xids in the PGPROC array.
758 759
 * We can find this out cheaply too.
 *
760 761 762
 * 3. In Hot Standby mode, we must search the KnownAssignedXids list to see
 * if the Xid is running on the master.
 *
763 764 765
 * 4. Search the SubTrans tree to find the Xid's topmost parent, and then see
 * if that is running according to PGXACT or KnownAssignedXids.  This is the
 * slowest way, but sadly it has to be done always if the others failed,
766 767
 * unless we see that the cached subxact sets are complete (none have
 * overflowed).
768
 *
769 770 771
 * ProcArrayLock has to be held while we do 1, 2, 3.  If we save the top Xids
 * while doing 1 and 3, we can release the ProcArrayLock while we do 4.
 * This buys back some concurrency (and we can't retrieve the main Xids from
772
 * PGXACT again anyway; see GetNewTransactionId).
773 774 775 776
 */
bool
TransactionIdIsInProgress(TransactionId xid)
{
777 778
	static TransactionId *xids = NULL;
	int			nxids = 0;
779
	ProcArrayStruct *arrayP = procArray;
780
	TransactionId topxid;
781 782 783 784
	int			i,
				j;

	/*
B
Bruce Momjian 已提交
785
	 * Don't bother checking a transaction older than RecentXmin; it could not
B
Bruce Momjian 已提交
786 787 788
	 * possibly still be running.  (Note: in particular, this guarantees that
	 * we reject InvalidTransactionId, FrozenTransactionId, etc as not
	 * running.)
789 790 791 792 793 794 795
	 */
	if (TransactionIdPrecedes(xid, RecentXmin))
	{
		xc_by_recent_xmin_inc();
		return false;
	}

796 797 798 799 800 801 802 803 804 805 806
	/*
	 * We may have just checked the status of this transaction, so if it is
	 * already known to be completed, we can fall out without any access to
	 * shared memory.
	 */
	if (TransactionIdIsKnownCompleted(xid))
	{
		xc_by_known_xact_inc();
		return false;
	}

807 808 809 810 811 812 813 814 815 816 817
	/*
	 * Also, we can handle our own transaction (and subtransactions) without
	 * any access to shared memory.
	 */
	if (TransactionIdIsCurrentTransactionId(xid))
	{
		xc_by_my_xact_inc();
		return true;
	}

	/*
818
	 * If first time through, get workspace to remember main XIDs in. We
B
Bruce Momjian 已提交
819
	 * malloc it permanently to avoid repeated palloc/pfree overhead.
820 821 822
	 */
	if (xids == NULL)
	{
823
		/*
B
Bruce Momjian 已提交
824 825 826
		 * In hot standby mode, reserve enough space to hold all xids in the
		 * known-assigned list. If we later finish recovery, we no longer need
		 * the bigger array, but we don't bother to shrink it.
827
		 */
828
		int			maxxids = RecoveryInProgress() ? TOTAL_MAX_CACHED_SUBXIDS : arrayP->maxProcs;
829 830

		xids = (TransactionId *) malloc(maxxids * sizeof(TransactionId));
831 832 833 834 835
		if (xids == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
	}
836 837 838

	LWLockAcquire(ProcArrayLock, LW_SHARED);

839 840 841 842 843 844 845 846 847 848 849 850
	/*
	 * Now that we have the lock, we can check latestCompletedXid; if the
	 * target Xid is after that, it's surely still running.
	 */
	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, xid))
	{
		LWLockRelease(ProcArrayLock);
		xc_by_latest_xid_inc();
		return true;
	}

	/* No shortcuts, gotta grovel through the array */
851 852
	for (i = 0; i < arrayP->numProcs; i++)
	{
853 854 855 856
		int		pgprocno = arrayP->pgprocnos[i];
		volatile PGPROC    *proc = &allProcs[pgprocno];
		volatile PGXACT    *pgxact = &allPgXact[pgprocno];
		TransactionId	pxid;
857 858 859 860

		/* Ignore my own proc --- dealt with it above */
		if (proc == MyProc)
			continue;
861 862

		/* Fetch xid just once - see GetNewTransactionId */
863
		pxid = pgxact->xid;
864 865 866 867 868 869 870 871 872

		if (!TransactionIdIsValid(pxid))
			continue;

		/*
		 * Step 1: check the main Xid
		 */
		if (TransactionIdEquals(pxid, xid))
		{
873
			LWLockRelease(ProcArrayLock);
874
			xc_by_main_xid_inc();
875
			return true;
876 877 878
		}

		/*
B
Bruce Momjian 已提交
879 880
		 * We can ignore main Xids that are younger than the target Xid, since
		 * the target could not possibly be their child.
881 882 883 884 885 886 887
		 */
		if (TransactionIdPrecedes(xid, pxid))
			continue;

		/*
		 * Step 2: check the cached child-Xids arrays
		 */
888
		for (j = pgxact->nxids - 1; j >= 0; j--)
889 890 891 892 893 894
		{
			/* Fetch xid just once - see GetNewTransactionId */
			TransactionId cxid = proc->subxids.xids[j];

			if (TransactionIdEquals(cxid, xid))
			{
895
				LWLockRelease(ProcArrayLock);
896
				xc_by_child_xid_inc();
897
				return true;
898 899 900 901
			}
		}

		/*
902
		 * Save the main Xid for step 4.  We only need to remember main Xids
B
Bruce Momjian 已提交
903 904 905 906
		 * that have uncached children.  (Note: there is no race condition
		 * here because the overflowed flag cannot be cleared, only set, while
		 * we hold ProcArrayLock.  So we can't miss an Xid that we need to
		 * worry about.)
907
		 */
908
		if (pgxact->overflowed)
909 910 911
			xids[nxids++] = pxid;
	}

912 913 914 915
	/*
	 * Step 3: in hot standby mode, check the known-assigned-xids list.  XIDs
	 * in the list must be treated as running.
	 */
916 917
	if (RecoveryInProgress())
	{
918
		/* none of the PGXACT entries should have XIDs in hot standby mode */
919 920
		Assert(nxids == 0);

921
		if (KnownAssignedXidExists(xid))
922 923
		{
			LWLockRelease(ProcArrayLock);
924
			xc_by_known_assigned_inc();
925 926 927 928
			return true;
		}

		/*
B
Bruce Momjian 已提交
929
		 * If the KnownAssignedXids overflowed, we have to check pg_subtrans
B
Bruce Momjian 已提交
930 931 932 933
		 * too.  Fetch all xids from KnownAssignedXids that are lower than
		 * xid, since if xid is a subtransaction its parent will always have a
		 * lower value.  Note we will collect both main and subXIDs here, but
		 * there's no help for it.
934 935 936 937 938
		 */
		if (TransactionIdPrecedesOrEquals(xid, procArray->lastOverflowedXid))
			nxids = KnownAssignedXidsGet(xids, xid);
	}

939 940 941 942
	LWLockRelease(ProcArrayLock);

	/*
	 * If none of the relevant caches overflowed, we know the Xid is not
943
	 * running without even looking at pg_subtrans.
944 945
	 */
	if (nxids == 0)
946 947
	{
		xc_no_overflow_inc();
948
		return false;
949
	}
950 951

	/*
952
	 * Step 4: have to check pg_subtrans.
953
	 *
954 955 956 957
	 * At this point, we know it's either a subtransaction of one of the Xids
	 * in xids[], or it's not running.  If it's an already-failed
	 * subtransaction, we want to say "not running" even though its parent may
	 * still be running.  So first, check pg_clog to see if it's been aborted.
958 959 960 961
	 */
	xc_slow_answer_inc();

	if (TransactionIdDidAbort(xid))
962
		return false;
963 964

	/*
B
Bruce Momjian 已提交
965
	 * It isn't aborted, so check whether the transaction tree it belongs to
B
Bruce Momjian 已提交
966 967
	 * is still running (or, more precisely, whether it was running when we
	 * held ProcArrayLock).
968 969 970 971 972 973 974 975
	 */
	topxid = SubTransGetTopmostTransaction(xid);
	Assert(TransactionIdIsValid(topxid));
	if (!TransactionIdEquals(topxid, xid))
	{
		for (i = 0; i < nxids; i++)
		{
			if (TransactionIdEquals(xids[i], topxid))
976
				return true;
977 978 979
		}
	}

980
	return false;
981 982
}

983 984 985 986
/*
 * TransactionIdIsActive -- is xid the top-level XID of an active backend?
 *
 * This differs from TransactionIdIsInProgress in that it ignores prepared
987 988
 * transactions, as well as transactions running on the master if we're in
 * hot standby.  Also, we ignore subtransactions since that's not needed
989 990 991 992 993 994 995 996 997 998
 * for current uses.
 */
bool
TransactionIdIsActive(TransactionId xid)
{
	bool		result = false;
	ProcArrayStruct *arrayP = procArray;
	int			i;

	/*
B
Bruce Momjian 已提交
999 1000
	 * Don't bother checking a transaction older than RecentXmin; it could not
	 * possibly still be running.
1001 1002 1003 1004 1005 1006 1007 1008
	 */
	if (TransactionIdPrecedes(xid, RecentXmin))
		return false;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (i = 0; i < arrayP->numProcs; i++)
	{
1009 1010 1011 1012
		int		pgprocno = arrayP->pgprocnos[i];
		volatile PGPROC    *proc = &allProcs[pgprocno];
		volatile PGXACT    *pgxact = &allPgXact[pgprocno];
		TransactionId	pxid;
1013 1014

		/* Fetch xid just once - see GetNewTransactionId */
1015
		pxid = pgxact->xid;
1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035

		if (!TransactionIdIsValid(pxid))
			continue;

		if (proc->pid == 0)
			continue;			/* ignore prepared transactions */

		if (TransactionIdEquals(pxid, xid))
		{
			result = true;
			break;
		}
	}

	LWLockRelease(ProcArrayLock);

	return result;
}


1036 1037 1038 1039 1040 1041 1042
/*
 * GetOldestXmin -- returns oldest transaction that was running
 *					when any current transaction was started.
 *
 * If allDbs is TRUE then all backends are considered; if allDbs is FALSE
 * then only backends running in my own database are considered.
 *
1043 1044
 * If ignoreVacuum is TRUE then backends with the PROC_IN_VACUUM flag set are
 * ignored.
1045
 *
1046 1047 1048
 * This is used by VACUUM to decide which deleted tuples must be preserved
 * in a table.	allDbs = TRUE is needed for shared relations, but allDbs =
 * FALSE is sufficient for non-shared relations, since only backends in my
B
Bruce Momjian 已提交
1049
 * own database could ever see the tuples in them.	Also, we can ignore
1050 1051
 * concurrently running lazy VACUUMs because (a) they must be working on other
 * tables, and (b) they don't need to do snapshot-based lookups.
1052 1053
 *
 * This is also used to determine where to truncate pg_subtrans.  allDbs
1054
 * must be TRUE for that case, and ignoreVacuum FALSE.
1055
 *
1056
 * Note: we include all currently running xids in the set of considered xids.
1057 1058
 * This ensures that if a just-started xact has not yet set its snapshot,
 * when it does set the snapshot it cannot set xmin less than what we compute.
1059
 * See notes in src/backend/access/transam/README.
1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081
 *
 * Note: despite the above, it's possible for the calculated value to move
 * backwards on repeated calls. The calculated value is conservative, so that
 * anything older is definitely not considered as running by anyone anymore,
 * but the exact value calculated depends on a number of things. For example,
 * if allDbs is FALSE and there are no transactions running in the current
 * database, GetOldestXmin() returns latestCompletedXid. If a transaction
 * begins after that, its xmin will include in-progress transactions in other
 * databases that started earlier, so another call will return a lower value.
 * Nonetheless it is safe to vacuum a table in the current database with the
 * first result.  There are also replication-related effects: a walsender
 * process can set its xmin based on transactions that are no longer running
 * in the master but are still being replayed on the standby, thus possibly
 * making the GetOldestXmin reading go backwards.  In this case there is a
 * possibility that we lose data that the standby would like to have, but
 * there is little we can do about that --- data is only protected if the
 * walsender runs continuously while queries are executed on the standby.
 * (The Hot Standby code deals with such cases by failing standby queries
 * that needed to access already-removed data, so there's no integrity bug.)
 * The return value is also adjusted with vacuum_defer_cleanup_age, so
 * increasing that setting on the fly is another easy way to make
 * GetOldestXmin() move backwards, with no consequences for data integrity.
1082 1083
 */
TransactionId
1084
GetOldestXmin(bool allDbs, bool ignoreVacuum)
1085 1086 1087 1088 1089
{
	ProcArrayStruct *arrayP = procArray;
	TransactionId result;
	int			index;

1090 1091 1092
	/* Cannot look for individual databases during recovery */
	Assert(allDbs || !RecoveryInProgress());

1093 1094
	LWLockAcquire(ProcArrayLock, LW_SHARED);

1095
	/*
B
Bruce Momjian 已提交
1096 1097 1098 1099
	 * We initialize the MIN() calculation with latestCompletedXid + 1. This
	 * is a lower bound for the XIDs that might appear in the ProcArray later,
	 * and so protects us against overestimating the result due to future
	 * additions.
1100
	 */
1101 1102 1103
	result = ShmemVariableCache->latestCompletedXid;
	Assert(TransactionIdIsNormal(result));
	TransactionIdAdvance(result);
1104 1105 1106

	for (index = 0; index < arrayP->numProcs; index++)
	{
1107 1108 1109
		int		pgprocno = arrayP->pgprocnos[index];
		volatile PGPROC    *proc = &allProcs[pgprocno];
		volatile PGXACT    *pgxact = &allPgXact[pgprocno];
1110

1111
		if (ignoreVacuum && (pgxact->vacuumFlags & PROC_IN_VACUUM))
1112 1113
			continue;

1114 1115
		if (allDbs ||
			proc->databaseId == MyDatabaseId ||
1116
			proc->databaseId == 0)		/* always include WalSender */
1117 1118
		{
			/* Fetch xid just once - see GetNewTransactionId */
1119
			TransactionId xid = pgxact->xid;
1120

1121 1122 1123 1124 1125 1126 1127 1128 1129
			/* First consider the transaction's own Xid, if any */
			if (TransactionIdIsNormal(xid) &&
				TransactionIdPrecedes(xid, result))
				result = xid;

			/*
			 * Also consider the transaction's Xmin, if set.
			 *
			 * We must check both Xid and Xmin because a transaction might
B
Bruce Momjian 已提交
1130 1131
			 * have an Xmin but not (yet) an Xid; conversely, if it has an
			 * Xid, that could determine some not-yet-set Xmin.
1132
			 */
1133
			xid = pgxact->xmin;	/* Fetch just once */
1134 1135 1136
			if (TransactionIdIsNormal(xid) &&
				TransactionIdPrecedes(xid, result))
				result = xid;
1137 1138 1139
		}
	}

1140 1141 1142
	if (RecoveryInProgress())
	{
		/*
1143 1144
		 * Check to see whether KnownAssignedXids contains an xid value older
		 * than the main procarray.
1145 1146
		 */
		TransactionId kaxmin = KnownAssignedXidsGetOldestXmin();
1147

1148 1149
		LWLockRelease(ProcArrayLock);

1150 1151
		if (TransactionIdIsNormal(kaxmin) &&
			TransactionIdPrecedes(kaxmin, result))
1152
			result = kaxmin;
1153
	}
1154 1155 1156 1157 1158 1159
	else
	{
		/*
		 * No other information needed, so release the lock immediately.
		 */
		LWLockRelease(ProcArrayLock);
1160

1161
		/*
1162 1163
		 * Compute the cutoff XID by subtracting vacuum_defer_cleanup_age,
		 * being careful not to generate a "permanent" XID.
1164 1165 1166 1167
		 *
		 * vacuum_defer_cleanup_age provides some additional "slop" for the
		 * benefit of hot standby queries on slave servers.  This is quick and
		 * dirty, and perhaps not all that useful unless the master has a
1168 1169 1170 1171 1172 1173
		 * predictable transaction rate, but it offers some protection when
		 * there's no walsender connection.  Note that we are assuming
		 * vacuum_defer_cleanup_age isn't large enough to cause wraparound ---
		 * so guc.c should limit it to no more than the xidStopLimit threshold
		 * in varsup.c.  Also note that we intentionally don't apply
		 * vacuum_defer_cleanup_age on standby servers.
1174 1175 1176 1177 1178
		 */
		result -= vacuum_defer_cleanup_age;
		if (!TransactionIdIsNormal(result))
			result = FirstNormalTransactionId;
	}
1179

1180 1181 1182
	return result;
}

1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204
/*
 * GetMaxSnapshotXidCount -- get max size for snapshot XID array
 *
 * We have to export this for use by snapmgr.c.
 */
int
GetMaxSnapshotXidCount(void)
{
	return procArray->maxProcs;
}

/*
 * GetMaxSnapshotSubxidCount -- get max size for snapshot sub-XID array
 *
 * We have to export this for use by snapmgr.c.
 */
int
GetMaxSnapshotSubxidCount(void)
{
	return TOTAL_MAX_CACHED_SUBXIDS;
}

1205
/*
1206 1207 1208
 * GetSnapshotData -- returns information about running transactions.
 *
 * The returned snapshot includes xmin (lowest still-running xact ID),
1209
 * xmax (highest completed xact ID + 1), and a list of running xact IDs
1210 1211 1212 1213 1214 1215 1216 1217
 * in the range xmin <= xid < xmax.  It is used as follows:
 *		All xact IDs < xmin are considered finished.
 *		All xact IDs >= xmax are considered still running.
 *		For an xact ID xmin <= xid < xmax, consult list to see whether
 *		it is considered running or not.
 * This ensures that the set of transactions seen as "running" by the
 * current xact will not change after it takes the snapshot.
 *
1218 1219 1220 1221 1222
 * All running top-level XIDs are included in the snapshot, except for lazy
 * VACUUM processes.  We also try to include running subtransaction XIDs,
 * but since PGPROC has only a limited cache area for subxact XIDs, full
 * information may not be available.  If we find any overflowed subxid arrays,
 * we have to mark the snapshot's subxid data as overflowed, and extra work
1223
 * *may* need to be done to determine what's running (see XidInMVCCSnapshot()
1224
 * in tqual.c).
1225 1226 1227
 *
 * We also update the following backend-global variables:
 *		TransactionXmin: the oldest xmin of any snapshot in use in the
1228
 *			current transaction (this is the same as MyPgXact->xmin).
1229 1230 1231
 *		RecentXmin: the xmin computed for the most recent snapshot.  XIDs
 *			older than this are known not running any more.
 *		RecentGlobalXmin: the global xmin (oldest TransactionXmin across all
1232
 *			running transactions, except those running LAZY VACUUM).  This is
T
Tom Lane 已提交
1233
 *			the same computation done by GetOldestXmin(true, true).
1234 1235 1236
 *
 * Note: this function should probably not be called with an argument that's
 * not statically allocated (see xip allocation below).
1237 1238
 */
Snapshot
1239
GetSnapshotData(Snapshot snapshot)
1240 1241 1242 1243 1244 1245 1246
{
	ProcArrayStruct *arrayP = procArray;
	TransactionId xmin;
	TransactionId xmax;
	TransactionId globalxmin;
	int			index;
	int			count = 0;
1247
	int			subcount = 0;
1248
	bool		suboverflowed = false;
1249 1250 1251 1252

	Assert(snapshot != NULL);

	/*
B
Bruce Momjian 已提交
1253 1254
	 * Allocating space for maxProcs xids is usually overkill; numProcs would
	 * be sufficient.  But it seems better to do the malloc while not holding
1255 1256
	 * the lock, so we can't look at numProcs.  Likewise, we allocate much
	 * more subxip storage than is probably needed.
1257 1258
	 *
	 * This does open a possibility for avoiding repeated malloc/free: since
B
Bruce Momjian 已提交
1259
	 * maxProcs does not change at runtime, we can simply reuse the previous
1260
	 * xip arrays if any.  (This relies on the fact that all callers pass
B
Bruce Momjian 已提交
1261
	 * static SnapshotData structs.)
1262 1263 1264 1265
	 */
	if (snapshot->xip == NULL)
	{
		/*
B
Bruce Momjian 已提交
1266 1267
		 * First call for this snapshot. Snapshot is same size whether or not
		 * we are in recovery, see later comments.
1268 1269
		 */
		snapshot->xip = (TransactionId *)
1270
			malloc(GetMaxSnapshotXidCount() * sizeof(TransactionId));
1271 1272 1273 1274
		if (snapshot->xip == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
1275 1276
		Assert(snapshot->subxip == NULL);
		snapshot->subxip = (TransactionId *)
1277
			malloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId));
1278 1279 1280 1281
		if (snapshot->subxip == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
1282 1283 1284
	}

	/*
B
Bruce Momjian 已提交
1285
	 * It is sufficient to get shared lock on ProcArrayLock, even if we are
1286
	 * going to set MyPgXact->xmin.
1287
	 */
1288
	LWLockAcquire(ProcArrayLock, LW_SHARED);
1289

1290 1291 1292 1293
	/* xmax is always latestCompletedXid + 1 */
	xmax = ShmemVariableCache->latestCompletedXid;
	Assert(TransactionIdIsNormal(xmax));
	TransactionIdAdvance(xmax);
1294

1295 1296 1297
	/* initialize xmin calculation with xmax */
	globalxmin = xmin = xmax;

B
Bruce Momjian 已提交
1298
	/*
1299
	 * If we're in recovery then snapshot data comes from a different place,
B
Bruce Momjian 已提交
1300 1301
	 * so decide which route we take before grab the lock. It is possible for
	 * recovery to end before we finish taking snapshot, and for newly
1302 1303 1304
	 * assigned transaction ids to be added to the procarray. Xmax cannot
	 * change while we hold ProcArrayLock, so those newly added transaction
	 * ids would be filtered away, so we need not be concerned about them.
B
Bruce Momjian 已提交
1305
	 */
1306 1307
	snapshot->takenDuringRecovery = RecoveryInProgress();

1308
	if (!snapshot->takenDuringRecovery)
1309
	{
1310 1311 1312
		int *pgprocnos = arrayP->pgprocnos;
		int			numProcs;

1313
		/*
B
Bruce Momjian 已提交
1314 1315
		 * Spin over procArray checking xid, xmin, and subxids.  The goal is
		 * to gather all active xids, find the lowest xmin, and try to record
1316
		 * subxids.
1317
		 */
1318 1319
		numProcs = arrayP->numProcs;
		for (index = 0; index < numProcs; index++)
1320
		{
1321 1322 1323
			int		pgprocno = pgprocnos[index];
			volatile PGXACT    *pgxact = &allPgXact[pgprocno];
			TransactionId	xid;
1324 1325

			/* Ignore procs running LAZY VACUUM */
1326
			if (pgxact->vacuumFlags & PROC_IN_VACUUM)
1327
				continue;
1328

1329
			/* Update globalxmin to be the smallest valid xmin */
1330
			xid = pgxact->xmin;	/* fetch just once */
1331
			if (TransactionIdIsNormal(xid) &&
1332
				NormalTransactionIdPrecedes(xid, globalxmin))
1333
					globalxmin = xid;
1334 1335

			/* Fetch xid just once - see GetNewTransactionId */
1336
			xid = pgxact->xid;
1337 1338

			/*
1339 1340 1341 1342
			 * If the transaction has no XID assigned, we can skip it; it won't
			 * have sub-XIDs either.  If the XID is >= xmax, we can also skip
			 * it; such transactions will be treated as running anyway (and any
			 * sub-XIDs will also be >= xmax).
1343
			 */
1344 1345
			if (!TransactionIdIsNormal(xid)
				|| !NormalTransactionIdPrecedes(xid, xmax))
1346
					continue;
1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358

			/*
			 * We don't include our own XIDs (if any) in the snapshot, but we
			 * must include them in xmin.
			 */
			if (NormalTransactionIdPrecedes(xid, xmin))
				xmin = xid;
			if (pgxact == MyPgXact)
				continue;

			/* Add XID to snapshot. */
			snapshot->xip[count++] = xid;
1359

1360
			/*
B
Bruce Momjian 已提交
1361 1362 1363 1364 1365
			 * Save subtransaction XIDs if possible (if we've already
			 * overflowed, there's no point).  Note that the subxact XIDs must
			 * be later than their parent, so no need to check them against
			 * xmin.  We could filter against xmax, but it seems better not to
			 * do that much work while holding the ProcArrayLock.
1366 1367
			 *
			 * The other backend can add more subxids concurrently, but cannot
B
Bruce Momjian 已提交
1368 1369 1370 1371
			 * remove any.	Hence it's important to fetch nxids just once.
			 * Should be safe to use memcpy, though.  (We needn't worry about
			 * missing any xids added concurrently, because they must postdate
			 * xmax.)
1372 1373 1374
			 *
			 * Again, our own XIDs are not included in the snapshot.
			 */
1375
			if (!suboverflowed)
1376
			{
1377
				if (pgxact->overflowed)
1378 1379
					suboverflowed = true;
				else
1380
				{
1381
					int			nxids = pgxact->nxids;
1382 1383 1384

					if (nxids > 0)
					{
1385
						volatile PGPROC *proc = &allProcs[pgprocno];
1386 1387 1388 1389 1390
						memcpy(snapshot->subxip + subcount,
							   (void *) proc->subxids.xids,
							   nxids * sizeof(TransactionId));
						subcount += nxids;
					}
1391 1392 1393
				}
			}
		}
1394
	}
1395
	else
1396 1397
	{
		/*
1398 1399
		 * We're in hot standby, so get XIDs from KnownAssignedXids.
		 *
1400 1401 1402 1403 1404
		 * We store all xids directly into subxip[]. Here's why:
		 *
		 * In recovery we don't know which xids are top-level and which are
		 * subxacts, a design choice that greatly simplifies xid processing.
		 *
B
Bruce Momjian 已提交
1405 1406 1407 1408
		 * It seems like we would want to try to put xids into xip[] only, but
		 * that is fairly small. We would either need to make that bigger or
		 * to increase the rate at which we WAL-log xid assignment; neither is
		 * an appealing choice.
1409 1410 1411 1412
		 *
		 * We could try to store xids into xip[] first and then into subxip[]
		 * if there are too many xids. That only works if the snapshot doesn't
		 * overflow because we do not search subxip[] in that case. A simpler
B
Bruce Momjian 已提交
1413 1414
		 * way is to just store all xids in the subxact array because this is
		 * by far the bigger array. We just leave the xip array empty.
1415 1416 1417 1418 1419
		 *
		 * Either way we need to change the way XidInMVCCSnapshot() works
		 * depending upon when the snapshot was taken, or change normal
		 * snapshot processing so it matches.
		 */
1420 1421
		subcount = KnownAssignedXidsGetAndSetXmin(snapshot->subxip, &xmin,
												  xmax);
1422

1423
		if (TransactionIdPrecedesOrEquals(xmin, procArray->lastOverflowedXid))
1424 1425 1426
			suboverflowed = true;
	}

1427 1428
	if (!TransactionIdIsValid(MyPgXact->xmin))
		MyPgXact->xmin = TransactionXmin = xmin;
1429 1430 1431
	LWLockRelease(ProcArrayLock);

	/*
B
Bruce Momjian 已提交
1432 1433 1434
	 * Update globalxmin to include actual process xids.  This is a slightly
	 * different way of computing it than GetOldestXmin uses, but should give
	 * the same result.
1435 1436 1437 1438 1439
	 */
	if (TransactionIdPrecedes(xmin, globalxmin))
		globalxmin = xmin;

	/* Update global variables too */
1440 1441 1442
	RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
	if (!TransactionIdIsNormal(RecentGlobalXmin))
		RecentGlobalXmin = FirstNormalTransactionId;
1443 1444 1445 1446 1447
	RecentXmin = xmin;

	snapshot->xmin = xmin;
	snapshot->xmax = xmax;
	snapshot->xcnt = count;
1448
	snapshot->subxcnt = subcount;
1449
	snapshot->suboverflowed = suboverflowed;
1450

1451
	snapshot->curcid = GetCurrentCommandId(false);
1452

1453
	/*
1454 1455
	 * This is a new snapshot, so set both refcounts are zero, and mark it as
	 * not copied in persistent memory.
1456 1457 1458 1459 1460
	 */
	snapshot->active_count = 0;
	snapshot->regd_count = 0;
	snapshot->copied = false;

1461 1462 1463
	return snapshot;
}

1464
/*
1465
 * ProcArrayInstallImportedXmin -- install imported xmin into MyPgXact->xmin
1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489
 *
 * This is called when installing a snapshot imported from another
 * transaction.  To ensure that OldestXmin doesn't go backwards, we must
 * check that the source transaction is still running, and we'd better do
 * that atomically with installing the new xmin.
 *
 * Returns TRUE if successful, FALSE if source xact is no longer running.
 */
bool
ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid)
{
	bool		result = false;
	ProcArrayStruct *arrayP = procArray;
	int			index;

	Assert(TransactionIdIsNormal(xmin));
	if (!TransactionIdIsNormal(sourcexid))
		return false;

	/* Get lock so source xact can't end while we're doing this */
	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
1490 1491 1492 1493
		int		pgprocno = arrayP->pgprocnos[index];
		volatile PGPROC    *proc = &allProcs[pgprocno];
		volatile PGXACT    *pgxact = &allPgXact[pgprocno];
		TransactionId	xid;
1494 1495

		/* Ignore procs running LAZY VACUUM */
1496
		if (pgxact->vacuumFlags & PROC_IN_VACUUM)
1497 1498
			continue;

1499
		xid = pgxact->xid;	/* fetch just once */
1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514
		if (xid != sourcexid)
			continue;

		/*
		 * We check the transaction's database ID for paranoia's sake: if
		 * it's in another DB then its xmin does not cover us.  Caller should
		 * have detected this already, so we just treat any funny cases as
		 * "transaction not found".
		 */
		if (proc->databaseId != MyDatabaseId)
			continue;

		/*
		 * Likewise, let's just make real sure its xmin does cover us.
		 */
1515
		xid = pgxact->xmin;	/* fetch just once */
1516 1517 1518 1519 1520 1521 1522 1523 1524 1525
		if (!TransactionIdIsNormal(xid) ||
			!TransactionIdPrecedesOrEquals(xid, xmin))
			continue;

		/*
		 * We're good.  Install the new xmin.  As in GetSnapshotData, set
		 * TransactionXmin too.  (Note that because snapmgr.c called
		 * GetSnapshotData first, we'll be overwriting a valid xmin here,
		 * so we don't check that.)
		 */
1526
		MyPgXact->xmin = TransactionXmin = xmin;
1527 1528 1529 1530 1531 1532 1533 1534 1535 1536

		result = true;
		break;
	}

	LWLockRelease(ProcArrayLock);

	return result;
}

1537 1538 1539
/*
 * GetRunningTransactionData -- returns information about running transactions.
 *
1540
 * Similar to GetSnapshotData but returns more information. We include
1541
 * all PGXACTs with an assigned TransactionId, even VACUUM processes.
1542
 *
1543 1544 1545 1546
 * We acquire XidGenLock, but the caller is responsible for releasing it.
 * This ensures that no new XIDs enter the proc array until the caller has
 * WAL-logged this snapshot, and releases the lock.
 *
1547 1548 1549
 * The returned data structure is statically allocated; caller should not
 * modify it, and must not assume it is valid past the next call.
 *
1550 1551 1552 1553 1554 1555 1556 1557 1558 1559
 * This is never executed during recovery so there is no need to look at
 * KnownAssignedXids.
 *
 * We don't worry about updating other counters, we want to keep this as
 * simple as possible and leave GetSnapshotData() as the primary code for
 * that bookkeeping.
 */
RunningTransactions
GetRunningTransactionData(void)
{
1560 1561 1562
	/* result workspace */
	static RunningTransactionsData CurrentRunningXactsData;

1563
	ProcArrayStruct *arrayP = procArray;
1564
	RunningTransactions CurrentRunningXacts = &CurrentRunningXactsData;
1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580
	TransactionId latestCompletedXid;
	TransactionId oldestRunningXid;
	TransactionId *xids;
	int			index;
	int			count;
	int			subcount;
	bool		suboverflowed;

	Assert(!RecoveryInProgress());

	/*
	 * Allocating space for maxProcs xids is usually overkill; numProcs would
	 * be sufficient.  But it seems better to do the malloc while not holding
	 * the lock, so we can't look at numProcs.  Likewise, we allocate much
	 * more subxip storage than is probably needed.
	 *
1581
	 * Should only be allocated in bgwriter, since only ever executed during
B
Bruce Momjian 已提交
1582
	 * checkpoints.
1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611
	 */
	if (CurrentRunningXacts->xids == NULL)
	{
		/*
		 * First call
		 */
		CurrentRunningXacts->xids = (TransactionId *)
			malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
		if (CurrentRunningXacts->xids == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
	}

	xids = CurrentRunningXacts->xids;

	count = subcount = 0;
	suboverflowed = false;

	/*
	 * Ensure that no xids enter or leave the procarray while we obtain
	 * snapshot.
	 */
	LWLockAcquire(ProcArrayLock, LW_SHARED);
	LWLockAcquire(XidGenLock, LW_SHARED);

	latestCompletedXid = ShmemVariableCache->latestCompletedXid;

	oldestRunningXid = ShmemVariableCache->nextXid;
B
Bruce Momjian 已提交
1612

1613 1614 1615 1616 1617
	/*
	 * Spin over procArray collecting all xids and subxids.
	 */
	for (index = 0; index < arrayP->numProcs; index++)
	{
1618 1619 1620
		int		pgprocno = arrayP->pgprocnos[index];
		volatile PGPROC *proc = &allProcs[pgprocno];
		volatile PGXACT *pgxact = &allPgXact[pgprocno];
1621 1622 1623 1624
		TransactionId xid;
		int			nxids;

		/* Fetch xid just once - see GetNewTransactionId */
1625
		xid = pgxact->xid;
1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639

		/*
		 * We don't need to store transactions that don't have a TransactionId
		 * yet because they will not show as running on a standby server.
		 */
		if (!TransactionIdIsValid(xid))
			continue;

		xids[count++] = xid;

		if (TransactionIdPrecedes(xid, oldestRunningXid))
			oldestRunningXid = xid;

		/*
B
Bruce Momjian 已提交
1640 1641
		 * Save subtransaction XIDs. Other backends can't add or remove
		 * entries while we're holding XidGenLock.
1642
		 */
1643
		nxids = pgxact->nxids;
1644 1645 1646 1647 1648 1649 1650
		if (nxids > 0)
		{
			memcpy(&xids[count], (void *) proc->subxids.xids,
				   nxids * sizeof(TransactionId));
			count += nxids;
			subcount += nxids;

1651
			if (pgxact->overflowed)
1652 1653 1654
				suboverflowed = true;

			/*
1655 1656 1657
			 * Top-level XID of a transaction is always less than any of its
			 * subxids, so we don't need to check if any of the subxids are
			 * smaller than oldestRunningXid
1658 1659 1660 1661 1662 1663 1664 1665
			 */
		}
	}

	CurrentRunningXacts->xcnt = count;
	CurrentRunningXacts->subxid_overflow = suboverflowed;
	CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid;
	CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
1666
	CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
1667

1668
	/* We don't release XidGenLock here, the caller is responsible for that */
1669 1670
	LWLockRelease(ProcArrayLock);

1671 1672 1673 1674
	Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid));
	Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid));
	Assert(TransactionIdIsNormal(CurrentRunningXacts->latestCompletedXid));

1675 1676 1677
	return CurrentRunningXacts;
}

1678 1679 1680 1681
/*
 * GetOldestActiveTransactionId()
 *
 * Similar to GetSnapshotData but returns just oldestActiveXid. We include
1682
 * all PGXACTs with an assigned TransactionId, even VACUUM processes.
1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703
 * We look at all databases, though there is no need to include WALSender
 * since this has no effect on hot standby conflicts.
 *
 * This is never executed during recovery so there is no need to look at
 * KnownAssignedXids.
 *
 * We don't worry about updating other counters, we want to keep this as
 * simple as possible and leave GetSnapshotData() as the primary code for
 * that bookkeeping.
 */
TransactionId
GetOldestActiveTransactionId(void)
{
	ProcArrayStruct *arrayP = procArray;
	TransactionId oldestRunningXid;
	int			index;

	Assert(!RecoveryInProgress());

	LWLockAcquire(ProcArrayLock, LW_SHARED);

1704 1705 1706 1707 1708 1709 1710
	/*
	 * It's okay to read nextXid without acquiring XidGenLock because (1) we
	 * assume TransactionIds can be read atomically and (2) we don't care if
	 * we get a slightly stale value.  It can't be very stale anyway, because
	 * the LWLockAcquire above will have done any necessary memory
	 * interlocking.
	 */
1711 1712 1713 1714 1715 1716 1717
	oldestRunningXid = ShmemVariableCache->nextXid;

	/*
	 * Spin over procArray collecting all xids and subxids.
	 */
	for (index = 0; index < arrayP->numProcs; index++)
	{
1718 1719
		int		pgprocno = arrayP->pgprocnos[index];
		volatile PGXACT *pgxact = &allPgXact[pgprocno];
1720 1721 1722
		TransactionId xid;

		/* Fetch xid just once - see GetNewTransactionId */
1723
		xid = pgxact->xid;
1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742

		if (!TransactionIdIsNormal(xid))
			continue;

		if (TransactionIdPrecedes(xid, oldestRunningXid))
			oldestRunningXid = xid;

		/*
		 * Top-level XID of a transaction is always less than any of its
		 * subxids, so we don't need to check if any of the subxids are
		 * smaller than oldestRunningXid
		 */
	}

	LWLockRelease(ProcArrayLock);

	return oldestRunningXid;
}

1743 1744 1745 1746
/*
 * GetTransactionsInCommit -- Get the XIDs of transactions that are committing
 *
 * Constructs an array of XIDs of transactions that are currently in commit
1747
 * critical sections, as shown by having inCommit set in their PGXACT entries.
1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764
 *
 * *xids_p is set to a palloc'd array that should be freed by the caller.
 * The return value is the number of valid entries.
 *
 * Note that because backends set or clear inCommit without holding any lock,
 * the result is somewhat indeterminate, but we don't really care.  Even in
 * a multiprocessor with delayed writes to shared memory, it should be certain
 * that setting of inCommit will propagate to shared memory when the backend
 * takes the WALInsertLock, so we cannot fail to see an xact as inCommit if
 * it's already inserted its commit record.  Whether it takes a little while
 * for clearing of inCommit to propagate is unimportant for correctness.
 */
int
GetTransactionsInCommit(TransactionId **xids_p)
{
	ProcArrayStruct *arrayP = procArray;
	TransactionId *xids;
B
Bruce Momjian 已提交
1765 1766
	int			nxids;
	int			index;
1767 1768 1769 1770 1771 1772 1773 1774

	xids = (TransactionId *) palloc(arrayP->maxProcs * sizeof(TransactionId));
	nxids = 0;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
1775 1776 1777
		int		pgprocno = arrayP->pgprocnos[index];
		volatile PGXACT *pgxact = &allPgXact[pgprocno];
		TransactionId pxid;
B
Bruce Momjian 已提交
1778

1779
		/* Fetch xid just once - see GetNewTransactionId */
1780
		pxid = pgxact->xid;
1781

1782
		if (pgxact->inCommit && TransactionIdIsValid(pxid))
1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803
			xids[nxids++] = pxid;
	}

	LWLockRelease(ProcArrayLock);

	*xids_p = xids;
	return nxids;
}

/*
 * HaveTransactionsInCommit -- Are any of the specified XIDs in commit?
 *
 * This is used with the results of GetTransactionsInCommit to see if any
 * of the specified XIDs are still in their commit critical sections.
 *
 * Note: this is O(N^2) in the number of xacts that are/were in commit, but
 * those numbers should be small enough for it not to be a problem.
 */
bool
HaveTransactionsInCommit(TransactionId *xids, int nxids)
{
B
Bruce Momjian 已提交
1804
	bool		result = false;
1805
	ProcArrayStruct *arrayP = procArray;
B
Bruce Momjian 已提交
1806
	int			index;
1807 1808 1809 1810 1811

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
1812 1813 1814
		int		pgprocno = arrayP->pgprocnos[index];
		volatile PGXACT    *pgxact = &allPgXact[pgprocno];
		TransactionId	pxid;
B
Bruce Momjian 已提交
1815

1816
		/* Fetch xid just once - see GetNewTransactionId */
1817
		pxid = pgxact->xid;
1818

1819
		if (pgxact->inCommit && TransactionIdIsValid(pxid))
1820
		{
B
Bruce Momjian 已提交
1821
			int			i;
1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840

			for (i = 0; i < nxids; i++)
			{
				if (xids[i] == pxid)
				{
					result = true;
					break;
				}
			}
			if (result)
				break;
		}
	}

	LWLockRelease(ProcArrayLock);

	return result;
}

1841 1842
/*
 * BackendPidGetProc -- get a backend's PGPROC given its PID
1843 1844 1845 1846
 *
 * Returns NULL if not found.  Note that it is up to the caller to be
 * sure that the question remains meaningful for long enough for the
 * answer to be used ...
1847
 */
1848
PGPROC *
1849 1850 1851 1852 1853 1854
BackendPidGetProc(int pid)
{
	PGPROC	   *result = NULL;
	ProcArrayStruct *arrayP = procArray;
	int			index;

1855 1856 1857
	if (pid == 0)				/* never match dummy PGPROCs */
		return NULL;

1858 1859 1860 1861
	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
1862
		PGPROC	   *proc = &allProcs[arrayP->pgprocnos[index]];
1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875

		if (proc->pid == pid)
		{
			result = proc;
			break;
		}
	}

	LWLockRelease(ProcArrayLock);

	return result;
}

T
Tatsuo Ishii 已提交
1876 1877 1878 1879 1880 1881
/*
 * BackendXidGetPid -- get a backend's pid given its XID
 *
 * Returns 0 if not found or it's a prepared transaction.  Note that
 * it is up to the caller to be sure that the question remains
 * meaningful for long enough for the answer to be used ...
B
Bruce Momjian 已提交
1882
 *
T
Tatsuo Ishii 已提交
1883 1884
 * Only main transaction Ids are considered.  This function is mainly
 * useful for determining what backend owns a lock.
1885
 *
B
Bruce Momjian 已提交
1886
 * Beware that not every xact has an XID assigned.	However, as long as you
1887
 * only call this using an XID found on disk, you're safe.
T
Tatsuo Ishii 已提交
1888 1889 1890 1891
 */
int
BackendXidGetPid(TransactionId xid)
{
B
Bruce Momjian 已提交
1892
	int			result = 0;
T
Tatsuo Ishii 已提交
1893 1894 1895 1896 1897 1898 1899 1900 1901 1902
	ProcArrayStruct *arrayP = procArray;
	int			index;

	if (xid == InvalidTransactionId)	/* never match invalid xid */
		return 0;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
1903 1904 1905
		int		pgprocno = arrayP->pgprocnos[index];
		volatile PGPROC    *proc = &allProcs[pgprocno];
		volatile PGXACT    *pgxact = &allPgXact[pgprocno];
T
Tatsuo Ishii 已提交
1906

1907
		if (pgxact->xid == xid)
T
Tatsuo Ishii 已提交
1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918
		{
			result = proc->pid;
			break;
		}
	}

	LWLockRelease(ProcArrayLock);

	return result;
}

1919 1920 1921 1922 1923 1924 1925 1926 1927
/*
 * IsBackendPid -- is a given pid a running backend
 */
bool
IsBackendPid(int pid)
{
	return (BackendPidGetProc(pid) != NULL);
}

1928 1929 1930 1931

/*
 * GetCurrentVirtualXIDs -- returns an array of currently active VXIDs.
 *
1932
 * The array is palloc'd. The number of valid entries is returned into *nvxids.
1933
 *
1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947
 * The arguments allow filtering the set of VXIDs returned.  Our own process
 * is always skipped.  In addition:
 *	If limitXmin is not InvalidTransactionId, skip processes with
 *		xmin > limitXmin.
 *	If excludeXmin0 is true, skip processes with xmin = 0.
 *	If allDbs is false, skip processes attached to other databases.
 *	If excludeVacuum isn't zero, skip processes for which
 *		(vacuumFlags & excludeVacuum) is not zero.
 *
 * Note: the purpose of the limitXmin and excludeXmin0 parameters is to
 * allow skipping backends whose oldest live snapshot is no older than
 * some snapshot we have.  Since we examine the procarray with only shared
 * lock, there are race conditions: a backend could set its xmin just after
 * we look.  Indeed, on multiprocessors with weak memory ordering, the
1948
 * other backend could have set its xmin *before* we look.	We know however
1949 1950 1951 1952 1953
 * that such a backend must have held shared ProcArrayLock overlapping our
 * own hold of ProcArrayLock, else we would see its xmin update.  Therefore,
 * any snapshot the other backend is taking concurrently with our scan cannot
 * consider any transactions as still running that we think are committed
 * (since backends must hold ProcArrayLock exclusive to commit).
1954 1955
 */
VirtualTransactionId *
1956 1957 1958
GetCurrentVirtualXIDs(TransactionId limitXmin, bool excludeXmin0,
					  bool allDbs, int excludeVacuum,
					  int *nvxids)
1959 1960 1961 1962 1963 1964
{
	VirtualTransactionId *vxids;
	ProcArrayStruct *arrayP = procArray;
	int			count = 0;
	int			index;

1965
	/* allocate what's certainly enough result space */
1966
	vxids = (VirtualTransactionId *)
1967
		palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs);
1968 1969 1970 1971 1972

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
1973 1974 1975
		int		pgprocno = arrayP->pgprocnos[index];
		volatile PGPROC    *proc = &allProcs[pgprocno];
		volatile PGXACT    *pgxact = &allPgXact[pgprocno];
1976 1977 1978 1979

		if (proc == MyProc)
			continue;

1980
		if (excludeVacuum & pgxact->vacuumFlags)
1981 1982
			continue;

1983
		if (allDbs || proc->databaseId == MyDatabaseId)
1984
		{
1985
			/* Fetch xmin just once - might change on us */
1986
			TransactionId pxmin = pgxact->xmin;
1987

1988 1989 1990
			if (excludeXmin0 && !TransactionIdIsValid(pxmin))
				continue;

1991
			/*
1992 1993
			 * InvalidTransactionId precedes all other XIDs, so a proc that
			 * hasn't set xmin yet will not be rejected by this test.
1994 1995
			 */
			if (!TransactionIdIsValid(limitXmin) ||
1996
				TransactionIdPrecedesOrEquals(pxmin, limitXmin))
1997 1998
			{
				VirtualTransactionId vxid;
1999

2000 2001 2002 2003
				GET_VXID_FROM_PGPROC(vxid, *proc);
				if (VirtualTransactionIdIsValid(vxid))
					vxids[count++] = vxid;
			}
2004 2005 2006 2007 2008
		}
	}

	LWLockRelease(ProcArrayLock);

2009
	*nvxids = count;
2010 2011 2012
	return vxids;
}

2013 2014 2015 2016 2017 2018
/*
 * GetConflictingVirtualXIDs -- returns an array of currently active VXIDs.
 *
 * Usage is limited to conflict resolution during recovery on standby servers.
 * limitXmin is supplied as either latestRemovedXid, or InvalidTransactionId
 * in cases where we cannot accurately determine a value for latestRemovedXid.
2019
 *
2020 2021 2022
 * If limitXmin is InvalidTransactionId then we want to kill everybody,
 * so we're not worried if they have a snapshot or not, nor does it really
 * matter what type of lock we hold.
2023
 *
2024 2025 2026 2027 2028 2029 2030 2031 2032 2033
 * All callers that are checking xmins always now supply a valid and useful
 * value for limitXmin. The limitXmin is always lower than the lowest
 * numbered KnownAssignedXid that is not already a FATAL error. This is
 * because we only care about cleanup records that are cleaning up tuple
 * versions from committed transactions. In that case they will only occur
 * at the point where the record is less than the lowest running xid. That
 * allows us to say that if any backend takes a snapshot concurrently with
 * us then the conflict assessment made here would never include the snapshot
 * that is being derived. So we take LW_SHARED on the ProcArray and allow
 * concurrent snapshots when limitXmin is valid. We might think about adding
B
Bruce Momjian 已提交
2034
 *	 Assert(limitXmin < lowest(KnownAssignedXids))
2035 2036
 * but that would not be true in the case of FATAL errors lagging in array,
 * but we already know those are bogus anyway, so we skip that test.
2037
 *
2038
 * If dbOid is valid we skip backends attached to other databases.
2039 2040 2041 2042 2043
 *
 * Be careful to *not* pfree the result from this function. We reuse
 * this array sufficiently often that we use malloc for the result.
 */
VirtualTransactionId *
2044
GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
2045 2046 2047 2048 2049 2050 2051
{
	static VirtualTransactionId *vxids;
	ProcArrayStruct *arrayP = procArray;
	int			count = 0;
	int			index;

	/*
2052
	 * If first time through, get workspace to remember main XIDs in. We
B
Bruce Momjian 已提交
2053 2054
	 * malloc it permanently to avoid repeated palloc/pfree overhead. Allow
	 * result space, remembering room for a terminator.
2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065
	 */
	if (vxids == NULL)
	{
		vxids = (VirtualTransactionId *)
			malloc(sizeof(VirtualTransactionId) * (arrayP->maxProcs + 1));
		if (vxids == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
	}

2066
	LWLockAcquire(ProcArrayLock, LW_SHARED);
2067 2068 2069

	for (index = 0; index < arrayP->numProcs; index++)
	{
2070 2071 2072
		int		pgprocno = arrayP->pgprocnos[index];
		volatile PGPROC    *proc = &allProcs[pgprocno];
		volatile PGXACT    *pgxact = &allPgXact[pgprocno];
2073 2074 2075 2076 2077 2078 2079 2080 2081

		/* Exclude prepared transactions */
		if (proc->pid == 0)
			continue;

		if (!OidIsValid(dbOid) ||
			proc->databaseId == dbOid)
		{
			/* Fetch xmin just once - can't change on us, but good coding */
2082
			TransactionId pxmin = pgxact->xmin;
2083 2084

			/*
B
Bruce Momjian 已提交
2085 2086 2087
			 * We ignore an invalid pxmin because this means that backend has
			 * no snapshot and cannot get another one while we hold exclusive
			 * lock.
2088
			 */
2089 2090
			if (!TransactionIdIsValid(limitXmin) ||
				(TransactionIdIsValid(pxmin) && !TransactionIdFollows(pxmin, limitXmin)))
2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115
			{
				VirtualTransactionId vxid;

				GET_VXID_FROM_PGPROC(vxid, *proc);
				if (VirtualTransactionIdIsValid(vxid))
					vxids[count++] = vxid;
			}
		}
	}

	LWLockRelease(ProcArrayLock);

	/* add the terminator */
	vxids[count].backendId = InvalidBackendId;
	vxids[count].localTransactionId = InvalidLocalTransactionId;

	return vxids;
}

/*
 * CancelVirtualTransaction - used in recovery conflict processing
 *
 * Returns pid of the process signaled, or 0 if not found.
 */
pid_t
2116
CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode)
2117 2118 2119 2120 2121 2122 2123 2124 2125
{
	ProcArrayStruct *arrayP = procArray;
	int			index;
	pid_t		pid = 0;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
2126 2127 2128
		int		pgprocno = arrayP->pgprocnos[index];
		volatile PGPROC    *proc = &allProcs[pgprocno];
		VirtualTransactionId	procvxid;
2129 2130 2131 2132 2133 2134

		GET_VXID_FROM_PGPROC(procvxid, *proc);

		if (procvxid.backendId == vxid.backendId &&
			procvxid.localTransactionId == vxid.localTransactionId)
		{
2135
			proc->recoveryConflictPending = true;
2136
			pid = proc->pid;
2137 2138 2139
			if (pid != 0)
			{
				/*
B
Bruce Momjian 已提交
2140 2141
				 * Kill the pid if it's still here. If not, that's what we
				 * wanted so ignore any errors.
2142 2143 2144
				 */
				(void) SendProcSignal(pid, sigmode, vxid.backendId);
			}
2145 2146 2147 2148 2149 2150 2151 2152
			break;
		}
	}

	LWLockRelease(ProcArrayLock);

	return pid;
}
2153

2154
/*
2155 2156 2157
 * MinimumActiveBackends --- count backends (other than myself) that are
 *		in active transactions.  Return true if the count exceeds the
 *		minimum threshold passed.  This is used as a heuristic to decide if
2158 2159
 *		a pre-XLOG-flush delay is worthwhile during commit.
 *
2160 2161
 * Do not count backends that are blocked waiting for locks, since they are
 * not going to get to run until someone else commits.
2162
 */
2163 2164
bool
MinimumActiveBackends(int min)
2165 2166 2167 2168 2169
{
	ProcArrayStruct *arrayP = procArray;
	int			count = 0;
	int			index;

2170 2171 2172 2173
	/* Quick short-circuit if no minimum is specified */
	if (min == 0)
		return true;

2174 2175
	/*
	 * Note: for speed, we don't acquire ProcArrayLock.  This is a little bit
B
Bruce Momjian 已提交
2176 2177
	 * bogus, but since we are only testing fields for zero or nonzero, it
	 * should be OK.  The result is only used for heuristic purposes anyway...
2178 2179 2180
	 */
	for (index = 0; index < arrayP->numProcs; index++)
	{
2181 2182 2183
		int		pgprocno = arrayP->pgprocnos[index];
		volatile PGPROC    *proc = &allProcs[pgprocno];
		volatile PGXACT    *pgxact = &allPgXact[pgprocno];
2184

2185 2186 2187 2188 2189 2190 2191
		/*
		 * Since we're not holding a lock, need to check that the pointer is
		 * valid. Someone holding the lock could have incremented numProcs
		 * already, but not yet inserted a valid pointer to the array.
		 *
		 * If someone just decremented numProcs, 'proc' could also point to a
		 * PGPROC entry that's no longer in the array. It still points to a
2192
		 * PGPROC struct, though, because freed PGPROC entries just go to the
2193 2194
		 * free list and are recycled. Its contents are nonsense in that case,
		 * but that's acceptable for this function.
2195
		 */
2196
		if (proc == NULL)
2197 2198
			continue;

2199 2200
		if (proc == MyProc)
			continue;			/* do not count myself */
2201 2202
		if (pgxact->xid == InvalidTransactionId)
			continue;			/* do not count if no XID assigned */
2203 2204
		if (proc->pid == 0)
			continue;			/* do not count prepared xacts */
2205 2206 2207
		if (proc->waitLock != NULL)
			continue;			/* do not count if blocked on a lock */
		count++;
2208 2209
		if (count >= min)
			break;
2210 2211
	}

2212
	return count >= min;
2213 2214
}

2215 2216 2217 2218 2219 2220 2221 2222 2223 2224 2225 2226 2227 2228
/*
 * CountDBBackends --- count backends that are using specified database
 */
int
CountDBBackends(Oid databaseid)
{
	ProcArrayStruct *arrayP = procArray;
	int			count = 0;
	int			index;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
2229 2230
		int pgprocno = arrayP->pgprocnos[index];
		volatile PGPROC *proc = &allProcs[pgprocno];
2231 2232 2233

		if (proc->pid == 0)
			continue;			/* do not count prepared xacts */
2234 2235
		if (!OidIsValid(databaseid) ||
			proc->databaseId == databaseid)
2236 2237 2238 2239 2240 2241 2242 2243
			count++;
	}

	LWLockRelease(ProcArrayLock);

	return count;
}

2244 2245 2246 2247
/*
 * CancelDBBackends --- cancel backends that are using specified database
 */
void
2248
CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending)
2249 2250 2251
{
	ProcArrayStruct *arrayP = procArray;
	int			index;
2252
	pid_t		pid = 0;
2253 2254 2255 2256 2257 2258

	/* tell all backends to die */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	for (index = 0; index < arrayP->numProcs; index++)
	{
2259 2260
		int pgprocno = arrayP->pgprocnos[index];
		volatile PGPROC *proc = &allProcs[pgprocno];
2261

2262
		if (databaseid == InvalidOid || proc->databaseId == databaseid)
2263
		{
2264 2265 2266 2267
			VirtualTransactionId procvxid;

			GET_VXID_FROM_PGPROC(procvxid, *proc);

2268
			proc->recoveryConflictPending = conflictPending;
2269 2270 2271 2272
			pid = proc->pid;
			if (pid != 0)
			{
				/*
B
Bruce Momjian 已提交
2273 2274
				 * Kill the pid if it's still here. If not, that's what we
				 * wanted so ignore any errors.
2275
				 */
2276
				(void) SendProcSignal(pid, sigmode, procvxid.backendId);
2277
			}
2278 2279 2280 2281 2282 2283
		}
	}

	LWLockRelease(ProcArrayLock);
}

2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297
/*
 * CountUserBackends --- count backends that are used by specified user
 */
int
CountUserBackends(Oid roleid)
{
	ProcArrayStruct *arrayP = procArray;
	int			count = 0;
	int			index;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
2298 2299
		int pgprocno = arrayP->pgprocnos[index];
		volatile PGPROC *proc = &allProcs[pgprocno];
2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311

		if (proc->pid == 0)
			continue;			/* do not count prepared xacts */
		if (proc->roleId == roleid)
			count++;
	}

	LWLockRelease(ProcArrayLock);

	return count;
}

2312
/*
2313
 * CountOtherDBBackends -- check for other backends running in the given DB
2314 2315 2316
 *
 * If there are other backends in the DB, we will wait a maximum of 5 seconds
 * for them to exit.  Autovacuum backends are encouraged to exit early by
2317
 * sending them SIGTERM, but normal user backends are just waited for.
2318 2319 2320 2321 2322
 *
 * The current backend is always ignored; it is caller's responsibility to
 * check whether the current backend uses the given DB, if it's important.
 *
 * Returns TRUE if there are (still) other backends in the DB, FALSE if not.
2323 2324
 * Also, *nbackends and *nprepared are set to the number of other backends
 * and prepared transactions in the DB, respectively.
2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335
 *
 * This function is used to interlock DROP DATABASE and related commands
 * against there being any active backends in the target DB --- dropping the
 * DB while active backends remain would be a Bad Thing.  Note that we cannot
 * detect here the possibility of a newly-started backend that is trying to
 * connect to the doomed database, so additional interlocking is needed during
 * backend startup.  The caller should normally hold an exclusive lock on the
 * target DB before calling this, which is one reason we mustn't wait
 * indefinitely.
 */
bool
2336
CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
2337 2338
{
	ProcArrayStruct *arrayP = procArray;
2339 2340

#define MAXAUTOVACPIDS	10		/* max autovacs to SIGTERM per iteration */
2341
	int			autovac_pids[MAXAUTOVACPIDS];
2342 2343 2344 2345 2346
	int			tries;

	/* 50 tries with 100ms sleep between tries makes 5 sec total wait */
	for (tries = 0; tries < 50; tries++)
	{
2347
		int			nautovacs = 0;
2348 2349 2350 2351 2352
		bool		found = false;
		int			index;

		CHECK_FOR_INTERRUPTS();

2353 2354
		*nbackends = *nprepared = 0;

2355 2356 2357 2358
		LWLockAcquire(ProcArrayLock, LW_SHARED);

		for (index = 0; index < arrayP->numProcs; index++)
		{
2359 2360 2361
			int pgprocno = arrayP->pgprocnos[index];
			volatile PGPROC *proc = &allProcs[pgprocno];
			volatile PGXACT *pgxact = &allPgXact[pgprocno];
2362 2363 2364 2365 2366 2367 2368 2369

			if (proc->databaseId != databaseId)
				continue;
			if (proc == MyProc)
				continue;

			found = true;

2370 2371
			if (proc->pid == 0)
				(*nprepared)++;
2372 2373
			else
			{
2374
				(*nbackends)++;
2375
				if ((pgxact->vacuumFlags & PROC_IS_AUTOVACUUM) &&
2376 2377
					nautovacs < MAXAUTOVACPIDS)
					autovac_pids[nautovacs++] = proc->pid;
2378 2379 2380
			}
		}

2381 2382
		LWLockRelease(ProcArrayLock);

2383
		if (!found)
B
Bruce Momjian 已提交
2384
			return false;		/* no conflicting backends, so done */
2385

2386
		/*
2387 2388 2389 2390
		 * Send SIGTERM to any conflicting autovacuums before sleeping. We
		 * postpone this step until after the loop because we don't want to
		 * hold ProcArrayLock while issuing kill(). We have no idea what might
		 * block kill() inside the kernel...
2391 2392 2393 2394 2395
		 */
		for (index = 0; index < nautovacs; index++)
			(void) kill(autovac_pids[index], SIGTERM);	/* ignore any error */

		/* sleep, then try again */
B
Bruce Momjian 已提交
2396
		pg_usleep(100 * 1000L); /* 100ms */
2397 2398
	}

B
Bruce Momjian 已提交
2399
	return true;				/* timed out, still conflicts */
2400 2401
}

2402 2403 2404

#define XidCacheRemove(i) \
	do { \
2405 2406
		MyProc->subxids.xids[i] = MyProc->subxids.xids[MyPgXact->nxids - 1]; \
		MyPgXact->nxids--; \
2407 2408 2409 2410 2411 2412 2413 2414
	} while (0)

/*
 * XidCacheRemoveRunningXids
 *
 * Remove a bunch of TransactionIds from the list of known-running
 * subtransactions for my backend.	Both the specified xid and those in
 * the xids[] array (of length nxids) are removed from the subxids cache.
2415
 * latestXid must be the latest XID among the group.
2416 2417
 */
void
2418 2419 2420
XidCacheRemoveRunningXids(TransactionId xid,
						  int nxids, const TransactionId *xids,
						  TransactionId latestXid)
2421 2422 2423 2424
{
	int			i,
				j;

2425
	Assert(TransactionIdIsValid(xid));
2426 2427 2428

	/*
	 * We must hold ProcArrayLock exclusively in order to remove transactions
2429 2430 2431 2432
	 * from the PGPROC array.  (See src/backend/access/transam/README.)  It's
	 * possible this could be relaxed since we know this routine is only used
	 * to abort subtransactions, but pending closer analysis we'd best be
	 * conservative.
2433 2434 2435 2436
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	/*
B
Bruce Momjian 已提交
2437 2438 2439
	 * Under normal circumstances xid and xids[] will be in increasing order,
	 * as will be the entries in subxids.  Scan backwards to avoid O(N^2)
	 * behavior when removing a lot of xids.
2440 2441 2442 2443 2444
	 */
	for (i = nxids - 1; i >= 0; i--)
	{
		TransactionId anxid = xids[i];

2445
		for (j = MyPgXact->nxids - 1; j >= 0; j--)
2446 2447 2448 2449 2450 2451 2452
		{
			if (TransactionIdEquals(MyProc->subxids.xids[j], anxid))
			{
				XidCacheRemove(j);
				break;
			}
		}
B
Bruce Momjian 已提交
2453

2454
		/*
B
Bruce Momjian 已提交
2455 2456 2457 2458 2459
		 * Ordinarily we should have found it, unless the cache has
		 * overflowed. However it's also possible for this routine to be
		 * invoked multiple times for the same subtransaction, in case of an
		 * error during AbortSubTransaction.  So instead of Assert, emit a
		 * debug warning.
2460
		 */
2461
		if (j < 0 && !MyPgXact->overflowed)
2462 2463 2464
			elog(WARNING, "did not find subXID %u in MyProc", anxid);
	}

2465
	for (j = MyPgXact->nxids - 1; j >= 0; j--)
2466 2467 2468 2469 2470 2471 2472 2473
	{
		if (TransactionIdEquals(MyProc->subxids.xids[j], xid))
		{
			XidCacheRemove(j);
			break;
		}
	}
	/* Ordinarily we should have found it, unless the cache has overflowed */
2474
	if (j < 0 && !MyPgXact->overflowed)
2475 2476
		elog(WARNING, "did not find subXID %u in MyProc", xid);

2477 2478 2479 2480 2481
	/* Also advance global latestCompletedXid while holding the lock */
	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
							  latestXid))
		ShmemVariableCache->latestCompletedXid = latestXid;

2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493
	LWLockRelease(ProcArrayLock);
}

#ifdef XIDCACHE_DEBUG

/*
 * Print stats about effectiveness of XID cache
 */
static void
DisplayXidCache(void)
{
	fprintf(stderr,
2494
			"XidCache: xmin: %ld, known: %ld, myxact: %ld, latest: %ld, mainxid: %ld, childxid: %ld, knownassigned: %ld, nooflo: %ld, slow: %ld\n",
2495
			xc_by_recent_xmin,
2496
			xc_by_known_xact,
2497
			xc_by_my_xact,
2498
			xc_by_latest_xid,
2499 2500
			xc_by_main_xid,
			xc_by_child_xid,
2501
			xc_by_known_assigned,
2502
			xc_no_overflow,
2503 2504 2505
			xc_slow_answer);
}
#endif   /* XIDCACHE_DEBUG */
2506

2507

2508
/* ----------------------------------------------
B
Bruce Momjian 已提交
2509
 *		KnownAssignedTransactions sub-module
2510 2511 2512 2513 2514
 * ----------------------------------------------
 */

/*
 * In Hot Standby mode, we maintain a list of transactions that are (or were)
2515 2516
 * running in the master at the current point in WAL.  These XIDs must be
 * treated as running by standby transactions, even though they are not in
2517
 * the standby server's PGXACT array.
2518
 *
B
Bruce Momjian 已提交
2519
 * We record all XIDs that we know have been assigned.	That includes all the
2520 2521 2522 2523 2524 2525 2526 2527
 * XIDs seen in WAL records, plus all unobserved XIDs that we can deduce have
 * been assigned.  We can deduce the existence of unobserved XIDs because we
 * know XIDs are assigned in sequence, with no gaps.  The KnownAssignedXids
 * list expands as new XIDs are observed or inferred, and contracts when
 * transaction completion records arrive.
 *
 * During hot standby we do not fret too much about the distinction between
 * top-level XIDs and subtransaction XIDs. We store both together in the
B
Bruce Momjian 已提交
2528
 * KnownAssignedXids list.	In backends, this is copied into snapshots in
2529 2530 2531 2532 2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544 2545 2546 2547 2548 2549 2550 2551 2552 2553 2554 2555 2556 2557 2558
 * GetSnapshotData(), taking advantage of the fact that XidInMVCCSnapshot()
 * doesn't care about the distinction either.  Subtransaction XIDs are
 * effectively treated as top-level XIDs and in the typical case pg_subtrans
 * links are *not* maintained (which does not affect visibility).
 *
 * We have room in KnownAssignedXids and in snapshots to hold maxProcs *
 * (1 + PGPROC_MAX_CACHED_SUBXIDS) XIDs, so every master transaction must
 * report its subtransaction XIDs in a WAL XLOG_XACT_ASSIGNMENT record at
 * least every PGPROC_MAX_CACHED_SUBXIDS.  When we receive one of these
 * records, we mark the subXIDs as children of the top XID in pg_subtrans,
 * and then remove them from KnownAssignedXids.  This prevents overflow of
 * KnownAssignedXids and snapshots, at the cost that status checks for these
 * subXIDs will take a slower path through TransactionIdIsInProgress().
 * This means that KnownAssignedXids is not necessarily complete for subXIDs,
 * though it should be complete for top-level XIDs; this is the same situation
 * that holds with respect to the PGPROC entries in normal running.
 *
 * When we throw away subXIDs from KnownAssignedXids, we need to keep track of
 * that, similarly to tracking overflow of a PGPROC's subxids array.  We do
 * that by remembering the lastOverflowedXID, ie the last thrown-away subXID.
 * As long as that is within the range of interesting XIDs, we have to assume
 * that subXIDs are missing from snapshots.  (Note that subXID overflow occurs
 * on primary when 65th subXID arrives, whereas on standby it occurs when 64th
 * subXID arrives - that is not an error.)
 *
 * Should a backend on primary somehow disappear before it can write an abort
 * record, then we just leave those XIDs in KnownAssignedXids. They actually
 * aborted but we think they were running; the distinction is irrelevant
 * because either way any changes done by the transaction are not visible to
 * backends in the standby.  We prune KnownAssignedXids when
2559
 * XLOG_RUNNING_XACTS arrives, to forestall possible overflow of the
2560 2561 2562 2563 2564 2565 2566
 * array due to such dead XIDs.
 */

/*
 * RecordKnownAssignedTransactionIds
 *		Record the given XID in KnownAssignedXids, as well as any preceding
 *		unobserved XIDs.
2567 2568
 *
 * RecordKnownAssignedTransactionIds() should be run for *every* WAL record
2569 2570
 * associated with a transaction. Must be called for each record after we
 * have executed StartupCLOG() et al, since we must ExtendCLOG() etc..
2571
 *
2572
 * Called during recovery in analogy with and in place of GetNewTransactionId()
2573 2574 2575 2576
 */
void
RecordKnownAssignedTransactionIds(TransactionId xid)
{
2577
	Assert(standbyState >= STANDBY_INITIALIZED);
2578
	Assert(TransactionIdIsValid(xid));
2579

2580
	elog(trace_recovery(DEBUG4), "record known xact %u latestObservedXid %u",
B
Bruce Momjian 已提交
2581
		 xid, latestObservedXid);
2582

2583 2584 2585 2586 2587 2588 2589 2590
	/*
	 * If the KnownAssignedXids machinery isn't up yet, do nothing.
	 */
	if (standbyState <= STANDBY_INITIALIZED)
		return;

	Assert(TransactionIdIsValid(latestObservedXid));

2591
	/*
B
Bruce Momjian 已提交
2592 2593 2594
	 * When a newly observed xid arrives, it is frequently the case that it is
	 * *not* the next xid in sequence. When this occurs, we must treat the
	 * intervening xids as running also.
2595 2596 2597
	 */
	if (TransactionIdFollows(xid, latestObservedXid))
	{
2598
		TransactionId next_expected_xid;
2599 2600

		/*
B
Bruce Momjian 已提交
2601 2602 2603
		 * Extend clog and subtrans like we do in GetNewTransactionId() during
		 * normal operation using individual extend steps. Typical case
		 * requires almost no activity.
2604
		 */
2605 2606
		next_expected_xid = latestObservedXid;
		TransactionIdAdvance(next_expected_xid);
2607 2608 2609 2610 2611 2612 2613 2614
		while (TransactionIdPrecedesOrEquals(next_expected_xid, xid))
		{
			ExtendCLOG(next_expected_xid);
			ExtendSUBTRANS(next_expected_xid);

			TransactionIdAdvance(next_expected_xid);
		}

2615 2616 2617 2618 2619 2620
		/*
		 * Add the new xids onto the KnownAssignedXids array.
		 */
		next_expected_xid = latestObservedXid;
		TransactionIdAdvance(next_expected_xid);
		KnownAssignedXidsAdd(next_expected_xid, xid, false);
2621

2622 2623 2624
		/*
		 * Now we can advance latestObservedXid
		 */
2625 2626
		latestObservedXid = xid;

2627 2628 2629
		/* ShmemVariableCache->nextXid must be beyond any observed xid */
		next_expected_xid = latestObservedXid;
		TransactionIdAdvance(next_expected_xid);
2630
		LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
2631
		ShmemVariableCache->nextXid = next_expected_xid;
2632
		LWLockRelease(XidGenLock);
2633 2634 2635
	}
}

2636 2637 2638
/*
 * ExpireTreeKnownAssignedTransactionIds
 *		Remove the given XIDs from KnownAssignedXids.
2639 2640
 *
 * Called during recovery in analogy with and in place of ProcArrayEndTransaction()
2641
 */
2642 2643
void
ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
B
Bruce Momjian 已提交
2644
							   TransactionId *subxids, TransactionId max_xid)
2645
{
2646
	Assert(standbyState >= STANDBY_INITIALIZED);
2647 2648 2649 2650 2651 2652

	/*
	 * Uses same locking as transaction commit
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

2653
	KnownAssignedXidsRemoveTree(xid, nsubxids, subxids);
2654

2655 2656 2657
	/* As in ProcArrayEndTransaction, advance latestCompletedXid */
	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
							  max_xid))
2658 2659 2660 2661 2662
		ShmemVariableCache->latestCompletedXid = max_xid;

	LWLockRelease(ProcArrayLock);
}

2663 2664 2665 2666
/*
 * ExpireAllKnownAssignedTransactionIds
 *		Remove all entries in KnownAssignedXids
 */
2667 2668 2669 2670
void
ExpireAllKnownAssignedTransactionIds(void)
{
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
2671
	KnownAssignedXidsRemovePreceding(InvalidTransactionId);
2672 2673 2674
	LWLockRelease(ProcArrayLock);
}

2675 2676 2677 2678
/*
 * ExpireOldKnownAssignedTransactionIds
 *		Remove KnownAssignedXids entries preceding the given XID
 */
2679 2680 2681 2682
void
ExpireOldKnownAssignedTransactionIds(TransactionId xid)
{
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
2683
	KnownAssignedXidsRemovePreceding(xid);
2684 2685 2686
	LWLockRelease(ProcArrayLock);
}

2687

2688 2689 2690
/*
 * Private module functions to manipulate KnownAssignedXids
 *
2691 2692 2693 2694 2695 2696 2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729 2730 2731 2732 2733 2734 2735 2736 2737
 * There are 5 main uses of the KnownAssignedXids data structure:
 *
 *	* backends taking snapshots - all valid XIDs need to be copied out
 *	* backends seeking to determine presence of a specific XID
 *	* startup process adding new known-assigned XIDs
 *	* startup process removing specific XIDs as transactions end
 *	* startup process pruning array when special WAL records arrive
 *
 * This data structure is known to be a hot spot during Hot Standby, so we
 * go to some lengths to make these operations as efficient and as concurrent
 * as possible.
 *
 * The XIDs are stored in an array in sorted order --- TransactionIdPrecedes
 * order, to be exact --- to allow binary search for specific XIDs.  Note:
 * in general TransactionIdPrecedes would not provide a total order, but
 * we know that the entries present at any instant should not extend across
 * a large enough fraction of XID space to wrap around (the master would
 * shut down for fear of XID wrap long before that happens).  So it's OK to
 * use TransactionIdPrecedes as a binary-search comparator.
 *
 * It's cheap to maintain the sortedness during insertions, since new known
 * XIDs are always reported in XID order; we just append them at the right.
 *
 * To keep individual deletions cheap, we need to allow gaps in the array.
 * This is implemented by marking array elements as valid or invalid using
 * the parallel boolean array KnownAssignedXidsValid[].  A deletion is done
 * by setting KnownAssignedXidsValid[i] to false, *without* clearing the
 * XID entry itself.  This preserves the property that the XID entries are
 * sorted, so we can do binary searches easily.  Periodically we compress
 * out the unused entries; that's much cheaper than having to compress the
 * array immediately on every deletion.
 *
 * The actually valid items in KnownAssignedXids[] and KnownAssignedXidsValid[]
 * are those with indexes tail <= i < head; items outside this subscript range
 * have unspecified contents.  When head reaches the end of the array, we
 * force compression of unused entries rather than wrapping around, since
 * allowing wraparound would greatly complicate the search logic.  We maintain
 * an explicit tail pointer so that pruning of old XIDs can be done without
 * immediately moving the array contents.  In most cases only a small fraction
 * of the array contains valid entries at any instant.
 *
 * Although only the startup process can ever change the KnownAssignedXids
 * data structure, we still need interlocking so that standby backends will
 * not observe invalid intermediate states.  The convention is that backends
 * must hold shared ProcArrayLock to examine the array.  To remove XIDs from
 * the array, the startup process must hold ProcArrayLock exclusively, for
 * the usual transactional reasons (compare commit/abort of a transaction
B
Bruce Momjian 已提交
2738
 * during normal running).	Compressing unused entries out of the array
2739 2740 2741 2742 2743 2744
 * likewise requires exclusive lock.  To add XIDs to the array, we just insert
 * them into slots to the right of the head pointer and then advance the head
 * pointer.  This wouldn't require any lock at all, except that on machines
 * with weak memory ordering we need to be careful that other processors
 * see the array element changes before they see the head pointer change.
 * We handle this by using a spinlock to protect reads and writes of the
B
Bruce Momjian 已提交
2745
 * head/tail pointers.	(We could dispense with the spinlock if we were to
2746 2747 2748 2749 2750 2751 2752 2753 2754 2755 2756 2757 2758 2759 2760 2761 2762 2763 2764 2765 2766 2767 2768 2769
 * create suitable memory access barrier primitives and use those instead.)
 * The spinlock must be taken to read or write the head/tail pointers unless
 * the caller holds ProcArrayLock exclusively.
 *
 * Algorithmic analysis:
 *
 * If we have a maximum of M slots, with N XIDs currently spread across
 * S elements then we have N <= S <= M always.
 *
 *	* Adding a new XID is O(1) and needs little locking (unless compression
 *		must happen)
 *	* Compressing the array is O(S) and requires exclusive lock
 *	* Removing an XID is O(logS) and requires exclusive lock
 *	* Taking a snapshot is O(S) and requires shared lock
 *	* Checking for an XID is O(logS) and requires shared lock
 *
 * In comparison, using a hash table for KnownAssignedXids would mean that
 * taking snapshots would be O(M). If we can maintain S << M then the
 * sorted array technique will deliver significantly faster snapshots.
 * If we try to keep S too small then we will spend too much time compressing,
 * so there is an optimal point for any workload mix. We use a heuristic to
 * decide when to compress the array, though trimming also helps reduce
 * frequency of compressing. The heuristic requires us to track the number of
 * currently valid XIDs in the array.
2770 2771
 */

2772

2773
/*
2774 2775 2776 2777 2778
 * Compress KnownAssignedXids by shifting valid data down to the start of the
 * array, removing any gaps.
 *
 * A compression step is forced if "force" is true, otherwise we do it
 * only if a heuristic indicates it's a good time to do it.
2779
 *
2780
 * Caller must hold ProcArrayLock in exclusive mode.
2781 2782
 */
static void
2783
KnownAssignedXidsCompress(bool force)
2784
{
2785 2786
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
B
Bruce Momjian 已提交
2787 2788 2789 2790
	int			head,
				tail;
	int			compress_index;
	int			i;
2791

2792 2793 2794 2795 2796
	/* no spinlock required since we hold ProcArrayLock exclusively */
	head = pArray->headKnownAssignedXids;
	tail = pArray->tailKnownAssignedXids;

	if (!force)
2797
	{
2798
		/*
B
Bruce Momjian 已提交
2799 2800
		 * If we can choose how much to compress, use a heuristic to avoid
		 * compressing too often or not often enough.
2801
		 *
B
Bruce Momjian 已提交
2802 2803 2804 2805 2806
		 * Heuristic is if we have a large enough current spread and less than
		 * 50% of the elements are currently in use, then compress. This
		 * should ensure we compress fairly infrequently. We could compress
		 * less often though the virtual array would spread out more and
		 * snapshots would become more expensive.
2807
		 */
B
Bruce Momjian 已提交
2808
		int			nelements = head - tail;
2809

2810 2811 2812 2813
		if (nelements < 4 * PROCARRAY_MAXPROCS ||
			nelements < 2 * pArray->numKnownAssignedXids)
			return;
	}
2814

2815
	/*
B
Bruce Momjian 已提交
2816 2817
	 * We compress the array by reading the valid values from tail to head,
	 * re-aligning data to 0th element.
2818 2819 2820 2821 2822
	 */
	compress_index = 0;
	for (i = tail; i < head; i++)
	{
		if (KnownAssignedXidsValid[i])
2823
		{
2824 2825 2826
			KnownAssignedXids[compress_index] = KnownAssignedXids[i];
			KnownAssignedXidsValid[compress_index] = true;
			compress_index++;
2827
		}
2828
	}
2829

2830 2831 2832
	pArray->tailKnownAssignedXids = 0;
	pArray->headKnownAssignedXids = compress_index;
}
2833

2834 2835 2836 2837 2838 2839 2840 2841
/*
 * Add xids into KnownAssignedXids at the head of the array.
 *
 * xids from from_xid to to_xid, inclusive, are added to the array.
 *
 * If exclusive_lock is true then caller already holds ProcArrayLock in
 * exclusive mode, so we need no extra locking here.  Else caller holds no
 * lock, so we need to be sure we maintain sufficient interlocks against
B
Bruce Momjian 已提交
2842
 * concurrent readers.	(Only the startup process ever calls this, so no need
2843 2844 2845 2846 2847 2848 2849 2850
 * to worry about concurrent writers.)
 */
static void
KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
					 bool exclusive_lock)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
B
Bruce Momjian 已提交
2851 2852 2853
	TransactionId next_xid;
	int			head,
				tail;
2854 2855 2856 2857 2858 2859
	int			nxids;
	int			i;

	Assert(TransactionIdPrecedesOrEquals(from_xid, to_xid));

	/*
B
Bruce Momjian 已提交
2860 2861 2862
	 * Calculate how many array slots we'll need.  Normally this is cheap; in
	 * the unusual case where the XIDs cross the wrap point, we do it the hard
	 * way.
2863 2864 2865 2866 2867 2868 2869 2870
	 */
	if (to_xid >= from_xid)
		nxids = to_xid - from_xid + 1;
	else
	{
		nxids = 1;
		next_xid = from_xid;
		while (TransactionIdPrecedes(next_xid, to_xid))
2871
		{
2872 2873 2874 2875 2876 2877
			nxids++;
			TransactionIdAdvance(next_xid);
		}
	}

	/*
B
Bruce Momjian 已提交
2878 2879
	 * Since only the startup process modifies the head/tail pointers, we
	 * don't need a lock to read them here.
2880 2881 2882 2883 2884 2885 2886 2887
	 */
	head = pArray->headKnownAssignedXids;
	tail = pArray->tailKnownAssignedXids;

	Assert(head >= 0 && head <= pArray->maxKnownAssignedXids);
	Assert(tail >= 0 && tail < pArray->maxKnownAssignedXids);

	/*
B
Bruce Momjian 已提交
2888 2889 2890
	 * Verify that insertions occur in TransactionId sequence.	Note that even
	 * if the last existing element is marked invalid, it must still have a
	 * correctly sequenced XID value.
2891 2892 2893 2894 2895 2896 2897 2898 2899 2900 2901 2902 2903 2904 2905 2906 2907 2908 2909 2910 2911 2912 2913
	 */
	if (head > tail &&
		TransactionIdFollowsOrEquals(KnownAssignedXids[head - 1], from_xid))
	{
		KnownAssignedXidsDisplay(LOG);
		elog(ERROR, "out-of-order XID insertion in KnownAssignedXids");
	}

	/*
	 * If our xids won't fit in the remaining space, compress out free space
	 */
	if (head + nxids > pArray->maxKnownAssignedXids)
	{
		/* must hold lock to compress */
		if (!exclusive_lock)
			LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

		KnownAssignedXidsCompress(true);

		head = pArray->headKnownAssignedXids;
		/* note: we no longer care about the tail pointer */

		if (!exclusive_lock)
2914
			LWLockRelease(ProcArrayLock);
2915 2916 2917 2918 2919 2920 2921 2922 2923 2924 2925 2926 2927 2928 2929 2930 2931 2932 2933 2934 2935 2936 2937 2938 2939 2940 2941

		/*
		 * If it still won't fit then we're out of memory
		 */
		if (head + nxids > pArray->maxKnownAssignedXids)
			elog(ERROR, "too many KnownAssignedXids");
	}

	/* Now we can insert the xids into the space starting at head */
	next_xid = from_xid;
	for (i = 0; i < nxids; i++)
	{
		KnownAssignedXids[head] = next_xid;
		KnownAssignedXidsValid[head] = true;
		TransactionIdAdvance(next_xid);
		head++;
	}

	/* Adjust count of number of valid entries */
	pArray->numKnownAssignedXids += nxids;

	/*
	 * Now update the head pointer.  We use a spinlock to protect this
	 * pointer, not because the update is likely to be non-atomic, but to
	 * ensure that other processors see the above array updates before they
	 * see the head pointer change.
	 *
B
Bruce Momjian 已提交
2942 2943
	 * If we're holding ProcArrayLock exclusively, there's no need to take the
	 * spinlock.
2944 2945 2946 2947 2948 2949 2950 2951 2952 2953 2954 2955 2956 2957 2958 2959 2960 2961 2962 2963 2964 2965 2966 2967 2968
	 */
	if (exclusive_lock)
		pArray->headKnownAssignedXids = head;
	else
	{
		SpinLockAcquire(&pArray->known_assigned_xids_lck);
		pArray->headKnownAssignedXids = head;
		SpinLockRelease(&pArray->known_assigned_xids_lck);
	}
}

/*
 * KnownAssignedXidsSearch
 *
 * Searches KnownAssignedXids for a specific xid and optionally removes it.
 * Returns true if it was found, false if not.
 *
 * Caller must hold ProcArrayLock in shared or exclusive mode.
 * Exclusive lock must be held for remove = true.
 */
static bool
KnownAssignedXidsSearch(TransactionId xid, bool remove)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
B
Bruce Momjian 已提交
2969 2970 2971 2972 2973
	int			first,
				last;
	int			head;
	int			tail;
	int			result_index = -1;
2974 2975 2976 2977 2978 2979 2980 2981 2982 2983 2984 2985 2986 2987 2988 2989 2990

	if (remove)
	{
		/* we hold ProcArrayLock exclusively, so no need for spinlock */
		tail = pArray->tailKnownAssignedXids;
		head = pArray->headKnownAssignedXids;
	}
	else
	{
		/* take spinlock to ensure we see up-to-date array contents */
		SpinLockAcquire(&pArray->known_assigned_xids_lck);
		tail = pArray->tailKnownAssignedXids;
		head = pArray->headKnownAssignedXids;
		SpinLockRelease(&pArray->known_assigned_xids_lck);
	}

	/*
B
Bruce Momjian 已提交
2991
	 * Standard binary search.	Note we can ignore the KnownAssignedXidsValid
2992 2993 2994 2995 2996 2997
	 * array here, since even invalid entries will contain sorted XIDs.
	 */
	first = tail;
	last = head - 1;
	while (first <= last)
	{
B
Bruce Momjian 已提交
2998 2999
		int			mid_index;
		TransactionId mid_xid;
3000 3001 3002 3003 3004 3005 3006 3007

		mid_index = (first + last) / 2;
		mid_xid = KnownAssignedXids[mid_index];

		if (xid == mid_xid)
		{
			result_index = mid_index;
			break;
3008
		}
3009 3010 3011 3012 3013 3014 3015 3016 3017 3018 3019 3020 3021 3022 3023
		else if (TransactionIdPrecedes(xid, mid_xid))
			last = mid_index - 1;
		else
			first = mid_index + 1;
	}

	if (result_index < 0)
		return false;			/* not in array */

	if (!KnownAssignedXidsValid[result_index])
		return false;			/* in array, but invalid */

	if (remove)
	{
		KnownAssignedXidsValid[result_index] = false;
3024

3025 3026 3027 3028 3029 3030 3031 3032
		pArray->numKnownAssignedXids--;
		Assert(pArray->numKnownAssignedXids >= 0);

		/*
		 * If we're removing the tail element then advance tail pointer over
		 * any invalid elements.  This will speed future searches.
		 */
		if (result_index == tail)
3033
		{
3034 3035 3036 3037 3038 3039 3040 3041 3042 3043 3044 3045 3046
			tail++;
			while (tail < head && !KnownAssignedXidsValid[tail])
				tail++;
			if (tail >= head)
			{
				/* Array is empty, so we can reset both pointers */
				pArray->headKnownAssignedXids = 0;
				pArray->tailKnownAssignedXids = 0;
			}
			else
			{
				pArray->tailKnownAssignedXids = tail;
			}
3047 3048
		}
	}
3049 3050

	return true;
3051 3052 3053
}

/*
3054
 * Is the specified XID present in KnownAssignedXids[]?
3055
 *
3056
 * Caller must hold ProcArrayLock in shared or exclusive mode.
3057 3058
 */
static bool
3059
KnownAssignedXidExists(TransactionId xid)
3060
{
3061
	Assert(TransactionIdIsValid(xid));
B
Bruce Momjian 已提交
3062

3063
	return KnownAssignedXidsSearch(xid, false);
3064 3065 3066
}

/*
3067
 * Remove the specified XID from KnownAssignedXids[].
3068
 *
3069
 * Caller must hold ProcArrayLock in exclusive mode.
3070 3071 3072 3073 3074 3075 3076 3077 3078
 */
static void
KnownAssignedXidsRemove(TransactionId xid)
{
	Assert(TransactionIdIsValid(xid));

	elog(trace_recovery(DEBUG4), "remove KnownAssignedXid %u", xid);

	/*
3079 3080
	 * Note: we cannot consider it an error to remove an XID that's not
	 * present.  We intentionally remove subxact IDs while processing
B
Bruce Momjian 已提交
3081 3082
	 * XLOG_XACT_ASSIGNMENT, to avoid array overflow.  Then those XIDs will be
	 * removed again when the top-level xact commits or aborts.
3083
	 *
B
Bruce Momjian 已提交
3084 3085 3086
	 * It might be possible to track such XIDs to distinguish this case from
	 * actual errors, but it would be complicated and probably not worth it.
	 * So, just ignore the search result.
3087
	 */
3088
	(void) KnownAssignedXidsSearch(xid, true);
3089 3090 3091
}

/*
3092 3093
 * KnownAssignedXidsRemoveTree
 *		Remove xid (if it's not InvalidTransactionId) and all the subxids.
3094
 *
3095
 * Caller must hold ProcArrayLock in exclusive mode.
3096
 */
3097 3098 3099
static void
KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
							TransactionId *subxids)
3100
{
B
Bruce Momjian 已提交
3101
	int			i;
3102

3103 3104 3105 3106 3107 3108 3109 3110
	if (TransactionIdIsValid(xid))
		KnownAssignedXidsRemove(xid);

	for (i = 0; i < nsubxids; i++)
		KnownAssignedXidsRemove(subxids[i]);

	/* Opportunistically compress the array */
	KnownAssignedXidsCompress(false);
3111 3112 3113
}

/*
3114 3115
 * Prune KnownAssignedXids up to, but *not* including xid. If xid is invalid
 * then clear the whole table.
3116
 *
3117
 * Caller must hold ProcArrayLock in exclusive mode.
3118
 */
3119 3120
static void
KnownAssignedXidsRemovePreceding(TransactionId removeXid)
3121
{
3122 3123
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
B
Bruce Momjian 已提交
3124 3125 3126 3127
	int			count = 0;
	int			head,
				tail,
				i;
3128

3129
	if (!TransactionIdIsValid(removeXid))
3130
	{
3131 3132 3133 3134 3135
		elog(trace_recovery(DEBUG4), "removing all KnownAssignedXids");
		pArray->numKnownAssignedXids = 0;
		pArray->headKnownAssignedXids = pArray->tailKnownAssignedXids = 0;
		return;
	}
3136

3137
	elog(trace_recovery(DEBUG4), "prune KnownAssignedXids to %u", removeXid);
3138

3139
	/*
B
Bruce Momjian 已提交
3140 3141
	 * Mark entries invalid starting at the tail.  Since array is sorted, we
	 * can stop as soon as we reach a entry >= removeXid.
3142 3143 3144 3145 3146 3147 3148 3149
	 */
	tail = pArray->tailKnownAssignedXids;
	head = pArray->headKnownAssignedXids;

	for (i = tail; i < head; i++)
	{
		if (KnownAssignedXidsValid[i])
		{
B
Bruce Momjian 已提交
3150
			TransactionId knownXid = KnownAssignedXids[i];
3151 3152 3153 3154 3155 3156 3157 3158 3159 3160

			if (TransactionIdFollowsOrEquals(knownXid, removeXid))
				break;

			if (!StandbyTransactionIdIsPrepared(knownXid))
			{
				KnownAssignedXidsValid[i] = false;
				count++;
			}
		}
3161 3162
	}

3163 3164 3165 3166 3167 3168 3169 3170 3171 3172 3173 3174 3175 3176 3177 3178 3179 3180 3181 3182 3183 3184 3185 3186
	pArray->numKnownAssignedXids -= count;
	Assert(pArray->numKnownAssignedXids >= 0);

	/*
	 * Advance the tail pointer if we've marked the tail item invalid.
	 */
	for (i = tail; i < head; i++)
	{
		if (KnownAssignedXidsValid[i])
			break;
	}
	if (i >= head)
	{
		/* Array is empty, so we can reset both pointers */
		pArray->headKnownAssignedXids = 0;
		pArray->tailKnownAssignedXids = 0;
	}
	else
	{
		pArray->tailKnownAssignedXids = i;
	}

	/* Opportunistically compress the array */
	KnownAssignedXidsCompress(false);
3187 3188 3189
}

/*
3190 3191 3192 3193 3194
 * KnownAssignedXidsGet - Get an array of xids by scanning KnownAssignedXids.
 * We filter out anything >= xmax.
 *
 * Returns the number of XIDs stored into xarray[].  Caller is responsible
 * that array is large enough.
3195
 *
3196
 * Caller must hold ProcArrayLock in (at least) shared mode.
3197
 */
3198 3199
static int
KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax)
3200
{
3201
	TransactionId xtmp = InvalidTransactionId;
3202

3203 3204
	return KnownAssignedXidsGetAndSetXmin(xarray, &xtmp, xmax);
}
3205

3206 3207 3208 3209 3210 3211 3212 3213 3214 3215 3216 3217 3218
/*
 * KnownAssignedXidsGetAndSetXmin - as KnownAssignedXidsGet, plus
 * we reduce *xmin to the lowest xid value seen if not already lower.
 *
 * Caller must hold ProcArrayLock in (at least) shared mode.
 */
static int
KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
							   TransactionId xmax)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
	int			count = 0;
B
Bruce Momjian 已提交
3219 3220
	int			head,
				tail;
3221 3222 3223
	int			i;

	/*
B
Bruce Momjian 已提交
3224 3225 3226 3227 3228
	 * Fetch head just once, since it may change while we loop. We can stop
	 * once we reach the initially seen head, since we are certain that an xid
	 * cannot enter and then leave the array while we hold ProcArrayLock.  We
	 * might miss newly-added xids, but they should be >= xmax so irrelevant
	 * anyway.
3229 3230 3231 3232
	 *
	 * Must take spinlock to ensure we see up-to-date array contents.
	 */
	SpinLockAcquire(&pArray->known_assigned_xids_lck);
3233 3234
	tail = pArray->tailKnownAssignedXids;
	head = pArray->headKnownAssignedXids;
3235 3236 3237 3238 3239 3240
	SpinLockRelease(&pArray->known_assigned_xids_lck);

	for (i = tail; i < head; i++)
	{
		/* Skip any gaps in the array */
		if (KnownAssignedXidsValid[i])
3241
		{
3242 3243 3244 3245 3246 3247 3248 3249 3250 3251 3252 3253 3254 3255
			TransactionId knownXid = KnownAssignedXids[i];

			/*
			 * Update xmin if required.  Only the first XID need be checked,
			 * since the array is sorted.
			 */
			if (count == 0 &&
				TransactionIdPrecedes(knownXid, *xmin))
				*xmin = knownXid;

			/*
			 * Filter out anything >= xmax, again relying on sorted property
			 * of array.
			 */
3256 3257
			if (TransactionIdIsValid(xmax) &&
				TransactionIdFollowsOrEquals(knownXid, xmax))
3258 3259 3260 3261
				break;

			/* Add knownXid into output array */
			xarray[count++] = knownXid;
3262 3263
		}
	}
3264 3265

	return count;
3266 3267
}

3268 3269 3270 3271 3272
/*
 * Get oldest XID in the KnownAssignedXids array, or InvalidTransactionId
 * if nothing there.
 */
static TransactionId
3273 3274 3275 3276 3277 3278 3279 3280 3281 3282 3283 3284 3285 3286 3287 3288 3289 3290 3291 3292 3293 3294 3295 3296 3297 3298
KnownAssignedXidsGetOldestXmin(void)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
	int			head,
				tail;
	int			i;

	/*
	 * Fetch head just once, since it may change while we loop.
	 */
	SpinLockAcquire(&pArray->known_assigned_xids_lck);
	tail = pArray->tailKnownAssignedXids;
	head = pArray->headKnownAssignedXids;
	SpinLockRelease(&pArray->known_assigned_xids_lck);

	for (i = tail; i < head; i++)
	{
		/* Skip any gaps in the array */
		if (KnownAssignedXidsValid[i])
			return KnownAssignedXids[i];
	}

	return InvalidTransactionId;
}

3299 3300 3301
/*
 * Display KnownAssignedXids to provide debug trail
 *
3302 3303 3304 3305 3306 3307
 * Currently this is only called within startup process, so we need no
 * special locking.
 *
 * Note this is pretty expensive, and much of the expense will be incurred
 * even if the elog message will get discarded.  It's not currently called
 * in any performance-critical places, however, so no need to be tenser.
3308
 */
T
Tom Lane 已提交
3309
static void
3310 3311
KnownAssignedXidsDisplay(int trace_level)
{
3312 3313
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
B
Bruce Momjian 已提交
3314 3315 3316 3317 3318
	StringInfoData buf;
	int			head,
				tail,
				i;
	int			nxids = 0;
3319

3320 3321
	tail = pArray->tailKnownAssignedXids;
	head = pArray->headKnownAssignedXids;
3322 3323 3324

	initStringInfo(&buf);

3325 3326 3327 3328 3329
	for (i = tail; i < head; i++)
	{
		if (KnownAssignedXidsValid[i])
		{
			nxids++;
3330
			appendStringInfo(&buf, "[%d]=%u ", i, KnownAssignedXids[i]);
3331 3332
		}
	}
3333

3334
	elog(trace_level, "%d KnownAssignedXids (num=%d tail=%d head=%d) %s",
3335 3336 3337 3338 3339
		 nxids,
		 pArray->numKnownAssignedXids,
		 pArray->tailKnownAssignedXids,
		 pArray->headKnownAssignedXids,
		 buf.data);
3340 3341 3342

	pfree(buf.data);
}