procarray.c 75.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12
/*-------------------------------------------------------------------------
 *
 * procarray.c
 *	  POSTGRES process array code.
 *
 *
 * This module maintains an unsorted array of the PGPROC structures for all
 * active backends.  Although there are several uses for this, the principal
 * one is as a means of determining the set of currently running transactions.
 *
 * Because of various subtle race conditions it is critical that a backend
 * hold the correct locks while setting or clearing its MyProc->xid field.
13
 * See notes in src/backend/access/transam/README.
14 15 16
 *
 * The process array now also includes PGPROC structures representing
 * prepared transactions.  The xid and subxids fields of these are valid,
17 18
 * as are the myProcLocks lists.  They can be distinguished from regular
 * backend PGPROCs at need by checking for pid == 0.
B
Bruce Momjian 已提交
19
 *
20 21 22 23 24 25 26 27 28 29 30 31 32 33
 * During recovery, we also keep a list of XIDs representing transactions
 * that are known to be running at current point in WAL recovery. This
 * list is kept in the KnownAssignedXids array, and updated by watching
 * the sequence of arriving xids. This is very important because if we leave
 * those xids out of the snapshot then they will appear to be already complete.
 * Later, when they have actually completed this could lead to confusion as to
 * whether those xids are visible or not, blowing a huge hole in MVCC.
 * We need 'em.
 *
 * It is theoretically possible for a FATAL error to explode before writing
 * an abort record. This could tie up KnownAssignedXids indefinitely, so
 * we prune the array when a valid list of running xids arrives. These quirks,
 * if they do ever exist in reality will not effect the correctness of
 * snapshots.
34
 *
35
 * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
36 37 38 39
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
40
 *	  $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.65 2010/04/21 19:08:14 sriggs Exp $
41 42 43 44 45
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

46 47
#include <signal.h>

48
#include "access/clog.h"
49
#include "access/subtrans.h"
50 51
#include "access/transam.h"
#include "access/xact.h"
52
#include "access/twophase.h"
53 54
#include "miscadmin.h"
#include "storage/procarray.h"
55 56
#include "storage/standby.h"
#include "utils/builtins.h"
57
#include "utils/snapmgr.h"
58

B
Bruce Momjian 已提交
59
static RunningTransactionsData CurrentRunningXactsData;
60 61 62 63 64 65 66

/* Our shared memory area */
typedef struct ProcArrayStruct
{
	int			numProcs;		/* number of valid procs entries */
	int			maxProcs;		/* allocated size of procs array */

B
Bruce Momjian 已提交
67 68 69 70 71
	int			numKnownAssignedXids;	/* current number of known assigned
										 * xids */
	int			maxKnownAssignedXids;	/* allocated size of known assigned
										 * xids */

72 73 74 75
	/*
	 * Highest subxid that overflowed KnownAssignedXids array. Similar to
	 * overflowing cached subxids in PGPROC entries.
	 */
B
Bruce Momjian 已提交
76
	TransactionId lastOverflowedXid;
77

78
	/*
B
Bruce Momjian 已提交
79 80
	 * We declare procs[] as 1 entry because C wants a fixed-size array, but
	 * actually it is maxProcs entries long.
81 82 83 84 85 86
	 */
	PGPROC	   *procs[1];		/* VARIABLE LENGTH ARRAY */
} ProcArrayStruct;

static ProcArrayStruct *procArray;

87 88 89 90
/*
 * Bookkeeping for tracking emulated transactions in recovery
 */
static HTAB *KnownAssignedXidsHash;
B
Bruce Momjian 已提交
91
static TransactionId latestObservedXid = InvalidTransactionId;
92 93 94 95 96 97 98 99 100 101 102 103 104

/*
 * If we're in STANDBY_SNAPSHOT_PENDING state, standbySnapshotPendingXmin is
 * the highest xid that might still be running that we don't have in
 * KnownAssignedXids.
 */
static TransactionId standbySnapshotPendingXmin;

/*
 * Oldest transaction still running according to the running-xacts snapshot
 * we initialized standby mode from.
 */
static TransactionId snapshotOldestActiveXid;
105 106 107 108 109

#ifdef XIDCACHE_DEBUG

/* counters for XidCache measurement */
static long xc_by_recent_xmin = 0;
110
static long xc_by_known_xact = 0;
111
static long xc_by_my_xact = 0;
112
static long xc_by_latest_xid = 0;
113 114
static long xc_by_main_xid = 0;
static long xc_by_child_xid = 0;
115
static long xc_no_overflow = 0;
116 117 118
static long xc_slow_answer = 0;

#define xc_by_recent_xmin_inc()		(xc_by_recent_xmin++)
119
#define xc_by_known_xact_inc()		(xc_by_known_xact++)
120
#define xc_by_my_xact_inc()			(xc_by_my_xact++)
121
#define xc_by_latest_xid_inc()		(xc_by_latest_xid++)
122 123
#define xc_by_main_xid_inc()		(xc_by_main_xid++)
#define xc_by_child_xid_inc()		(xc_by_child_xid++)
124
#define xc_no_overflow_inc()		(xc_no_overflow++)
125 126 127 128 129 130
#define xc_slow_answer_inc()		(xc_slow_answer++)

static void DisplayXidCache(void);
#else							/* !XIDCACHE_DEBUG */

#define xc_by_recent_xmin_inc()		((void) 0)
131
#define xc_by_known_xact_inc()		((void) 0)
132
#define xc_by_my_xact_inc()			((void) 0)
133
#define xc_by_latest_xid_inc()		((void) 0)
134 135
#define xc_by_main_xid_inc()		((void) 0)
#define xc_by_child_xid_inc()		((void) 0)
136
#define xc_no_overflow_inc()		((void) 0)
137 138 139
#define xc_slow_answer_inc()		((void) 0)
#endif   /* XIDCACHE_DEBUG */

140
/* Primitives for KnownAssignedXids array handling for standby */
B
Bruce Momjian 已提交
141 142 143
static int	KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
							   TransactionId xmax);
144 145 146 147 148
static bool KnownAssignedXidsExist(TransactionId xid);
static void KnownAssignedXidsAdd(TransactionId *xids, int nxids);
static void KnownAssignedXidsRemove(TransactionId xid);
static void KnownAssignedXidsRemoveMany(TransactionId xid, bool keepPreparedXacts);
static void KnownAssignedXidsDisplay(int trace_level);
149 150 151 152

/*
 * Report shared-memory space needed by CreateSharedProcArray.
 */
153
Size
154
ProcArrayShmemSize(void)
155
{
156 157 158
	Size		size;

	size = offsetof(ProcArrayStruct, procs);
159 160 161 162 163 164

	/* Normal processing - MyProc slots */
#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts)
	size = add_size(size, mul_size(sizeof(PGPROC *), PROCARRAY_MAXPROCS));

	/*
165 166 167 168 169 170 171
	 * During recovery processing we have a data structure called
	 * KnownAssignedXids, created in shared memory. Local data structures are
	 * also created in various backends during GetSnapshotData(),
	 * TransactionIdIsInProgress() and GetRunningTransactionData(). All of the
	 * main structures created in those functions must be identically sized,
	 * since we may at times copy the whole of the data structures around. We
	 * refer to this size as TOTAL_MAX_CACHED_SUBXIDS.
172 173 174
	 */
#define TOTAL_MAX_CACHED_SUBXIDS ((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)
	if (XLogRequestRecoveryConnections)
175 176 177
		size = add_size(size,
						hash_estimate_size(TOTAL_MAX_CACHED_SUBXIDS,
										   sizeof(TransactionId)));
178 179

	return size;
180 181 182 183 184 185
}

/*
 * Initialize the shared PGPROC array during postmaster startup.
 */
void
186
CreateSharedProcArray(void)
187 188 189 190 191
{
	bool		found;

	/* Create or attach to the ProcArray shared structure */
	procArray = (ProcArrayStruct *)
192
		ShmemInitStruct("Proc Array",
193 194
						mul_size(sizeof(PGPROC *), PROCARRAY_MAXPROCS),
						&found);
195 196 197 198 199 200

	if (!found)
	{
		/*
		 * We're the first - initialize.
		 */
201
		/* Normal processing */
202
		procArray->numProcs = 0;
203
		procArray->maxProcs = PROCARRAY_MAXPROCS;
204 205 206 207
		procArray->numKnownAssignedXids = 0;
		procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
		procArray->lastOverflowedXid = InvalidTransactionId;
	}
208

209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
	if (XLogRequestRecoveryConnections)
	{
		/* Create or attach to the KnownAssignedXids hash table */
		HASHCTL		info;

		MemSet(&info, 0, sizeof(info));
		info.keysize = sizeof(TransactionId);
		info.entrysize = sizeof(TransactionId);
		info.hash = tag_hash;

		KnownAssignedXidsHash = ShmemInitHash("KnownAssignedXids Hash",
											  TOTAL_MAX_CACHED_SUBXIDS,
											  TOTAL_MAX_CACHED_SUBXIDS,
											  &info,
											  HASH_ELEM | HASH_FUNCTION);
		if (!KnownAssignedXidsHash)
			elog(FATAL, "could not initialize known assigned xids hash table");
226 227 228 229
	}
}

/*
230
 * Add the specified PGPROC to the shared array.
231 232
 */
void
233
ProcArrayAdd(PGPROC *proc)
234 235 236 237 238 239 240 241
{
	ProcArrayStruct *arrayP = procArray;

	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	if (arrayP->numProcs >= arrayP->maxProcs)
	{
		/*
B
Bruce Momjian 已提交
242 243 244
		 * Ooops, no room.	(This really shouldn't happen, since there is a
		 * fixed supply of PGPROC structs too, and so we should have failed
		 * earlier.)
245 246 247 248 249 250 251
		 */
		LWLockRelease(ProcArrayLock);
		ereport(FATAL,
				(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
				 errmsg("sorry, too many clients already")));
	}

252
	arrayP->procs[arrayP->numProcs] = proc;
253 254 255 256 257 258
	arrayP->numProcs++;

	LWLockRelease(ProcArrayLock);
}

/*
259
 * Remove the specified PGPROC from the shared array.
260 261 262 263 264 265 266
 *
 * When latestXid is a valid XID, we are removing a live 2PC gxact from the
 * array, and thus causing it to appear as "not running" anymore.  In this
 * case we must advance latestCompletedXid.  (This is essentially the same
 * as ProcArrayEndTransaction followed by removal of the PGPROC, but we take
 * the ProcArrayLock only once, and don't damage the content of the PGPROC;
 * twophase.c depends on the latter.)
267 268
 */
void
269
ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
270 271 272 273 274
{
	ProcArrayStruct *arrayP = procArray;
	int			index;

#ifdef XIDCACHE_DEBUG
275 276 277
	/* dump stats at backend shutdown, but not prepared-xact end */
	if (proc->pid != 0)
		DisplayXidCache();
278 279 280 281
#endif

	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

282 283 284 285 286 287 288 289 290 291 292 293 294 295 296
	if (TransactionIdIsValid(latestXid))
	{
		Assert(TransactionIdIsValid(proc->xid));

		/* Advance global latestCompletedXid while holding the lock */
		if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
								  latestXid))
			ShmemVariableCache->latestCompletedXid = latestXid;
	}
	else
	{
		/* Shouldn't be trying to remove a live transaction here */
		Assert(!TransactionIdIsValid(proc->xid));
	}

297 298
	for (index = 0; index < arrayP->numProcs; index++)
	{
299
		if (arrayP->procs[index] == proc)
300 301
		{
			arrayP->procs[index] = arrayP->procs[arrayP->numProcs - 1];
302
			arrayP->procs[arrayP->numProcs - 1] = NULL; /* for debugging */
303 304 305 306 307 308 309 310 311
			arrayP->numProcs--;
			LWLockRelease(ProcArrayLock);
			return;
		}
	}

	/* Ooops */
	LWLockRelease(ProcArrayLock);

312
	elog(LOG, "failed to find proc %p in ProcArray", proc);
313 314 315
}


316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334
/*
 * ProcArrayEndTransaction -- mark a transaction as no longer running
 *
 * This is used interchangeably for commit and abort cases.  The transaction
 * commit/abort must already be reported to WAL and pg_clog.
 *
 * proc is currently always MyProc, but we pass it explicitly for flexibility.
 * latestXid is the latest Xid among the transaction's main XID and
 * subtransactions, or InvalidTransactionId if it has no XID.  (We must ask
 * the caller to pass latestXid, instead of computing it from the PGPROC's
 * contents, because the subxid information in the PGPROC might be
 * incomplete.)
 */
void
ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
{
	if (TransactionIdIsValid(latestXid))
	{
		/*
B
Bruce Momjian 已提交
335 336 337
		 * We must lock ProcArrayLock while clearing proc->xid, so that we do
		 * not exit the set of "running" transactions while someone else is
		 * taking a snapshot.  See discussion in
338 339 340 341 342 343 344 345 346
		 * src/backend/access/transam/README.
		 */
		Assert(TransactionIdIsValid(proc->xid));

		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

		proc->xid = InvalidTransactionId;
		proc->lxid = InvalidLocalTransactionId;
		proc->xmin = InvalidTransactionId;
347 348
		/* must be cleared with xid/xmin: */
		proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
B
Bruce Momjian 已提交
349
		proc->inCommit = false; /* be sure this is cleared in abort */
350
		proc->recoveryConflictPending = false;
351 352 353 354 355 356 357 358 359 360 361 362 363 364 365

		/* Clear the subtransaction-XID cache too while holding the lock */
		proc->subxids.nxids = 0;
		proc->subxids.overflowed = false;

		/* Also advance global latestCompletedXid while holding the lock */
		if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
								  latestXid))
			ShmemVariableCache->latestCompletedXid = latestXid;

		LWLockRelease(ProcArrayLock);
	}
	else
	{
		/*
B
Bruce Momjian 已提交
366 367 368
		 * If we have no XID, we don't need to lock, since we won't affect
		 * anyone else's calculation of a snapshot.  We might change their
		 * estimate of global xmin, but that's OK.
369 370 371 372 373
		 */
		Assert(!TransactionIdIsValid(proc->xid));

		proc->lxid = InvalidLocalTransactionId;
		proc->xmin = InvalidTransactionId;
374 375
		/* must be cleared with xid/xmin: */
		proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
B
Bruce Momjian 已提交
376
		proc->inCommit = false; /* be sure this is cleared in abort */
377
		proc->recoveryConflictPending = false;
378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397

		Assert(proc->subxids.nxids == 0);
		Assert(proc->subxids.overflowed == false);
	}
}


/*
 * ProcArrayClearTransaction -- clear the transaction fields
 *
 * This is used after successfully preparing a 2-phase transaction.  We are
 * not actually reporting the transaction's XID as no longer running --- it
 * will still appear as running because the 2PC's gxact is in the ProcArray
 * too.  We just have to clear out our own PGPROC.
 */
void
ProcArrayClearTransaction(PGPROC *proc)
{
	/*
	 * We can skip locking ProcArrayLock here, because this action does not
B
Bruce Momjian 已提交
398 399
	 * actually change anyone's view of the set of running XIDs: our entry is
	 * duplicate with the gxact that has already been inserted into the
400 401 402 403 404
	 * ProcArray.
	 */
	proc->xid = InvalidTransactionId;
	proc->lxid = InvalidLocalTransactionId;
	proc->xmin = InvalidTransactionId;
405
	proc->recoveryConflictPending = false;
406 407 408 409

	/* redundant, but just in case */
	proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
	proc->inCommit = false;
410 411 412 413 414 415

	/* Clear the subtransaction-XID cache too */
	proc->subxids.nxids = 0;
	proc->subxids.overflowed = false;
}

416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441
void
ProcArrayInitRecoveryInfo(TransactionId oldestActiveXid)
{
	snapshotOldestActiveXid = oldestActiveXid;
}

/*
 * ProcArrayApplyRecoveryInfo -- apply recovery info about xids
 *
 * Takes us through 3 states: Uninitialized, Pending and Ready.
 * Normal case is to go all the way to Ready straight away, though there
 * are atypical cases where we need to take it in steps.
 *
 * Use the data about running transactions on master to create the initial
 * state of KnownAssignedXids. We also these records to regularly prune
 * KnownAssignedXids because we know it is possible that some transactions
 * with FATAL errors do not write abort records, which could cause eventual
 * overflow.
 *
 * Only used during recovery. Notice the signature is very similar to a
 * _redo function and its difficult to decide exactly where this code should
 * reside.
 */
void
ProcArrayApplyRecoveryInfo(RunningTransactions running)
{
B
Bruce Momjian 已提交
442 443 444
	int			xid_index;		/* main loop */
	TransactionId *xids;
	int			nxids;
445 446 447 448 449 450

	Assert(standbyState >= STANDBY_INITIALIZED);

	/*
	 * Remove stale transactions, if any.
	 */
451
	Assert(TransactionIdIsValid(running->oldestRunningXid));
452 453 454 455 456 457 458 459 460 461
	ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid);
	StandbyReleaseOldLocks(running->oldestRunningXid);

	/*
	 * If our snapshot is already valid, nothing else to do...
	 */
	if (standbyState == STANDBY_SNAPSHOT_READY)
		return;

	/*
B
Bruce Momjian 已提交
462 463 464 465 466 467 468 469
	 * If our initial RunningXactData had an overflowed snapshot then we knew
	 * we were missing some subxids from our snapshot. We can use this data as
	 * an initial snapshot, but we cannot yet mark it valid. We know that the
	 * missing subxids are equal to or earlier than nextXid. After we
	 * initialise we continue to apply changes during recovery, so once the
	 * oldestRunningXid is later than the nextXid from the initial snapshot we
	 * know that we no longer have missing information and can mark the
	 * snapshot as valid.
470 471 472 473 474 475 476 477
	 */
	if (standbyState == STANDBY_SNAPSHOT_PENDING)
	{
		if (TransactionIdPrecedes(standbySnapshotPendingXmin,
								  running->oldestRunningXid))
		{
			standbyState = STANDBY_SNAPSHOT_READY;
			elog(trace_recovery(DEBUG2),
B
Bruce Momjian 已提交
478
				 "running xact data now proven complete");
479
			elog(trace_recovery(DEBUG2),
B
Bruce Momjian 已提交
480
				 "recovery snapshots are now enabled");
481 482 483 484 485 486 487 488 489 490 491
		}
		return;
	}

	/*
	 * OK, we need to initialise from the RunningXactData record
	 */
	latestObservedXid = running->nextXid;
	TransactionIdRetreat(latestObservedXid);

	/*
B
Bruce Momjian 已提交
492 493 494
	 * If the snapshot overflowed, then we still initialise with what we know,
	 * but the recovery snapshot isn't fully valid yet because we know there
	 * are some subxids missing (ergo we don't know which ones)
495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514
	 */
	if (!running->subxid_overflow)
	{
		standbyState = STANDBY_SNAPSHOT_READY;
		standbySnapshotPendingXmin = InvalidTransactionId;
	}
	else
	{
		standbyState = STANDBY_SNAPSHOT_PENDING;
		standbySnapshotPendingXmin = latestObservedXid;
		ereport(LOG,
				(errmsg("consistent state delayed because recovery snapshot incomplete")));
	}

	nxids = running->xcnt;
	xids = running->xids;

	KnownAssignedXidsDisplay(trace_recovery(DEBUG3));

	/*
B
Bruce Momjian 已提交
515 516 517 518 519 520
	 * Scan through the incoming array of RunningXacts and collect xids. We
	 * don't use SubtransSetParent because it doesn't matter yet. If we aren't
	 * overflowed then all xids will fit in snapshot and so we don't need
	 * subtrans. If we later overflow, an xid assignment record will add xids
	 * to subtrans. If RunningXacts is overflowed then we don't have enough
	 * information to correctly update subtrans anyway.
521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569
	 */

	/*
	 * Nobody else is running yet, but take locks anyhow
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	/* Reset latestCompletedXid */
	ShmemVariableCache->latestCompletedXid = running->nextXid;
	TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);

	/*
	 * Add our new xids into the array
	 */
	for (xid_index = 0; xid_index < running->xcnt; xid_index++)
	{
		TransactionId xid = running->xids[xid_index];

		/*
		 * The running-xacts snapshot can contain xids that did finish between
		 * when the snapshot was taken and when it was written to WAL. Such
		 * transactions are not running anymore, so ignore them.
		 */
		if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
			continue;

		KnownAssignedXidsAdd(&xid, 1);
	}

	KnownAssignedXidsDisplay(trace_recovery(DEBUG3));

	/*
	 * Update lastOverflowedXid if the snapshot had overflown. We don't know
	 * the exact value for this, so conservatively assume that it's nextXid-1
	 */
	if (running->subxid_overflow &&
		TransactionIdFollows(latestObservedXid, procArray->lastOverflowedXid))
		procArray->lastOverflowedXid = latestObservedXid;
	else if (TransactionIdFollows(running->oldestRunningXid,
								  procArray->lastOverflowedXid))
		procArray->lastOverflowedXid = InvalidTransactionId;

	LWLockRelease(ProcArrayLock);

	/* nextXid must be beyond any observed xid */
	if (TransactionIdFollows(running->nextXid, ShmemVariableCache->nextXid))
		ShmemVariableCache->nextXid = running->nextXid;

	elog(trace_recovery(DEBUG2),
B
Bruce Momjian 已提交
570
		 "running transaction data initialized");
571 572
	if (standbyState == STANDBY_SNAPSHOT_READY)
		elog(trace_recovery(DEBUG2),
B
Bruce Momjian 已提交
573
			 "recovery snapshots are now enabled");
574 575 576 577 578 579 580
}

void
ProcArrayApplyXidAssignment(TransactionId topxid,
							int nsubxids, TransactionId *subxids)
{
	TransactionId max_xid;
B
Bruce Momjian 已提交
581
	int			i;
582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598

	if (standbyState < STANDBY_SNAPSHOT_PENDING)
		return;

	max_xid = TransactionIdLatest(topxid, nsubxids, subxids);

	/*
	 * Mark all the subtransactions as observed.
	 *
	 * NOTE: This will fail if the subxid contains too many previously
	 * unobserved xids to fit into known-assigned-xids. That shouldn't happen
	 * as the code stands, because xid-assignment records should never contain
	 * more than PGPROC_MAX_CACHED_SUBXIDS entries.
	 */
	RecordKnownAssignedTransactionIds(max_xid);

	/*
B
Bruce Momjian 已提交
599 600 601 602 603 604 605 606 607
	 * Notice that we update pg_subtrans with the top-level xid, rather than
	 * the parent xid. This is a difference between normal processing and
	 * recovery, yet is still correct in all cases. The reason is that
	 * subtransaction commit is not marked in clog until commit processing, so
	 * all aborted subtransactions have already been clearly marked in clog.
	 * As a result we are able to refer directly to the top-level
	 * transaction's state rather than skipping through all the intermediate
	 * states in the subtransaction tree. This should be the first time we
	 * have attempted to SubTransSetParent().
608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630
	 */
	for (i = 0; i < nsubxids; i++)
		SubTransSetParent(subxids[i], topxid, false);

	/*
	 * Uses same locking as transaction commit
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	/*
	 * Remove from known-assigned-xacts.
	 */
	for (i = 0; i < nsubxids; i++)
		KnownAssignedXidsRemove(subxids[i]);

	/*
	 * Advance lastOverflowedXid when required.
	 */
	if (TransactionIdPrecedes(procArray->lastOverflowedXid, max_xid))
		procArray->lastOverflowedXid = max_xid;

	LWLockRelease(ProcArrayLock);
}
631

632 633 634
/*
 * TransactionIdIsInProgress -- is given transaction running in some backend
 *
635 636
 * Aside from some shortcuts such as checking RecentXmin and our own Xid,
 * there are three possibilities for finding a running transaction:
637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656
 *
 * 1. the given Xid is a main transaction Id.  We will find this out cheaply
 * by looking at the PGPROC struct for each backend.
 *
 * 2. the given Xid is one of the cached subxact Xids in the PGPROC array.
 * We can find this out cheaply too.
 *
 * 3. Search the SubTrans tree to find the Xid's topmost parent, and then
 * see if that is running according to PGPROC.	This is the slowest, but
 * sadly it has to be done always if the other two failed, unless we see
 * that the cached subxact sets are complete (none have overflowed).
 *
 * ProcArrayLock has to be held while we do 1 and 2.  If we save the top Xids
 * while doing 1, we can release the ProcArrayLock while we do 3.  This buys
 * back some concurrency (we can't retrieve the main Xids from PGPROC again
 * anyway; see GetNewTransactionId).
 */
bool
TransactionIdIsInProgress(TransactionId xid)
{
657 658
	static TransactionId *xids = NULL;
	int			nxids = 0;
659
	ProcArrayStruct *arrayP = procArray;
660
	TransactionId topxid;
661 662 663 664
	int			i,
				j;

	/*
B
Bruce Momjian 已提交
665
	 * Don't bother checking a transaction older than RecentXmin; it could not
B
Bruce Momjian 已提交
666 667 668
	 * possibly still be running.  (Note: in particular, this guarantees that
	 * we reject InvalidTransactionId, FrozenTransactionId, etc as not
	 * running.)
669 670 671 672 673 674 675
	 */
	if (TransactionIdPrecedes(xid, RecentXmin))
	{
		xc_by_recent_xmin_inc();
		return false;
	}

676 677 678 679 680 681 682 683 684 685 686
	/*
	 * We may have just checked the status of this transaction, so if it is
	 * already known to be completed, we can fall out without any access to
	 * shared memory.
	 */
	if (TransactionIdIsKnownCompleted(xid))
	{
		xc_by_known_xact_inc();
		return false;
	}

687 688 689 690 691 692 693 694 695 696 697
	/*
	 * Also, we can handle our own transaction (and subtransactions) without
	 * any access to shared memory.
	 */
	if (TransactionIdIsCurrentTransactionId(xid))
	{
		xc_by_my_xact_inc();
		return true;
	}

	/*
B
Bruce Momjian 已提交
698 699
	 * If not first time through, get workspace to remember main XIDs in. We
	 * malloc it permanently to avoid repeated palloc/pfree overhead.
700 701 702
	 */
	if (xids == NULL)
	{
703
		/*
B
Bruce Momjian 已提交
704 705 706
		 * In hot standby mode, reserve enough space to hold all xids in the
		 * known-assigned list. If we later finish recovery, we no longer need
		 * the bigger array, but we don't bother to shrink it.
707
		 */
B
Bruce Momjian 已提交
708 709
		int			maxxids = RecoveryInProgress() ?
		arrayP->maxProcs : TOTAL_MAX_CACHED_SUBXIDS;
710 711

		xids = (TransactionId *) malloc(maxxids * sizeof(TransactionId));
712 713 714 715 716
		if (xids == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
	}
717 718 719

	LWLockAcquire(ProcArrayLock, LW_SHARED);

720 721 722 723 724 725 726 727 728 729 730 731
	/*
	 * Now that we have the lock, we can check latestCompletedXid; if the
	 * target Xid is after that, it's surely still running.
	 */
	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, xid))
	{
		LWLockRelease(ProcArrayLock);
		xc_by_latest_xid_inc();
		return true;
	}

	/* No shortcuts, gotta grovel through the array */
732 733
	for (i = 0; i < arrayP->numProcs; i++)
	{
B
Bruce Momjian 已提交
734
		volatile PGPROC *proc = arrayP->procs[i];
735 736 737 738 739
		TransactionId pxid;

		/* Ignore my own proc --- dealt with it above */
		if (proc == MyProc)
			continue;
740 741

		/* Fetch xid just once - see GetNewTransactionId */
742
		pxid = proc->xid;
743 744 745 746 747 748 749 750 751

		if (!TransactionIdIsValid(pxid))
			continue;

		/*
		 * Step 1: check the main Xid
		 */
		if (TransactionIdEquals(pxid, xid))
		{
752
			LWLockRelease(ProcArrayLock);
753
			xc_by_main_xid_inc();
754
			return true;
755 756 757
		}

		/*
B
Bruce Momjian 已提交
758 759
		 * We can ignore main Xids that are younger than the target Xid, since
		 * the target could not possibly be their child.
760 761 762 763 764 765 766 767 768 769 770 771 772 773
		 */
		if (TransactionIdPrecedes(xid, pxid))
			continue;

		/*
		 * Step 2: check the cached child-Xids arrays
		 */
		for (j = proc->subxids.nxids - 1; j >= 0; j--)
		{
			/* Fetch xid just once - see GetNewTransactionId */
			TransactionId cxid = proc->subxids.xids[j];

			if (TransactionIdEquals(cxid, xid))
			{
774
				LWLockRelease(ProcArrayLock);
775
				xc_by_child_xid_inc();
776
				return true;
777 778 779 780
			}
		}

		/*
B
Bruce Momjian 已提交
781 782 783 784 785
		 * Save the main Xid for step 3.  We only need to remember main Xids
		 * that have uncached children.  (Note: there is no race condition
		 * here because the overflowed flag cannot be cleared, only set, while
		 * we hold ProcArrayLock.  So we can't miss an Xid that we need to
		 * worry about.)
786 787 788 789 790
		 */
		if (proc->subxids.overflowed)
			xids[nxids++] = pxid;
	}

791 792 793 794 795 796 797 798 799 800 801 802 803 804 805
	/* In hot standby mode, check the known-assigned-xids list. */
	if (RecoveryInProgress())
	{
		/* none of the PGPROC entries should have XIDs in hot standby mode */
		Assert(nxids == 0);

		if (KnownAssignedXidsExist(xid))
		{
			LWLockRelease(ProcArrayLock);
			/* XXX: should we have a separate counter for this? */
			/* xc_by_main_xid_inc(); */
			return true;
		}

		/*
B
Bruce Momjian 已提交
806 807 808 809
		 * If the KnownAssignedXids overflowed, we have to check pg_subtrans
		 * too. Copy all xids from KnownAssignedXids that are lower than xid,
		 * since if xid is a subtransaction its parent will always have a
		 * lower value.
810 811 812 813 814
		 */
		if (TransactionIdPrecedesOrEquals(xid, procArray->lastOverflowedXid))
			nxids = KnownAssignedXidsGet(xids, xid);
	}

815 816 817 818
	LWLockRelease(ProcArrayLock);

	/*
	 * If none of the relevant caches overflowed, we know the Xid is not
819
	 * running without even looking at pg_subtrans.
820 821
	 */
	if (nxids == 0)
822 823
	{
		xc_no_overflow_inc();
824
		return false;
825
	}
826 827 828 829

	/*
	 * Step 3: have to check pg_subtrans.
	 *
830 831 832 833
	 * At this point, we know it's either a subtransaction of one of the Xids
	 * in xids[], or it's not running.  If it's an already-failed
	 * subtransaction, we want to say "not running" even though its parent may
	 * still be running.  So first, check pg_clog to see if it's been aborted.
834 835 836 837
	 */
	xc_slow_answer_inc();

	if (TransactionIdDidAbort(xid))
838
		return false;
839 840

	/*
B
Bruce Momjian 已提交
841
	 * It isn't aborted, so check whether the transaction tree it belongs to
B
Bruce Momjian 已提交
842 843
	 * is still running (or, more precisely, whether it was running when we
	 * held ProcArrayLock).
844 845 846 847 848 849 850 851
	 */
	topxid = SubTransGetTopmostTransaction(xid);
	Assert(TransactionIdIsValid(topxid));
	if (!TransactionIdEquals(topxid, xid))
	{
		for (i = 0; i < nxids; i++)
		{
			if (TransactionIdEquals(xids[i], topxid))
852
				return true;
853 854 855
		}
	}

856
	return false;
857 858
}

859 860 861 862 863 864 865 866 867 868 869 870 871 872 873
/*
 * TransactionIdIsActive -- is xid the top-level XID of an active backend?
 *
 * This differs from TransactionIdIsInProgress in that it ignores prepared
 * transactions.  Also, we ignore subtransactions since that's not needed
 * for current uses.
 */
bool
TransactionIdIsActive(TransactionId xid)
{
	bool		result = false;
	ProcArrayStruct *arrayP = procArray;
	int			i;

	/*
B
Bruce Momjian 已提交
874 875
	 * Don't bother checking a transaction older than RecentXmin; it could not
	 * possibly still be running.
876 877 878 879 880 881 882 883
	 */
	if (TransactionIdPrecedes(xid, RecentXmin))
		return false;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (i = 0; i < arrayP->numProcs; i++)
	{
B
Bruce Momjian 已提交
884
		volatile PGPROC *proc = arrayP->procs[i];
885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907

		/* Fetch xid just once - see GetNewTransactionId */
		TransactionId pxid = proc->xid;

		if (!TransactionIdIsValid(pxid))
			continue;

		if (proc->pid == 0)
			continue;			/* ignore prepared transactions */

		if (TransactionIdEquals(pxid, xid))
		{
			result = true;
			break;
		}
	}

	LWLockRelease(ProcArrayLock);

	return result;
}


908 909 910 911 912 913 914
/*
 * GetOldestXmin -- returns oldest transaction that was running
 *					when any current transaction was started.
 *
 * If allDbs is TRUE then all backends are considered; if allDbs is FALSE
 * then only backends running in my own database are considered.
 *
915 916
 * If ignoreVacuum is TRUE then backends with the PROC_IN_VACUUM flag set are
 * ignored.
917
 *
918 919 920
 * This is used by VACUUM to decide which deleted tuples must be preserved
 * in a table.	allDbs = TRUE is needed for shared relations, but allDbs =
 * FALSE is sufficient for non-shared relations, since only backends in my
B
Bruce Momjian 已提交
921
 * own database could ever see the tuples in them.	Also, we can ignore
922 923
 * concurrently running lazy VACUUMs because (a) they must be working on other
 * tables, and (b) they don't need to do snapshot-based lookups.
924 925
 *
 * This is also used to determine where to truncate pg_subtrans.  allDbs
926
 * must be TRUE for that case, and ignoreVacuum FALSE.
927
 *
928
 * Note: we include all currently running xids in the set of considered xids.
929 930
 * This ensures that if a just-started xact has not yet set its snapshot,
 * when it does set the snapshot it cannot set xmin less than what we compute.
931
 * See notes in src/backend/access/transam/README.
932 933
 */
TransactionId
934
GetOldestXmin(bool allDbs, bool ignoreVacuum)
935 936 937 938 939
{
	ProcArrayStruct *arrayP = procArray;
	TransactionId result;
	int			index;

940 941 942
	/* Cannot look for individual databases during recovery */
	Assert(allDbs || !RecoveryInProgress());

943 944
	LWLockAcquire(ProcArrayLock, LW_SHARED);

945
	/*
B
Bruce Momjian 已提交
946 947 948 949
	 * We initialize the MIN() calculation with latestCompletedXid + 1. This
	 * is a lower bound for the XIDs that might appear in the ProcArray later,
	 * and so protects us against overestimating the result due to future
	 * additions.
950
	 */
951 952 953
	result = ShmemVariableCache->latestCompletedXid;
	Assert(TransactionIdIsNormal(result));
	TransactionIdAdvance(result);
954 955 956

	for (index = 0; index < arrayP->numProcs; index++)
	{
B
Bruce Momjian 已提交
957
		volatile PGPROC *proc = arrayP->procs[index];
958

959
		if (ignoreVacuum && (proc->vacuumFlags & PROC_IN_VACUUM))
960 961
			continue;

962 963 964 965 966
		if (allDbs || proc->databaseId == MyDatabaseId)
		{
			/* Fetch xid just once - see GetNewTransactionId */
			TransactionId xid = proc->xid;

967 968 969 970 971 972 973 974 975
			/* First consider the transaction's own Xid, if any */
			if (TransactionIdIsNormal(xid) &&
				TransactionIdPrecedes(xid, result))
				result = xid;

			/*
			 * Also consider the transaction's Xmin, if set.
			 *
			 * We must check both Xid and Xmin because a transaction might
B
Bruce Momjian 已提交
976 977
			 * have an Xmin but not (yet) an Xid; conversely, if it has an
			 * Xid, that could determine some not-yet-set Xmin.
978 979 980 981 982
			 */
			xid = proc->xmin;	/* Fetch just once */
			if (TransactionIdIsNormal(xid) &&
				TransactionIdPrecedes(xid, result))
				result = xid;
983 984 985 986 987
		}
	}

	LWLockRelease(ProcArrayLock);

988 989 990 991 992 993 994
	/*
	 * Compute the cutoff XID, being careful not to generate a "permanent" XID
	 */
	result -= vacuum_defer_cleanup_age;
	if (!TransactionIdIsNormal(result))
		result = FirstNormalTransactionId;

995 996 997
	return result;
}

998
/*
999 1000 1001
 * GetSnapshotData -- returns information about running transactions.
 *
 * The returned snapshot includes xmin (lowest still-running xact ID),
1002
 * xmax (highest completed xact ID + 1), and a list of running xact IDs
1003 1004 1005 1006 1007 1008 1009 1010
 * in the range xmin <= xid < xmax.  It is used as follows:
 *		All xact IDs < xmin are considered finished.
 *		All xact IDs >= xmax are considered still running.
 *		For an xact ID xmin <= xid < xmax, consult list to see whether
 *		it is considered running or not.
 * This ensures that the set of transactions seen as "running" by the
 * current xact will not change after it takes the snapshot.
 *
1011 1012 1013 1014 1015
 * All running top-level XIDs are included in the snapshot, except for lazy
 * VACUUM processes.  We also try to include running subtransaction XIDs,
 * but since PGPROC has only a limited cache area for subxact XIDs, full
 * information may not be available.  If we find any overflowed subxid arrays,
 * we have to mark the snapshot's subxid data as overflowed, and extra work
1016
 * *may* need to be done to determine what's running (see XidInMVCCSnapshot()
1017
 * in tqual.c).
1018 1019 1020
 *
 * We also update the following backend-global variables:
 *		TransactionXmin: the oldest xmin of any snapshot in use in the
1021
 *			current transaction (this is the same as MyProc->xmin).
1022 1023 1024
 *		RecentXmin: the xmin computed for the most recent snapshot.  XIDs
 *			older than this are known not running any more.
 *		RecentGlobalXmin: the global xmin (oldest TransactionXmin across all
1025
 *			running transactions, except those running LAZY VACUUM).  This is
T
Tom Lane 已提交
1026
 *			the same computation done by GetOldestXmin(true, true).
1027 1028 1029
 *
 * Note: this function should probably not be called with an argument that's
 * not statically allocated (see xip allocation below).
1030 1031
 */
Snapshot
1032
GetSnapshotData(Snapshot snapshot)
1033 1034 1035 1036 1037 1038 1039
{
	ProcArrayStruct *arrayP = procArray;
	TransactionId xmin;
	TransactionId xmax;
	TransactionId globalxmin;
	int			index;
	int			count = 0;
1040
	int			subcount = 0;
1041
	bool		suboverflowed = false;
1042 1043 1044 1045

	Assert(snapshot != NULL);

	/*
B
Bruce Momjian 已提交
1046 1047
	 * Allocating space for maxProcs xids is usually overkill; numProcs would
	 * be sufficient.  But it seems better to do the malloc while not holding
1048 1049
	 * the lock, so we can't look at numProcs.  Likewise, we allocate much
	 * more subxip storage than is probably needed.
1050 1051
	 *
	 * This does open a possibility for avoiding repeated malloc/free: since
B
Bruce Momjian 已提交
1052
	 * maxProcs does not change at runtime, we can simply reuse the previous
1053
	 * xip arrays if any.  (This relies on the fact that all callers pass
B
Bruce Momjian 已提交
1054
	 * static SnapshotData structs.)
1055 1056 1057 1058
	 */
	if (snapshot->xip == NULL)
	{
		/*
B
Bruce Momjian 已提交
1059 1060
		 * First call for this snapshot. Snapshot is same size whether or not
		 * we are in recovery, see later comments.
1061 1062
		 */
		snapshot->xip = (TransactionId *)
1063
			malloc(arrayP->maxProcs * sizeof(TransactionId));
1064 1065 1066 1067
		if (snapshot->xip == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
1068 1069
		Assert(snapshot->subxip == NULL);
		snapshot->subxip = (TransactionId *)
1070
			malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
1071 1072 1073 1074
		if (snapshot->subxip == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
1075 1076 1077
	}

	/*
B
Bruce Momjian 已提交
1078
	 * It is sufficient to get shared lock on ProcArrayLock, even if we are
1079
	 * going to set MyProc->xmin.
1080
	 */
1081
	LWLockAcquire(ProcArrayLock, LW_SHARED);
1082

1083 1084 1085 1086
	/* xmax is always latestCompletedXid + 1 */
	xmax = ShmemVariableCache->latestCompletedXid;
	Assert(TransactionIdIsNormal(xmax));
	TransactionIdAdvance(xmax);
1087

1088 1089 1090
	/* initialize xmin calculation with xmax */
	globalxmin = xmin = xmax;

B
Bruce Momjian 已提交
1091
	/*
1092 1093 1094 1095 1096 1097
	 * If we're in recovery then snapshot data comes from a different place,
	 * so decide which route we take before grab the lock. It is possible
	 * for recovery to end before we finish taking snapshot, and for newly
	 * assigned transaction ids to be added to the procarray. Xmax cannot
	 * change while we hold ProcArrayLock, so those newly added transaction
	 * ids would be filtered away, so we need not be concerned about them.
B
Bruce Momjian 已提交
1098
	 */
1099 1100
	snapshot->takenDuringRecovery = RecoveryInProgress();

1101
	if (!snapshot->takenDuringRecovery)
1102 1103
	{
		/*
1104 1105 1106 1107 1108 1109
		 * Spin over procArray checking xid, xmin, and subxids.  The goal is to
		 * gather all active xids, find the lowest xmin, and try to record
		 * subxids. During recovery no xids will be assigned, so all normal
		 * backends can be ignored, nor are there any VACUUMs running. All
		 * prepared transaction xids are held in KnownAssignedXids, so these
		 * will be seen without needing to loop through procs here.
1110
		 */
1111
		for (index = 0; index < arrayP->numProcs; index++)
1112
		{
1113 1114 1115 1116 1117
			volatile PGPROC *proc = arrayP->procs[index];
			TransactionId xid;

			/* Ignore procs running LAZY VACUUM */
			if (proc->vacuumFlags & PROC_IN_VACUUM)
1118
				continue;
1119

1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138
			/* Update globalxmin to be the smallest valid xmin */
			xid = proc->xmin;		/* fetch just once */
			if (TransactionIdIsNormal(xid) &&
				TransactionIdPrecedes(xid, globalxmin))
				globalxmin = xid;

			/* Fetch xid just once - see GetNewTransactionId */
			xid = proc->xid;

			/*
			 * If the transaction has been assigned an xid < xmax we add it to the
			 * snapshot, and update xmin if necessary.	There's no need to store
			 * XIDs >= xmax, since we'll treat them as running anyway.  We don't
			 * bother to examine their subxids either.
			 *
			 * We don't include our own XID (if any) in the snapshot, but we must
			 * include it into xmin.
			 */
			if (TransactionIdIsNormal(xid))
1139
			{
1140 1141 1142 1143 1144 1145 1146
				if (TransactionIdFollowsOrEquals(xid, xmax))
					continue;
				if (proc != MyProc)
					snapshot->xip[count++] = xid;
				if (TransactionIdPrecedes(xid, xmin))
					xmin = xid;
			}
1147

1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166
			/*
			 * Save subtransaction XIDs if possible (if we've already overflowed,
			 * there's no point).  Note that the subxact XIDs must be later than
			 * their parent, so no need to check them against xmin.  We could
			 * filter against xmax, but it seems better not to do that much work
			 * while holding the ProcArrayLock.
			 *
			 * The other backend can add more subxids concurrently, but cannot
			 * remove any.	Hence it's important to fetch nxids just once. Should
			 * be safe to use memcpy, though.  (We needn't worry about missing any
			 * xids added concurrently, because they must postdate xmax.)
			 *
			 * Again, our own XIDs are not included in the snapshot.
			 */
			if (!suboverflowed && proc != MyProc)
			{
				if (proc->subxids.overflowed)
					suboverflowed = true;
				else
1167
				{
1168 1169 1170 1171 1172 1173 1174 1175 1176
					int			nxids = proc->subxids.nxids;

					if (nxids > 0)
					{
						memcpy(snapshot->subxip + subcount,
							   (void *) proc->subxids.xids,
							   nxids * sizeof(TransactionId));
						subcount += nxids;
					}
1177 1178 1179
				}
			}
		}
1180
	}
1181
	else
1182 1183 1184 1185 1186 1187 1188
	{
		/*
		 * We store all xids directly into subxip[]. Here's why:
		 *
		 * In recovery we don't know which xids are top-level and which are
		 * subxacts, a design choice that greatly simplifies xid processing.
		 *
B
Bruce Momjian 已提交
1189 1190 1191 1192
		 * It seems like we would want to try to put xids into xip[] only, but
		 * that is fairly small. We would either need to make that bigger or
		 * to increase the rate at which we WAL-log xid assignment; neither is
		 * an appealing choice.
1193 1194 1195 1196
		 *
		 * We could try to store xids into xip[] first and then into subxip[]
		 * if there are too many xids. That only works if the snapshot doesn't
		 * overflow because we do not search subxip[] in that case. A simpler
B
Bruce Momjian 已提交
1197 1198
		 * way is to just store all xids in the subxact array because this is
		 * by far the bigger array. We just leave the xip array empty.
1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209
		 *
		 * Either way we need to change the way XidInMVCCSnapshot() works
		 * depending upon when the snapshot was taken, or change normal
		 * snapshot processing so it matches.
		 */
		subcount = KnownAssignedXidsGetAndSetXmin(snapshot->subxip, &xmin, xmax);

		if (TransactionIdPrecedes(xmin, procArray->lastOverflowedXid))
			suboverflowed = true;
	}

1210
	if (!TransactionIdIsValid(MyProc->xmin))
1211 1212 1213 1214 1215
		MyProc->xmin = TransactionXmin = xmin;

	LWLockRelease(ProcArrayLock);

	/*
B
Bruce Momjian 已提交
1216 1217 1218
	 * Update globalxmin to include actual process xids.  This is a slightly
	 * different way of computing it than GetOldestXmin uses, but should give
	 * the same result.
1219 1220 1221 1222 1223
	 */
	if (TransactionIdPrecedes(xmin, globalxmin))
		globalxmin = xmin;

	/* Update global variables too */
1224 1225 1226
	RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
	if (!TransactionIdIsNormal(RecentGlobalXmin))
		RecentGlobalXmin = FirstNormalTransactionId;
1227 1228 1229 1230 1231
	RecentXmin = xmin;

	snapshot->xmin = xmin;
	snapshot->xmax = xmax;
	snapshot->xcnt = count;
1232
	snapshot->subxcnt = subcount;
1233
	snapshot->suboverflowed = suboverflowed;
1234

1235
	snapshot->curcid = GetCurrentCommandId(false);
1236

1237
	/*
1238 1239
	 * This is a new snapshot, so set both refcounts are zero, and mark it as
	 * not copied in persistent memory.
1240 1241 1242 1243 1244
	 */
	snapshot->active_count = 0;
	snapshot->regd_count = 0;
	snapshot->copied = false;

1245 1246 1247
	return snapshot;
}

1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281
/*
 * GetRunningTransactionData -- returns information about running transactions.
 *
 * Similar to GetSnapshotData but returning more information. We include
 * all PGPROCs with an assigned TransactionId, even VACUUM processes.
 *
 * This is never executed during recovery so there is no need to look at
 * KnownAssignedXids.
 *
 * We don't worry about updating other counters, we want to keep this as
 * simple as possible and leave GetSnapshotData() as the primary code for
 * that bookkeeping.
 */
RunningTransactions
GetRunningTransactionData(void)
{
	ProcArrayStruct *arrayP = procArray;
	RunningTransactions CurrentRunningXacts = (RunningTransactions) &CurrentRunningXactsData;
	TransactionId latestCompletedXid;
	TransactionId oldestRunningXid;
	TransactionId *xids;
	int			index;
	int			count;
	int			subcount;
	bool		suboverflowed;

	Assert(!RecoveryInProgress());

	/*
	 * Allocating space for maxProcs xids is usually overkill; numProcs would
	 * be sufficient.  But it seems better to do the malloc while not holding
	 * the lock, so we can't look at numProcs.  Likewise, we allocate much
	 * more subxip storage than is probably needed.
	 *
B
Bruce Momjian 已提交
1282 1283
	 * Should only be allocated for bgwriter, since only ever executed during
	 * checkpoints.
1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312
	 */
	if (CurrentRunningXacts->xids == NULL)
	{
		/*
		 * First call
		 */
		CurrentRunningXacts->xids = (TransactionId *)
			malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
		if (CurrentRunningXacts->xids == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
	}

	xids = CurrentRunningXacts->xids;

	count = subcount = 0;
	suboverflowed = false;

	/*
	 * Ensure that no xids enter or leave the procarray while we obtain
	 * snapshot.
	 */
	LWLockAcquire(ProcArrayLock, LW_SHARED);
	LWLockAcquire(XidGenLock, LW_SHARED);

	latestCompletedXid = ShmemVariableCache->latestCompletedXid;

	oldestRunningXid = ShmemVariableCache->nextXid;
B
Bruce Momjian 已提交
1313

1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338
	/*
	 * Spin over procArray collecting all xids and subxids.
	 */
	for (index = 0; index < arrayP->numProcs; index++)
	{
		volatile PGPROC *proc = arrayP->procs[index];
		TransactionId xid;
		int			nxids;

		/* Fetch xid just once - see GetNewTransactionId */
		xid = proc->xid;

		/*
		 * We don't need to store transactions that don't have a TransactionId
		 * yet because they will not show as running on a standby server.
		 */
		if (!TransactionIdIsValid(xid))
			continue;

		xids[count++] = xid;

		if (TransactionIdPrecedes(xid, oldestRunningXid))
			oldestRunningXid = xid;

		/*
B
Bruce Momjian 已提交
1339 1340
		 * Save subtransaction XIDs. Other backends can't add or remove
		 * entries while we're holding XidGenLock.
1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371
		 */
		nxids = proc->subxids.nxids;
		if (nxids > 0)
		{
			memcpy(&xids[count], (void *) proc->subxids.xids,
				   nxids * sizeof(TransactionId));
			count += nxids;
			subcount += nxids;

			if (proc->subxids.overflowed)
				suboverflowed = true;

			/*
			 * Top-level XID of a transaction is always greater than any of
			 * its subxids, so we don't need to check if any of the subxids
			 * are smaller than oldestRunningXid
			 */
		}
	}

	CurrentRunningXacts->xcnt = count;
	CurrentRunningXacts->subxid_overflow = suboverflowed;
	CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid;
	CurrentRunningXacts->oldestRunningXid = oldestRunningXid;

	LWLockRelease(XidGenLock);
	LWLockRelease(ProcArrayLock);

	return CurrentRunningXacts;
}

1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393
/*
 * GetTransactionsInCommit -- Get the XIDs of transactions that are committing
 *
 * Constructs an array of XIDs of transactions that are currently in commit
 * critical sections, as shown by having inCommit set in their PGPROC entries.
 *
 * *xids_p is set to a palloc'd array that should be freed by the caller.
 * The return value is the number of valid entries.
 *
 * Note that because backends set or clear inCommit without holding any lock,
 * the result is somewhat indeterminate, but we don't really care.  Even in
 * a multiprocessor with delayed writes to shared memory, it should be certain
 * that setting of inCommit will propagate to shared memory when the backend
 * takes the WALInsertLock, so we cannot fail to see an xact as inCommit if
 * it's already inserted its commit record.  Whether it takes a little while
 * for clearing of inCommit to propagate is unimportant for correctness.
 */
int
GetTransactionsInCommit(TransactionId **xids_p)
{
	ProcArrayStruct *arrayP = procArray;
	TransactionId *xids;
B
Bruce Momjian 已提交
1394 1395
	int			nxids;
	int			index;
1396 1397 1398 1399 1400 1401 1402 1403

	xids = (TransactionId *) palloc(arrayP->maxProcs * sizeof(TransactionId));
	nxids = 0;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
B
Bruce Momjian 已提交
1404 1405
		volatile PGPROC *proc = arrayP->procs[index];

1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430
		/* Fetch xid just once - see GetNewTransactionId */
		TransactionId pxid = proc->xid;

		if (proc->inCommit && TransactionIdIsValid(pxid))
			xids[nxids++] = pxid;
	}

	LWLockRelease(ProcArrayLock);

	*xids_p = xids;
	return nxids;
}

/*
 * HaveTransactionsInCommit -- Are any of the specified XIDs in commit?
 *
 * This is used with the results of GetTransactionsInCommit to see if any
 * of the specified XIDs are still in their commit critical sections.
 *
 * Note: this is O(N^2) in the number of xacts that are/were in commit, but
 * those numbers should be small enough for it not to be a problem.
 */
bool
HaveTransactionsInCommit(TransactionId *xids, int nxids)
{
B
Bruce Momjian 已提交
1431
	bool		result = false;
1432
	ProcArrayStruct *arrayP = procArray;
B
Bruce Momjian 已提交
1433
	int			index;
1434 1435 1436 1437 1438

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
B
Bruce Momjian 已提交
1439 1440
		volatile PGPROC *proc = arrayP->procs[index];

1441 1442 1443 1444 1445
		/* Fetch xid just once - see GetNewTransactionId */
		TransactionId pxid = proc->xid;

		if (proc->inCommit && TransactionIdIsValid(pxid))
		{
B
Bruce Momjian 已提交
1446
			int			i;
1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465

			for (i = 0; i < nxids; i++)
			{
				if (xids[i] == pxid)
				{
					result = true;
					break;
				}
			}
			if (result)
				break;
		}
	}

	LWLockRelease(ProcArrayLock);

	return result;
}

1466 1467
/*
 * BackendPidGetProc -- get a backend's PGPROC given its PID
1468 1469 1470 1471
 *
 * Returns NULL if not found.  Note that it is up to the caller to be
 * sure that the question remains meaningful for long enough for the
 * answer to be used ...
1472
 */
1473
PGPROC *
1474 1475 1476 1477 1478 1479
BackendPidGetProc(int pid)
{
	PGPROC	   *result = NULL;
	ProcArrayStruct *arrayP = procArray;
	int			index;

1480 1481 1482
	if (pid == 0)				/* never match dummy PGPROCs */
		return NULL;

1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500
	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
		PGPROC	   *proc = arrayP->procs[index];

		if (proc->pid == pid)
		{
			result = proc;
			break;
		}
	}

	LWLockRelease(ProcArrayLock);

	return result;
}

T
Tatsuo Ishii 已提交
1501 1502 1503 1504 1505 1506
/*
 * BackendXidGetPid -- get a backend's pid given its XID
 *
 * Returns 0 if not found or it's a prepared transaction.  Note that
 * it is up to the caller to be sure that the question remains
 * meaningful for long enough for the answer to be used ...
B
Bruce Momjian 已提交
1507
 *
T
Tatsuo Ishii 已提交
1508 1509
 * Only main transaction Ids are considered.  This function is mainly
 * useful for determining what backend owns a lock.
1510
 *
B
Bruce Momjian 已提交
1511
 * Beware that not every xact has an XID assigned.	However, as long as you
1512
 * only call this using an XID found on disk, you're safe.
T
Tatsuo Ishii 已提交
1513 1514 1515 1516
 */
int
BackendXidGetPid(TransactionId xid)
{
B
Bruce Momjian 已提交
1517
	int			result = 0;
T
Tatsuo Ishii 已提交
1518 1519 1520 1521 1522 1523 1524 1525 1526 1527
	ProcArrayStruct *arrayP = procArray;
	int			index;

	if (xid == InvalidTransactionId)	/* never match invalid xid */
		return 0;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
B
Bruce Momjian 已提交
1528
		volatile PGPROC *proc = arrayP->procs[index];
T
Tatsuo Ishii 已提交
1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541

		if (proc->xid == xid)
		{
			result = proc->pid;
			break;
		}
	}

	LWLockRelease(ProcArrayLock);

	return result;
}

1542 1543 1544 1545 1546 1547 1548 1549 1550
/*
 * IsBackendPid -- is a given pid a running backend
 */
bool
IsBackendPid(int pid)
{
	return (BackendPidGetProc(pid) != NULL);
}

1551 1552 1553 1554

/*
 * GetCurrentVirtualXIDs -- returns an array of currently active VXIDs.
 *
1555
 * The array is palloc'd. The number of valid entries is returned into *nvxids.
1556
 *
1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570
 * The arguments allow filtering the set of VXIDs returned.  Our own process
 * is always skipped.  In addition:
 *	If limitXmin is not InvalidTransactionId, skip processes with
 *		xmin > limitXmin.
 *	If excludeXmin0 is true, skip processes with xmin = 0.
 *	If allDbs is false, skip processes attached to other databases.
 *	If excludeVacuum isn't zero, skip processes for which
 *		(vacuumFlags & excludeVacuum) is not zero.
 *
 * Note: the purpose of the limitXmin and excludeXmin0 parameters is to
 * allow skipping backends whose oldest live snapshot is no older than
 * some snapshot we have.  Since we examine the procarray with only shared
 * lock, there are race conditions: a backend could set its xmin just after
 * we look.  Indeed, on multiprocessors with weak memory ordering, the
1571
 * other backend could have set its xmin *before* we look.	We know however
1572 1573 1574 1575 1576
 * that such a backend must have held shared ProcArrayLock overlapping our
 * own hold of ProcArrayLock, else we would see its xmin update.  Therefore,
 * any snapshot the other backend is taking concurrently with our scan cannot
 * consider any transactions as still running that we think are committed
 * (since backends must hold ProcArrayLock exclusive to commit).
1577 1578
 */
VirtualTransactionId *
1579 1580 1581
GetCurrentVirtualXIDs(TransactionId limitXmin, bool excludeXmin0,
					  bool allDbs, int excludeVacuum,
					  int *nvxids)
1582 1583 1584 1585 1586 1587
{
	VirtualTransactionId *vxids;
	ProcArrayStruct *arrayP = procArray;
	int			count = 0;
	int			index;

1588
	/* allocate what's certainly enough result space */
1589
	vxids = (VirtualTransactionId *)
1590
		palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs);
1591 1592 1593 1594 1595

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
B
Bruce Momjian 已提交
1596
		volatile PGPROC *proc = arrayP->procs[index];
1597 1598 1599 1600

		if (proc == MyProc)
			continue;

1601 1602 1603
		if (excludeVacuum & proc->vacuumFlags)
			continue;

1604
		if (allDbs || proc->databaseId == MyDatabaseId)
1605
		{
1606
			/* Fetch xmin just once - might change on us */
1607 1608
			TransactionId pxmin = proc->xmin;

1609 1610 1611
			if (excludeXmin0 && !TransactionIdIsValid(pxmin))
				continue;

1612
			/*
1613 1614
			 * InvalidTransactionId precedes all other XIDs, so a proc that
			 * hasn't set xmin yet will not be rejected by this test.
1615 1616
			 */
			if (!TransactionIdIsValid(limitXmin) ||
1617
				TransactionIdPrecedesOrEquals(pxmin, limitXmin))
1618 1619
			{
				VirtualTransactionId vxid;
1620

1621 1622 1623 1624
				GET_VXID_FROM_PGPROC(vxid, *proc);
				if (VirtualTransactionIdIsValid(vxid))
					vxids[count++] = vxid;
			}
1625 1626 1627 1628 1629
		}
	}

	LWLockRelease(ProcArrayLock);

1630
	*nvxids = count;
1631 1632 1633
	return vxids;
}

1634 1635 1636 1637 1638 1639
/*
 * GetConflictingVirtualXIDs -- returns an array of currently active VXIDs.
 *
 * Usage is limited to conflict resolution during recovery on standby servers.
 * limitXmin is supplied as either latestRemovedXid, or InvalidTransactionId
 * in cases where we cannot accurately determine a value for latestRemovedXid.
1640
 *
1641 1642 1643
 * If limitXmin is InvalidTransactionId then we want to kill everybody,
 * so we're not worried if they have a snapshot or not, nor does it really
 * matter what type of lock we hold.
1644
 *
1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657
 * All callers that are checking xmins always now supply a valid and useful
 * value for limitXmin. The limitXmin is always lower than the lowest
 * numbered KnownAssignedXid that is not already a FATAL error. This is
 * because we only care about cleanup records that are cleaning up tuple
 * versions from committed transactions. In that case they will only occur
 * at the point where the record is less than the lowest running xid. That
 * allows us to say that if any backend takes a snapshot concurrently with
 * us then the conflict assessment made here would never include the snapshot
 * that is being derived. So we take LW_SHARED on the ProcArray and allow
 * concurrent snapshots when limitXmin is valid. We might think about adding
 *   Assert(limitXmin < lowest(KnownAssignedXids))
 * but that would not be true in the case of FATAL errors lagging in array,
 * but we already know those are bogus anyway, so we skip that test.
1658
 *
1659
 * If dbOid is valid we skip backends attached to other databases.
1660 1661 1662 1663 1664
 *
 * Be careful to *not* pfree the result from this function. We reuse
 * this array sufficiently often that we use malloc for the result.
 */
VirtualTransactionId *
1665
GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
1666 1667 1668 1669 1670 1671 1672 1673
{
	static VirtualTransactionId *vxids;
	ProcArrayStruct *arrayP = procArray;
	int			count = 0;
	int			index;

	/*
	 * If not first time through, get workspace to remember main XIDs in. We
B
Bruce Momjian 已提交
1674 1675
	 * malloc it permanently to avoid repeated palloc/pfree overhead. Allow
	 * result space, remembering room for a terminator.
1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686
	 */
	if (vxids == NULL)
	{
		vxids = (VirtualTransactionId *)
			malloc(sizeof(VirtualTransactionId) * (arrayP->maxProcs + 1));
		if (vxids == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
	}

1687
	LWLockAcquire(ProcArrayLock, LW_SHARED);
1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703

	for (index = 0; index < arrayP->numProcs; index++)
	{
		volatile PGPROC *proc = arrayP->procs[index];

		/* Exclude prepared transactions */
		if (proc->pid == 0)
			continue;

		if (!OidIsValid(dbOid) ||
			proc->databaseId == dbOid)
		{
			/* Fetch xmin just once - can't change on us, but good coding */
			TransactionId pxmin = proc->xmin;

			/*
B
Bruce Momjian 已提交
1704 1705 1706
			 * We ignore an invalid pxmin because this means that backend has
			 * no snapshot and cannot get another one while we hold exclusive
			 * lock.
1707
			 */
1708 1709
			if (!TransactionIdIsValid(limitXmin) ||
				(TransactionIdIsValid(pxmin) && !TransactionIdFollows(pxmin, limitXmin)))
1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734
			{
				VirtualTransactionId vxid;

				GET_VXID_FROM_PGPROC(vxid, *proc);
				if (VirtualTransactionIdIsValid(vxid))
					vxids[count++] = vxid;
			}
		}
	}

	LWLockRelease(ProcArrayLock);

	/* add the terminator */
	vxids[count].backendId = InvalidBackendId;
	vxids[count].localTransactionId = InvalidLocalTransactionId;

	return vxids;
}

/*
 * CancelVirtualTransaction - used in recovery conflict processing
 *
 * Returns pid of the process signaled, or 0 if not found.
 */
pid_t
1735
CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode)
1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752
{
	ProcArrayStruct *arrayP = procArray;
	int			index;
	pid_t		pid = 0;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
		VirtualTransactionId procvxid;
		PGPROC	   *proc = arrayP->procs[index];

		GET_VXID_FROM_PGPROC(procvxid, *proc);

		if (procvxid.backendId == vxid.backendId &&
			procvxid.localTransactionId == vxid.localTransactionId)
		{
1753
			proc->recoveryConflictPending = true;
1754
			pid = proc->pid;
1755 1756 1757
			if (pid != 0)
			{
				/*
B
Bruce Momjian 已提交
1758 1759
				 * Kill the pid if it's still here. If not, that's what we
				 * wanted so ignore any errors.
1760 1761 1762
				 */
				(void) SendProcSignal(pid, sigmode, vxid.backendId);
			}
1763 1764 1765 1766 1767 1768 1769 1770
			break;
		}
	}

	LWLockRelease(ProcArrayLock);

	return pid;
}
1771

1772 1773 1774 1775 1776
/*
 * CountActiveBackends --- count backends (other than myself) that are in
 *		active transactions.  This is used as a heuristic to decide if
 *		a pre-XLOG-flush delay is worthwhile during commit.
 *
1777 1778
 * Do not count backends that are blocked waiting for locks, since they are
 * not going to get to run until someone else commits.
1779 1780 1781 1782 1783 1784 1785 1786 1787 1788
 */
int
CountActiveBackends(void)
{
	ProcArrayStruct *arrayP = procArray;
	int			count = 0;
	int			index;

	/*
	 * Note: for speed, we don't acquire ProcArrayLock.  This is a little bit
B
Bruce Momjian 已提交
1789 1790
	 * bogus, but since we are only testing fields for zero or nonzero, it
	 * should be OK.  The result is only used for heuristic purposes anyway...
1791 1792 1793
	 */
	for (index = 0; index < arrayP->numProcs; index++)
	{
B
Bruce Momjian 已提交
1794
		volatile PGPROC *proc = arrayP->procs[index];
1795

1796 1797 1798 1799 1800 1801 1802
		/*
		 * Since we're not holding a lock, need to check that the pointer is
		 * valid. Someone holding the lock could have incremented numProcs
		 * already, but not yet inserted a valid pointer to the array.
		 *
		 * If someone just decremented numProcs, 'proc' could also point to a
		 * PGPROC entry that's no longer in the array. It still points to a
1803 1804 1805
		 * PGPROC struct, though, because freed PGPPROC entries just go to the
		 * free list and are recycled. Its contents are nonsense in that case,
		 * but that's acceptable for this function.
1806
		 */
1807
		if (proc == NULL)
1808 1809
			continue;

1810 1811
		if (proc == MyProc)
			continue;			/* do not count myself */
1812 1813 1814
		if (proc->pid == 0)
			continue;			/* do not count prepared xacts */
		if (proc->xid == InvalidTransactionId)
1815
			continue;			/* do not count if no XID assigned */
1816 1817 1818 1819 1820 1821 1822 1823
		if (proc->waitLock != NULL)
			continue;			/* do not count if blocked on a lock */
		count++;
	}

	return count;
}

1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837
/*
 * CountDBBackends --- count backends that are using specified database
 */
int
CountDBBackends(Oid databaseid)
{
	ProcArrayStruct *arrayP = procArray;
	int			count = 0;
	int			index;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
B
Bruce Momjian 已提交
1838
		volatile PGPROC *proc = arrayP->procs[index];
1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850

		if (proc->pid == 0)
			continue;			/* do not count prepared xacts */
		if (proc->databaseId == databaseid)
			count++;
	}

	LWLockRelease(ProcArrayLock);

	return count;
}

1851 1852 1853 1854
/*
 * CancelDBBackends --- cancel backends that are using specified database
 */
void
1855
CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending)
1856 1857 1858
{
	ProcArrayStruct *arrayP = procArray;
	int			index;
1859
	pid_t		pid = 0;
1860 1861 1862 1863 1864 1865 1866 1867

	/* tell all backends to die */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	for (index = 0; index < arrayP->numProcs; index++)
	{
		volatile PGPROC *proc = arrayP->procs[index];

1868
		if (databaseid == InvalidOid || proc->databaseId == databaseid)
1869
		{
1870 1871 1872 1873
			VirtualTransactionId procvxid;

			GET_VXID_FROM_PGPROC(procvxid, *proc);

1874
			proc->recoveryConflictPending = conflictPending;
1875 1876 1877 1878
			pid = proc->pid;
			if (pid != 0)
			{
				/*
B
Bruce Momjian 已提交
1879 1880
				 * Kill the pid if it's still here. If not, that's what we
				 * wanted so ignore any errors.
1881
				 */
1882
				(void) SendProcSignal(pid, sigmode, procvxid.backendId);
1883
			}
1884 1885 1886 1887 1888 1889
		}
	}

	LWLockRelease(ProcArrayLock);
}

1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903
/*
 * CountUserBackends --- count backends that are used by specified user
 */
int
CountUserBackends(Oid roleid)
{
	ProcArrayStruct *arrayP = procArray;
	int			count = 0;
	int			index;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
B
Bruce Momjian 已提交
1904
		volatile PGPROC *proc = arrayP->procs[index];
1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916

		if (proc->pid == 0)
			continue;			/* do not count prepared xacts */
		if (proc->roleId == roleid)
			count++;
	}

	LWLockRelease(ProcArrayLock);

	return count;
}

1917
/*
1918
 * CountOtherDBBackends -- check for other backends running in the given DB
1919 1920 1921
 *
 * If there are other backends in the DB, we will wait a maximum of 5 seconds
 * for them to exit.  Autovacuum backends are encouraged to exit early by
1922
 * sending them SIGTERM, but normal user backends are just waited for.
1923 1924 1925 1926 1927
 *
 * The current backend is always ignored; it is caller's responsibility to
 * check whether the current backend uses the given DB, if it's important.
 *
 * Returns TRUE if there are (still) other backends in the DB, FALSE if not.
1928 1929
 * Also, *nbackends and *nprepared are set to the number of other backends
 * and prepared transactions in the DB, respectively.
1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940
 *
 * This function is used to interlock DROP DATABASE and related commands
 * against there being any active backends in the target DB --- dropping the
 * DB while active backends remain would be a Bad Thing.  Note that we cannot
 * detect here the possibility of a newly-started backend that is trying to
 * connect to the doomed database, so additional interlocking is needed during
 * backend startup.  The caller should normally hold an exclusive lock on the
 * target DB before calling this, which is one reason we mustn't wait
 * indefinitely.
 */
bool
1941
CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
1942 1943
{
	ProcArrayStruct *arrayP = procArray;
1944 1945

#define MAXAUTOVACPIDS	10		/* max autovacs to SIGTERM per iteration */
1946
	int			autovac_pids[MAXAUTOVACPIDS];
1947 1948 1949 1950 1951
	int			tries;

	/* 50 tries with 100ms sleep between tries makes 5 sec total wait */
	for (tries = 0; tries < 50; tries++)
	{
1952
		int			nautovacs = 0;
1953 1954 1955 1956 1957
		bool		found = false;
		int			index;

		CHECK_FOR_INTERRUPTS();

1958 1959
		*nbackends = *nprepared = 0;

1960 1961 1962 1963
		LWLockAcquire(ProcArrayLock, LW_SHARED);

		for (index = 0; index < arrayP->numProcs; index++)
		{
B
Bruce Momjian 已提交
1964
			volatile PGPROC *proc = arrayP->procs[index];
1965 1966 1967 1968 1969 1970 1971 1972

			if (proc->databaseId != databaseId)
				continue;
			if (proc == MyProc)
				continue;

			found = true;

1973 1974
			if (proc->pid == 0)
				(*nprepared)++;
1975 1976
			else
			{
1977 1978 1979 1980
				(*nbackends)++;
				if ((proc->vacuumFlags & PROC_IS_AUTOVACUUM) &&
					nautovacs < MAXAUTOVACPIDS)
					autovac_pids[nautovacs++] = proc->pid;
1981 1982 1983
			}
		}

1984 1985
		LWLockRelease(ProcArrayLock);

1986
		if (!found)
B
Bruce Momjian 已提交
1987
			return false;		/* no conflicting backends, so done */
1988

1989
		/*
1990 1991 1992 1993
		 * Send SIGTERM to any conflicting autovacuums before sleeping. We
		 * postpone this step until after the loop because we don't want to
		 * hold ProcArrayLock while issuing kill(). We have no idea what might
		 * block kill() inside the kernel...
1994 1995 1996 1997 1998
		 */
		for (index = 0; index < nautovacs; index++)
			(void) kill(autovac_pids[index], SIGTERM);	/* ignore any error */

		/* sleep, then try again */
B
Bruce Momjian 已提交
1999
		pg_usleep(100 * 1000L); /* 100ms */
2000 2001
	}

B
Bruce Momjian 已提交
2002
	return true;				/* timed out, still conflicts */
2003 2004
}

2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 2017

#define XidCacheRemove(i) \
	do { \
		MyProc->subxids.xids[i] = MyProc->subxids.xids[MyProc->subxids.nxids - 1]; \
		MyProc->subxids.nxids--; \
	} while (0)

/*
 * XidCacheRemoveRunningXids
 *
 * Remove a bunch of TransactionIds from the list of known-running
 * subtransactions for my backend.	Both the specified xid and those in
 * the xids[] array (of length nxids) are removed from the subxids cache.
2018
 * latestXid must be the latest XID among the group.
2019 2020
 */
void
2021 2022 2023
XidCacheRemoveRunningXids(TransactionId xid,
						  int nxids, const TransactionId *xids,
						  TransactionId latestXid)
2024 2025 2026 2027
{
	int			i,
				j;

2028
	Assert(TransactionIdIsValid(xid));
2029 2030 2031

	/*
	 * We must hold ProcArrayLock exclusively in order to remove transactions
2032 2033 2034 2035
	 * from the PGPROC array.  (See src/backend/access/transam/README.)  It's
	 * possible this could be relaxed since we know this routine is only used
	 * to abort subtransactions, but pending closer analysis we'd best be
	 * conservative.
2036 2037 2038 2039
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	/*
B
Bruce Momjian 已提交
2040 2041 2042
	 * Under normal circumstances xid and xids[] will be in increasing order,
	 * as will be the entries in subxids.  Scan backwards to avoid O(N^2)
	 * behavior when removing a lot of xids.
2043 2044 2045 2046 2047 2048 2049 2050 2051 2052 2053 2054 2055
	 */
	for (i = nxids - 1; i >= 0; i--)
	{
		TransactionId anxid = xids[i];

		for (j = MyProc->subxids.nxids - 1; j >= 0; j--)
		{
			if (TransactionIdEquals(MyProc->subxids.xids[j], anxid))
			{
				XidCacheRemove(j);
				break;
			}
		}
B
Bruce Momjian 已提交
2056

2057
		/*
B
Bruce Momjian 已提交
2058 2059 2060 2061 2062
		 * Ordinarily we should have found it, unless the cache has
		 * overflowed. However it's also possible for this routine to be
		 * invoked multiple times for the same subtransaction, in case of an
		 * error during AbortSubTransaction.  So instead of Assert, emit a
		 * debug warning.
2063 2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079
		 */
		if (j < 0 && !MyProc->subxids.overflowed)
			elog(WARNING, "did not find subXID %u in MyProc", anxid);
	}

	for (j = MyProc->subxids.nxids - 1; j >= 0; j--)
	{
		if (TransactionIdEquals(MyProc->subxids.xids[j], xid))
		{
			XidCacheRemove(j);
			break;
		}
	}
	/* Ordinarily we should have found it, unless the cache has overflowed */
	if (j < 0 && !MyProc->subxids.overflowed)
		elog(WARNING, "did not find subXID %u in MyProc", xid);

2080 2081 2082 2083 2084
	/* Also advance global latestCompletedXid while holding the lock */
	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
							  latestXid))
		ShmemVariableCache->latestCompletedXid = latestXid;

2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096
	LWLockRelease(ProcArrayLock);
}

#ifdef XIDCACHE_DEBUG

/*
 * Print stats about effectiveness of XID cache
 */
static void
DisplayXidCache(void)
{
	fprintf(stderr,
2097
			"XidCache: xmin: %ld, known: %ld, myxact: %ld, latest: %ld, mainxid: %ld, childxid: %ld, nooflo: %ld, slow: %ld\n",
2098
			xc_by_recent_xmin,
2099
			xc_by_known_xact,
2100
			xc_by_my_xact,
2101
			xc_by_latest_xid,
2102 2103
			xc_by_main_xid,
			xc_by_child_xid,
2104
			xc_no_overflow,
2105 2106 2107
			xc_slow_answer);
}
#endif   /* XIDCACHE_DEBUG */
2108 2109

/* ----------------------------------------------
B
Bruce Momjian 已提交
2110
 *		KnownAssignedTransactions sub-module
2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141 2142 2143 2144 2145 2146 2147 2148 2149 2150 2151 2152 2153 2154 2155 2156 2157 2158 2159 2160 2161 2162 2163 2164 2165 2166 2167 2168 2169 2170 2171
 * ----------------------------------------------
 */

/*
 * In Hot Standby mode, we maintain a list of transactions that are (or were)
 * running in the master at the current point in WAL.
 *
 * RecordKnownAssignedTransactionIds() should be run for *every* WAL record
 * type apart from XLOG_XACT_RUNNING_XACTS, since that initialises the first
 * snapshot so that RecordKnownAssignedTransactionIds() can be callsed. Uses
 * local variables, so should only be called by Startup process.
 *
 * We record all xids that we know have been assigned. That includes
 * all the xids on the WAL record, plus all unobserved xids that
 * we can deduce have been assigned. We can deduce the existence of
 * unobserved xids because we know xids are in sequence, with no gaps.
 *
 * During recovery we do not fret too much about the distinction between
 * top-level xids and subtransaction xids. We hold both together in
 * a hash table called KnownAssignedXids. In backends, this is copied into
 * snapshots in GetSnapshotData(), taking advantage
 * of the fact that XidInMVCCSnapshot() doesn't care about the distinction
 * either. Subtransaction xids are effectively treated as top-level xids
 * and in the typical case pg_subtrans is *not* maintained (and that
 * does not effect visibility).
 *
 * KnownAssignedXids expands as new xids are observed or inferred, and
 * contracts when transaction completion records arrive. We have room in a
 * snapshot to hold maxProcs * (1 + PGPROC_MAX_CACHED_SUBXIDS) xids, so
 * every transaction must report their subtransaction xids in a special
 * WAL assignment record every PGPROC_MAX_CACHED_SUBXIDS. This allows us
 * to remove the subtransaction xids and update pg_subtrans instead. Snapshots
 * are still correct yet we don't overflow SnapshotData structure. When we do
 * this we need
 * to keep track of which xids caused the snapshot to overflow. We do that
 * by simply tracking the lastOverflowedXid - if it is within the bounds of
 * the KnownAssignedXids then we know the snapshot overflowed. (Note that
 * subxid overflow occurs on primary when 65th subxid arrives, whereas on
 * standby it occurs when 64th subxid arrives - that is not an error).
 *
 * Should FATAL errors result in a backend on primary disappearing before
 * it can write an abort record then we just leave those xids in
 * KnownAssignedXids. They actually aborted but we think they were running;
 * the distinction is irrelevant because either way any changes done by the
 * transaction are not visible to backends in the standby.
 * We prune KnownAssignedXids when XLOG_XACT_RUNNING_XACTS arrives, to
 * ensure we do not overflow.
 *
 * If we are in STANDBY_SNAPSHOT_PENDING state, then we may try to remove
 * xids that are not present.
 */
void
RecordKnownAssignedTransactionIds(TransactionId xid)
{
	/*
	 * Skip processing if the current snapshot is not initialized.
	 */
	if (standbyState < STANDBY_SNAPSHOT_PENDING)
		return;

	/*
B
Bruce Momjian 已提交
2172 2173 2174 2175
	 * We can see WAL records before the running-xacts snapshot that contain
	 * XIDs that are not in the running-xacts snapshot, but that we know to
	 * have finished before the running-xacts snapshot was taken. Don't waste
	 * precious shared memory by keeping them in the hash table.
2176 2177 2178
	 *
	 * We can also see WAL records before the running-xacts snapshot that
	 * contain XIDs that are not in the running-xacts snapshot for a different
B
Bruce Momjian 已提交
2179 2180 2181 2182 2183
	 * reason: the transaction started *after* the running-xacts snapshot was
	 * taken, but before it was written to WAL. We must be careful to not
	 * ignore such XIDs. Because such a transaction started after the
	 * running-xacts snapshot was taken, it must have an XID larger than the
	 * oldest XID according to the running-xacts snapshot.
2184 2185 2186 2187
	 */
	if (TransactionIdPrecedes(xid, snapshotOldestActiveXid))
		return;

2188 2189
	elog(trace_recovery(DEBUG4), "record known xact %u latestObservedXid %u",
					xid, latestObservedXid);
2190 2191

	/*
B
Bruce Momjian 已提交
2192 2193 2194
	 * When a newly observed xid arrives, it is frequently the case that it is
	 * *not* the next xid in sequence. When this occurs, we must treat the
	 * intervening xids as running also.
2195 2196 2197
	 */
	if (TransactionIdFollows(xid, latestObservedXid))
	{
B
Bruce Momjian 已提交
2198 2199
		TransactionId next_expected_xid = latestObservedXid;

2200 2201 2202
		TransactionIdAdvance(next_expected_xid);

		/*
B
Bruce Momjian 已提交
2203 2204 2205 2206 2207
		 * Locking requirement is currently higher than for xid assignment in
		 * normal running. However, we only get called here for new high xids
		 * - so on a multi-processor where it is common that xids arrive out
		 * of order the average number of locks per assignment will actually
		 * reduce. So not too worried about this locking.
2208
		 *
B
Bruce Momjian 已提交
2209 2210 2211 2212 2213
		 * XXX It does seem possible that we could add a whole range of
		 * numbers atomically to KnownAssignedXids, if we use a sorted list
		 * for KnownAssignedXids. But that design also increases the length of
		 * time we hold lock when we process commits/aborts, so on balance
		 * don't worry about this.
2214 2215 2216 2217 2218 2219
		 */
		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

		while (TransactionIdPrecedesOrEquals(next_expected_xid, xid))
		{
			if (TransactionIdPrecedes(next_expected_xid, xid))
2220 2221 2222
				elog(trace_recovery(DEBUG4),
					 "recording unobserved xid %u (latestObservedXid %u)",
						next_expected_xid, latestObservedXid);
2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299
			KnownAssignedXidsAdd(&next_expected_xid, 1);

			/*
			 * Extend clog and subtrans like we do in GetNewTransactionId()
			 * during normal operation
			 */
			ExtendCLOG(next_expected_xid);
			ExtendSUBTRANS(next_expected_xid);

			TransactionIdAdvance(next_expected_xid);
		}

		LWLockRelease(ProcArrayLock);

		latestObservedXid = xid;
	}

	/* nextXid must be beyond any observed xid */
	if (TransactionIdFollowsOrEquals(latestObservedXid,
									 ShmemVariableCache->nextXid))
	{
		ShmemVariableCache->nextXid = latestObservedXid;
		TransactionIdAdvance(ShmemVariableCache->nextXid);
	}
}

void
ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
									  TransactionId *subxids)
{
	int			i;
	TransactionId max_xid;

	if (standbyState == STANDBY_DISABLED)
		return;

	max_xid = TransactionIdLatest(xid, nsubxids, subxids);

	/*
	 * Uses same locking as transaction commit
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	if (TransactionIdIsValid(xid))
		KnownAssignedXidsRemove(xid);
	for (i = 0; i < nsubxids; i++)
		KnownAssignedXidsRemove(subxids[i]);

	/* Like in ProcArrayRemove, advance latestCompletedXid */
	if (TransactionIdFollowsOrEquals(max_xid,
									 ShmemVariableCache->latestCompletedXid))
		ShmemVariableCache->latestCompletedXid = max_xid;

	LWLockRelease(ProcArrayLock);
}

void
ExpireAllKnownAssignedTransactionIds(void)
{
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
	KnownAssignedXidsRemoveMany(InvalidTransactionId, false);
	LWLockRelease(ProcArrayLock);
}

void
ExpireOldKnownAssignedTransactionIds(TransactionId xid)
{
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
	KnownAssignedXidsRemoveMany(xid, true);
	LWLockRelease(ProcArrayLock);
}

/*
 * Private module functions to manipulate KnownAssignedXids
 *
 * There are 3 main users of the KnownAssignedXids data structure:
 *
B
Bruce Momjian 已提交
2300 2301 2302
 *	 * backends taking snapshots
 *	 * startup process adding new knownassigned xids
 *	 * startup process removing xids as transactions end
2303 2304 2305 2306 2307 2308 2309 2310 2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326
 *
 * If we make KnownAssignedXids a simple sorted array then the first two
 * operations are fast, but the last one is at least O(N). If we make
 * KnownAssignedXids a hash table then the last two operations are fast,
 * though we have to do more work at snapshot time. Doing more work at
 * commit could slow down taking snapshots anyway because of lwlock
 * contention. Scanning the hash table is O(N) on the max size of the array,
 * so performs poorly in comparison when we have very low numbers of
 * write transactions to process. But at least it is constant overhead
 * and a sequential memory scan will utilise hardware memory readahead
 * to give much improved performance. In any case the emphasis must be on
 * having the standby process changes quickly so that it can provide
 * high availability. So we choose to implement as a hash table.
 */

/*
 * Add xids into KnownAssignedXids.
 *
 * Must be called while holding ProcArrayLock in Exclusive mode
 */
static void
KnownAssignedXidsAdd(TransactionId *xids, int nxids)
{
	TransactionId *result;
B
Bruce Momjian 已提交
2327 2328
	bool		found;
	int			i;
2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340

	for (i = 0; i < nxids; i++)
	{
		Assert(TransactionIdIsValid(xids[i]));

		elog(trace_recovery(DEBUG4), "adding KnownAssignedXid %u", xids[i]);

		procArray->numKnownAssignedXids++;
		if (procArray->numKnownAssignedXids > procArray->maxKnownAssignedXids)
		{
			KnownAssignedXidsDisplay(LOG);
			LWLockRelease(ProcArrayLock);
2341
			elog(ERROR, "too many KnownAssignedXids (%u)", procArray->maxKnownAssignedXids);
2342 2343 2344
		}

		result = (TransactionId *) hash_search(KnownAssignedXidsHash, &xids[i], HASH_ENTER,
B
Bruce Momjian 已提交
2345
											   &found);
2346 2347 2348 2349 2350

		if (!result)
		{
			LWLockRelease(ProcArrayLock);
			ereport(ERROR,
B
Bruce Momjian 已提交
2351 2352
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of shared memory")));
2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371
		}

		if (found)
		{
			KnownAssignedXidsDisplay(LOG);
			LWLockRelease(ProcArrayLock);
			elog(ERROR, "found duplicate KnownAssignedXid %u", xids[i]);
		}
	}
}

/*
 * Is an xid present in KnownAssignedXids?
 *
 * Must be called while holding ProcArrayLock in shared mode
 */
static bool
KnownAssignedXidsExist(TransactionId xid)
{
B
Bruce Momjian 已提交
2372 2373
	bool		found;

2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385
	(void) hash_search(KnownAssignedXidsHash, &xid, HASH_FIND, &found);
	return found;
}

/*
 * Remove one xid from anywhere in KnownAssignedXids.
 *
 * Must be called while holding ProcArrayLock in Exclusive mode
 */
static void
KnownAssignedXidsRemove(TransactionId xid)
{
B
Bruce Momjian 已提交
2386
	bool		found;
2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 2398

	Assert(TransactionIdIsValid(xid));

	elog(trace_recovery(DEBUG4), "remove KnownAssignedXid %u", xid);

	(void) hash_search(KnownAssignedXidsHash, &xid, HASH_REMOVE, &found);

	if (found)
		procArray->numKnownAssignedXids--;
	Assert(procArray->numKnownAssignedXids >= 0);

	/*
B
Bruce Momjian 已提交
2399 2400 2401
	 * We can fail to find an xid if the xid came from a subtransaction that
	 * aborts, though the xid hadn't yet been reported and no WAL records have
	 * been written using the subxid. In that case the abort record will
2402 2403
	 * contain that subxid and we haven't seen it before.
	 *
B
Bruce Momjian 已提交
2404 2405 2406
	 * If we fail to find it for other reasons it might be a problem, but it
	 * isn't much use to log that it happened, since we can't divine much from
	 * just an isolated xid value.
2407 2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418 2419 2420 2421 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431
	 */
}

/*
 * KnownAssignedXidsGet - Get an array of xids by scanning KnownAssignedXids.
 * We filter out anything higher than xmax.
 *
 * Must be called while holding ProcArrayLock (in shared mode)
 */
static int
KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax)
{
	TransactionId xtmp = InvalidTransactionId;

	return KnownAssignedXidsGetAndSetXmin(xarray, &xtmp, xmax);
}

/*
 * KnownAssignedXidsGetAndSetXmin - as KnownAssignedXidsGet, plus we reduce *xmin
 * to the lowest xid value seen if not already lower.
 *
 * Must be called while holding ProcArrayLock (in shared mode)
 */
static int
KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
B
Bruce Momjian 已提交
2432
							   TransactionId xmax)
2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466 2467
{
	HASH_SEQ_STATUS status;
	TransactionId *knownXid;
	int			count = 0;

	hash_seq_init(&status, KnownAssignedXidsHash);
	while ((knownXid = (TransactionId *) hash_seq_search(&status)) != NULL)
	{
		/*
		 * Filter out anything higher than xmax
		 */
		if (TransactionIdPrecedes(xmax, *knownXid))
			continue;

		*xarray = *knownXid;
		xarray++;
		count++;

		/* update xmin if required */
		if (TransactionIdPrecedes(*knownXid, *xmin))
			*xmin = *knownXid;
	}

	return count;
}

/*
 * Prune KnownAssignedXids up to, but *not* including xid. If xid is invalid
 * then clear the whole table.
 *
 * Must be called while holding ProcArrayLock in Exclusive mode.
 */
static void
KnownAssignedXidsRemoveMany(TransactionId xid, bool keepPreparedXacts)
{
B
Bruce Momjian 已提交
2468
	TransactionId *knownXid;
2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479
	HASH_SEQ_STATUS status;

	if (TransactionIdIsValid(xid))
		elog(trace_recovery(DEBUG4), "prune KnownAssignedXids to %u", xid);
	else
		elog(trace_recovery(DEBUG4), "removing all KnownAssignedXids");

	hash_seq_init(&status, KnownAssignedXidsHash);
	while ((knownXid = (TransactionId *) hash_seq_search(&status)) != NULL)
	{
		TransactionId removeXid = *knownXid;
B
Bruce Momjian 已提交
2480
		bool		found;
2481 2482 2483

		if (!TransactionIdIsValid(xid) || TransactionIdPrecedes(removeXid, xid))
		{
2484
			if (keepPreparedXacts && StandbyTransactionIdIsPrepared(removeXid))
2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502
				continue;
			else
			{
				(void) hash_search(KnownAssignedXidsHash, &removeXid,
								   HASH_REMOVE, &found);
				if (found)
					procArray->numKnownAssignedXids--;
				Assert(procArray->numKnownAssignedXids >= 0);
			}
		}
	}
}

/*
 * Display KnownAssignedXids to provide debug trail
 *
 * Must be called while holding ProcArrayLock (in shared mode)
 */
T
Tom Lane 已提交
2503
static void
2504 2505 2506 2507 2508
KnownAssignedXidsDisplay(int trace_level)
{
	HASH_SEQ_STATUS status;
	TransactionId *knownXid;
	StringInfoData buf;
B
Bruce Momjian 已提交
2509 2510 2511
	TransactionId *xids;
	int			nxids;
	int			i;
2512 2513 2514 2515 2516 2517 2518 2519 2520 2521 2522 2523 2524 2525 2526 2527 2528 2529 2530

	xids = palloc(sizeof(TransactionId) * TOTAL_MAX_CACHED_SUBXIDS);
	nxids = 0;

	hash_seq_init(&status, KnownAssignedXidsHash);
	while ((knownXid = (TransactionId *) hash_seq_search(&status)) != NULL)
		xids[nxids++] = *knownXid;

	qsort(xids, nxids, sizeof(TransactionId), xidComparator);

	initStringInfo(&buf);

	for (i = 0; i < nxids; i++)
		appendStringInfo(&buf, "%u ", xids[i]);

	elog(trace_level, "%d KnownAssignedXids %s", nxids, buf.data);

	pfree(buf.data);
}