procarray.c 132.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12
/*-------------------------------------------------------------------------
 *
 * procarray.c
 *	  POSTGRES process array code.
 *
 *
 * This module maintains an unsorted array of the PGPROC structures for all
 * active backends.  Although there are several uses for this, the principal
 * one is as a means of determining the set of currently running transactions.
 *
 * Because of various subtle race conditions it is critical that a backend
 * hold the correct locks while setting or clearing its MyProc->xid field.
13
 * See notes in src/backend/access/transam/README.
14 15 16
 *
 * The process array now also includes PGPROC structures representing
 * prepared transactions.  The xid and subxids fields of these are valid,
17 18
 * as are the myProcLocks lists.  They can be distinguished from regular
 * backend PGPROCs at need by checking for pid == 0.
B
Bruce Momjian 已提交
19
 *
20 21
 * During hot standby, we also keep a list of XIDs representing transactions
 * that are known to be running in the master (or more precisely, were running
B
Bruce Momjian 已提交
22
 * as of the current point in the WAL stream).	This list is kept in the
23 24 25
 * KnownAssignedXids array, and is updated by watching the sequence of
 * arriving XIDs.  This is necessary because if we leave those XIDs out of
 * snapshots taken for standby queries, then they will appear to be already
B
Bruce Momjian 已提交
26
 * complete, leading to MVCC failures.	Note that in hot standby, the PGPROC
27 28
 * array represents standby processes, which by definition are not running
 * transactions that have XIDs.
29
 *
30 31 32 33
 * It is perhaps possible for a backend on the master to terminate without
 * writing an abort record for its transaction.  While that shouldn't really
 * happen, it would tie up KnownAssignedXids indefinitely, so we protect
 * ourselves by pruning the array when a valid list of running XIDs arrives.
34
 *
35
 * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
36 37 38 39
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
B
Bruce Momjian 已提交
40
 *	  $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.72 2010/07/06 19:18:57 momjian Exp $
41 42 43 44 45
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

46
#include <signal.h>
47

48
#include "access/clog.h"
49
#include "access/distributedlog.h"
50
#include "access/subtrans.h"
51 52
#include "access/transam.h"
#include "access/xact.h"
53
#include "access/twophase.h"
54
#include "miscadmin.h"
G
Gang Xiong 已提交
55
#include "port/atomics.h"
56
#include "storage/procarray.h"
57
#include "storage/spin.h"
58 59
#include "storage/standby.h"
#include "utils/builtins.h"
60
#include "utils/combocid.h"
61
#include "utils/snapmgr.h"
62
#include "utils/tqual.h"
63 64 65 66
#include "utils/guc.h"
#include "utils/memutils.h"

#include "access/xact.h"		/* setting the shared xid */
67

68
#include "cdb/cdbtm.h"
69
#include "cdb/cdbvars.h"
70 71
#include "utils/faultinjector.h"
#include "utils/sharedsnapshot.h"
72 73 74 75 76 77 78

/* Our shared memory area */
typedef struct ProcArrayStruct
{
	int			numProcs;		/* number of valid procs entries */
	int			maxProcs;		/* allocated size of procs array */

79 80 81 82 83 84 85
	/*
	 * Known assigned XIDs handling
	 */
	int			maxKnownAssignedXids;	/* allocated size of array */
	int			numKnownAssignedXids;	/* currrent # of valid entries */
	int			tailKnownAssignedXids;	/* index of oldest valid element */
	int			headKnownAssignedXids;	/* index of newest element, + 1 */
B
Bruce Momjian 已提交
86
	slock_t		known_assigned_xids_lck;		/* protects head/tail pointers */
B
Bruce Momjian 已提交
87

88
	/*
89 90 91 92 93
	 * Highest subxid that has been removed from KnownAssignedXids array to
	 * prevent overflow; or InvalidTransactionId if none.  We track this for
	 * similar reasons to tracking overflowing cached subxids in PGPROC
	 * entries.  Must hold exclusive ProcArrayLock to change this, and shared
	 * lock to read it.
94
	 */
B
Bruce Momjian 已提交
95
	TransactionId lastOverflowedXid;
96

97
	/*
B
Bruce Momjian 已提交
98 99
	 * We declare procs[] as 1 entry because C wants a fixed-size array, but
	 * actually it is maxProcs entries long.
100 101 102 103 104 105
	 */
	PGPROC	   *procs[1];		/* VARIABLE LENGTH ARRAY */
} ProcArrayStruct;

static ProcArrayStruct *procArray;

106 107 108
/*
 * Bookkeeping for tracking emulated transactions in recovery
 */
109 110
static TransactionId *KnownAssignedXids;
static bool *KnownAssignedXidsValid;
B
Bruce Momjian 已提交
111
static TransactionId latestObservedXid = InvalidTransactionId;
112 113 114 115 116 117 118

/*
 * If we're in STANDBY_SNAPSHOT_PENDING state, standbySnapshotPendingXmin is
 * the highest xid that might still be running that we don't have in
 * KnownAssignedXids.
 */
static TransactionId standbySnapshotPendingXmin;
119 120 121 122 123

#ifdef XIDCACHE_DEBUG

/*
 * XidCache measurement counters.  Each xc_*_inc() macro bumps the counter
 * of the same name; without XIDCACHE_DEBUG the macros expand to no-ops.
 */
static long xc_by_recent_xmin = 0;
static long xc_by_known_xact = 0;
static long xc_by_my_xact = 0;
static long xc_by_latest_xid = 0;
static long xc_by_main_xid = 0;
static long xc_by_child_xid = 0;
static long xc_by_known_assigned = 0;
static long xc_no_overflow = 0;
static long xc_slow_answer = 0;

#define xc_by_recent_xmin_inc()		(xc_by_recent_xmin++)
#define xc_by_known_xact_inc()		(xc_by_known_xact++)
#define xc_by_my_xact_inc()			(xc_by_my_xact++)
#define xc_by_latest_xid_inc()		(xc_by_latest_xid++)
#define xc_by_main_xid_inc()		(xc_by_main_xid++)
#define xc_by_child_xid_inc()		(xc_by_child_xid++)
#define xc_by_known_assigned_inc()	(xc_by_known_assigned++)
#define xc_no_overflow_inc()		(xc_no_overflow++)
#define xc_slow_answer_inc()		(xc_slow_answer++)

static void DisplayXidCache(void);

#else							/* !XIDCACHE_DEBUG */

/* Measurement disabled: every increment macro compiles away */
#define xc_by_recent_xmin_inc()		((void) 0)
#define xc_by_known_xact_inc()		((void) 0)
#define xc_by_my_xact_inc()			((void) 0)
#define xc_by_latest_xid_inc()		((void) 0)
#define xc_by_main_xid_inc()		((void) 0)
#define xc_by_child_xid_inc()		((void) 0)
#define xc_by_known_assigned_inc()	((void) 0)
#define xc_no_overflow_inc()		((void) 0)
#define xc_slow_answer_inc()		((void) 0)

#endif   /* XIDCACHE_DEBUG */

157
/*
 * Forward declarations of the file-local primitives that maintain the
 * KnownAssignedXids array on a standby.
 */
static void KnownAssignedXidsCompress(bool force);
static void KnownAssignedXidsAdd(TransactionId from_xid,
					 TransactionId to_xid,
					 bool exclusive_lock);
static bool KnownAssignedXidsSearch(TransactionId xid, bool remove);
static bool KnownAssignedXidExists(TransactionId xid);
static void KnownAssignedXidsRemove(TransactionId xid);
static void KnownAssignedXidsRemoveTree(TransactionId xid,
							int nsubxids,
							TransactionId *subxids);
static void KnownAssignedXidsRemovePreceding(TransactionId xid);
static int	KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
static int	KnownAssignedXidsGetAndSetXmin(TransactionId *xarray,
							   TransactionId *xmin,
							   TransactionId xmax);
static void KnownAssignedXidsDisplay(int trace_level);
172

173 174 175
/*
 * Report shared-memory space needed by CreateSharedProcArray.
 */
176
Size
177
ProcArrayShmemSize(void)
178
{
179 180
	Size		size;

181 182
	/* Size of the ProcArray structure itself */
#define PROCARRAY_MAXPROCS	(MaxBackends + max_prepared_xacts)
183

184
	size = offsetof(ProcArrayStruct, procs);
185 186 187
	size = add_size(size, mul_size(sizeof(PGPROC *), PROCARRAY_MAXPROCS));

	/*
188
	 * During Hot Standby processing we have a data structure called
189 190 191 192 193 194
	 * KnownAssignedXids, created in shared memory. Local data structures are
	 * also created in various backends during GetSnapshotData(),
	 * TransactionIdIsInProgress() and GetRunningTransactionData(). All of the
	 * main structures created in those functions must be identically sized,
	 * since we may at times copy the whole of the data structures around. We
	 * refer to this size as TOTAL_MAX_CACHED_SUBXIDS.
195
	 *
B
Bruce Momjian 已提交
196 197 198
	 * Ideally we'd only create this structure if we were actually doing hot
	 * standby in the current run, but we don't know that yet at the time
	 * shared memory is being set up.
199
	 */
200 201 202
#define TOTAL_MAX_CACHED_SUBXIDS \
	((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)

203
	if (EnableHotStandby)
204 205 206 207
	{
		size = add_size(size,
						mul_size(sizeof(TransactionId),
								 TOTAL_MAX_CACHED_SUBXIDS));
208
		size = add_size(size,
209 210
						mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS));
	}
211 212

	return size;
213 214 215 216 217 218
}

/*
 * Initialize the shared PGPROC array during postmaster startup.
 */
void
219
CreateSharedProcArray(void)
220 221 222 223 224
{
	bool		found;

	/* Create or attach to the ProcArray shared structure */
	procArray = (ProcArrayStruct *)
225
		ShmemInitStruct("Proc Array",
226 227 228
						add_size(offsetof(ProcArrayStruct, procs),
								 mul_size(sizeof(PGPROC *),
										  PROCARRAY_MAXPROCS)),
229
						&found);
230 231 232 233 234 235 236

	if (!found)
	{
		/*
		 * We're the first - initialize.
		 */
		procArray->numProcs = 0;
237
		procArray->maxProcs = PROCARRAY_MAXPROCS;
238
		procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
239 240 241 242
		procArray->numKnownAssignedXids = 0;
		procArray->tailKnownAssignedXids = 0;
		procArray->headKnownAssignedXids = 0;
		SpinLockInit(&procArray->known_assigned_xids_lck);
243 244
		procArray->lastOverflowedXid = InvalidTransactionId;
	}
245

246
	/* Create or attach to the KnownAssignedXids arrays too, if needed */
247
	if (EnableHotStandby)
248
	{
249 250 251 252 253 254 255 256 257
		KnownAssignedXids = (TransactionId *)
			ShmemInitStruct("KnownAssignedXids",
							mul_size(sizeof(TransactionId),
									 TOTAL_MAX_CACHED_SUBXIDS),
							&found);
		KnownAssignedXidsValid = (bool *)
			ShmemInitStruct("KnownAssignedXidsValid",
							mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS),
							&found);
258 259 260 261
	}
}

/*
262
 * Add the specified PGPROC to the shared array.
263 264
 */
void
265
ProcArrayAdd(PGPROC *proc)
266 267 268 269 270
{
	ProcArrayStruct *arrayP = procArray;

	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

271
	SIMPLE_FAULT_INJECTOR(ProcArray_Add);
272

273 274 275
	if (arrayP->numProcs >= arrayP->maxProcs)
	{
		/*
B
Bruce Momjian 已提交
276 277 278
		 * Ooops, no room.	(This really shouldn't happen, since there is a
		 * fixed supply of PGPROC structs too, and so we should have failed
		 * earlier.)
279 280 281 282 283 284 285
		 */
		LWLockRelease(ProcArrayLock);
		ereport(FATAL,
				(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
				 errmsg("sorry, too many clients already")));
	}

286
	arrayP->procs[arrayP->numProcs] = proc;
287 288 289 290 291 292
	arrayP->numProcs++;

	LWLockRelease(ProcArrayLock);
}

/*
293
 * Remove the specified PGPROC from the shared array.
294 295 296 297 298 299 300
 *
 * When latestXid is a valid XID, we are removing a live 2PC gxact from the
 * array, and thus causing it to appear as "not running" anymore.  In this
 * case we must advance latestCompletedXid.  (This is essentially the same
 * as ProcArrayEndTransaction followed by removal of the PGPROC, but we take
 * the ProcArrayLock only once, and don't damage the content of the PGPROC;
 * twophase.c depends on the latter.)
301 302
 */
void
303
ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
304 305 306 307 308
{
	ProcArrayStruct *arrayP = procArray;
	int			index;

#ifdef XIDCACHE_DEBUG
309 310 311
	/* dump stats at backend shutdown, but not prepared-xact end */
	if (proc->pid != 0)
		DisplayXidCache();
312 313 314 315
#endif

	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

316 317 318 319 320 321 322 323 324 325 326 327 328 329 330
	if (TransactionIdIsValid(latestXid))
	{
		Assert(TransactionIdIsValid(proc->xid));

		/* Advance global latestCompletedXid while holding the lock */
		if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
								  latestXid))
			ShmemVariableCache->latestCompletedXid = latestXid;
	}
	else
	{
		/* Shouldn't be trying to remove a live transaction here */
		Assert(!TransactionIdIsValid(proc->xid));
	}

331 332
	for (index = 0; index < arrayP->numProcs; index++)
	{
333
		if (arrayP->procs[index] == proc)
334 335
		{
			arrayP->procs[index] = arrayP->procs[arrayP->numProcs - 1];
336
			arrayP->procs[arrayP->numProcs - 1] = NULL; /* for debugging */
337 338 339 340 341 342 343 344 345
			arrayP->numProcs--;
			LWLockRelease(ProcArrayLock);
			return;
		}
	}

	/* Ooops */
	LWLockRelease(ProcArrayLock);

346
	elog(LOG, "failed to find proc %p in ProcArray", proc);
347 348 349
}


G
Gang Xiong 已提交
350 351 352 353 354 355 356
/*
 * ProcArrayEndGxact
 *		Reinitialize MyProc->gxact via initGxact().
 *
 * The caller must already hold ProcArrayLock; this is asserted.
 */
void
ProcArrayEndGxact(void)
{
	Assert(LWLockHeldByMe(ProcArrayLock));

	initGxact(&MyProc->gxact);
}

357 358 359 360 361 362 363 364 365 366 367 368
/*
 * ProcArrayEndTransaction -- mark a transaction as no longer running
 *
 * This is used interchangeably for commit and abort cases.  The transaction
 * commit/abort must already be reported to WAL and pg_clog.
 *
 * proc is currently always MyProc, but we pass it explicitly for flexibility.
 * latestXid is the latest Xid among the transaction's main XID and
 * subtransactions, or InvalidTransactionId if it has no XID.  (We must ask
 * the caller to pass latestXid, instead of computing it from the PGPROC's
 * contents, because the subxid information in the PGPROC might be
 * incomplete.)
369 370
 *
 * GPDB: If this is a global transaction, we might need to do this action
371 372 373 374
 * later, rather than now. In that case, this function returns true for
 * needNotifyCommittedDtxTransaction, and does *not* change the state of the
 * PGPROC entry. This can only happen for commit; when !isCommit, this always
 * clears the PGPROC entry.
375
 */
376 377
bool
ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid, bool isCommit)
378
{
379
	bool needNotifyCommittedDtxTransaction;
380

381 382 383 384 385
	/*
	 * MyProc->localDistribXactData is only used for debugging purpose by
	 * backend itself on segments only hence okay to modify without holding
	 * the lock.
	 */
386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403
	if (MyProc->localDistribXactData.state != LOCALDISTRIBXACT_STATE_NONE)
	{
		switch (DistributedTransactionContext)
		{
			case DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER:
			case DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER:
			case DTX_CONTEXT_QE_AUTO_COMMIT_IMPLICIT:
				LocalDistribXact_ChangeState(MyProc,
											 isCommit ?
											 LOCALDISTRIBXACT_STATE_COMMITTED :
											 LOCALDISTRIBXACT_STATE_ABORTED);
				break;

			case DTX_CONTEXT_QE_READER:
			case DTX_CONTEXT_QE_ENTRY_DB_SINGLETON:
				// QD or QE Writer will handle it.
				break;

404
			case DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE:
405 406 407 408 409 410 411 412 413 414 415 416 417
			case DTX_CONTEXT_QD_RETRY_PHASE_2:
			case DTX_CONTEXT_QE_PREPARED:
			case DTX_CONTEXT_QE_FINISH_PREPARED:
				elog(PANIC, "Unexpected distribute transaction context: '%s'",
					 DtxContextToString(DistributedTransactionContext));

			default:
				elog(PANIC, "Unrecognized DTX transaction context: %d",
					 (int) DistributedTransactionContext);
		}
	}

	if (isCommit && notifyCommittedDtxTransactionIsNeeded())
418 419 420
		needNotifyCommittedDtxTransaction = true;
	else
		needNotifyCommittedDtxTransaction = false;
421
	
422 423
	if (TransactionIdIsValid(latestXid))
	{
424
		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
425
		/*
B
Bruce Momjian 已提交
426 427 428
		 * We must lock ProcArrayLock while clearing proc->xid, so that we do
		 * not exit the set of "running" transactions while someone else is
		 * taking a snapshot.  See discussion in
429 430
		 * src/backend/access/transam/README.
		 */
431 432
		Assert(TransactionIdIsValid(proc->xid) ||
			   (IsBootstrapProcessingMode() && latestXid == BootstrapTransactionId));
433

434
		if (! needNotifyCommittedDtxTransaction)
435 436 437 438 439 440 441
		{
			proc->xid = InvalidTransactionId;
			proc->lxid = InvalidLocalTransactionId;
			proc->xmin = InvalidTransactionId;
			/* must be cleared with xid/xmin: */
			proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
			proc->inCommit = false; /* be sure this is cleared in abort */
442
			proc->recoveryConflictPending = false;
443 444 445 446 447 448 449
			proc->serializableIsoLevel = false;
			proc->inDropTransaction = false;

			/* Clear the subtransaction-XID cache too while holding the lock */
			proc->subxids.nxids = 0;
			proc->subxids.overflowed = false;
		}
450 451

		/* Also advance global latestCompletedXid while holding the lock */
452 453 454 455 456 457 458
		/*
		 * Note: we do this in GPDB even if we didn't clear our XID entry
		 * just yet. There is no harm in advancing latestCompletedXid a
		 * little bit earlier than strictly necessary, and this way we don't
		 * need to remember out latest XID when we later actually clear the
		 * entry.
		 */
459 460 461
		if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
								  latestXid))
			ShmemVariableCache->latestCompletedXid = latestXid;
462 463

		LWLockRelease(ProcArrayLock);
464 465 466 467
	}
	else
	{
		/*
B
Bruce Momjian 已提交
468 469 470
		 * If we have no XID, we don't need to lock, since we won't affect
		 * anyone else's calculation of a snapshot.  We might change their
		 * estimate of global xmin, but that's OK.
471 472 473 474 475
		 */
		Assert(!TransactionIdIsValid(proc->xid));

		proc->lxid = InvalidLocalTransactionId;
		proc->xmin = InvalidTransactionId;
476 477
		/* must be cleared with xid/xmin: */
		proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
B
Bruce Momjian 已提交
478
		proc->inCommit = false; /* be sure this is cleared in abort */
479
		proc->recoveryConflictPending = false;
480 481
		proc->serializableIsoLevel = false;
		proc->inDropTransaction = false;
482 483 484 485

		Assert(proc->subxids.nxids == 0);
		Assert(proc->subxids.overflowed == false);
	}
486 487

	return needNotifyCommittedDtxTransaction;
488 489 490 491 492 493 494 495 496 497
}


/*
 * ProcArrayClearTransaction -- clear the transaction fields
 *
 * This is used after successfully preparing a 2-phase transaction.  We are
 * not actually reporting the transaction's XID as no longer running --- it
 * will still appear as running because the 2PC's gxact is in the ProcArray
 * too.  We just have to clear out our own PGPROC.
498
 *
499 500
 */
void
501
ProcArrayClearTransaction(PGPROC *proc, bool commit)
502 503 504
{
	/*
	 * We can skip locking ProcArrayLock here, because this action does not
B
Bruce Momjian 已提交
505 506
	 * actually change anyone's view of the set of running XIDs: our entry is
	 * duplicate with the gxact that has already been inserted into the
507 508 509 510
	 * ProcArray.
	 */
	proc->xid = InvalidTransactionId;
	proc->xmin = InvalidTransactionId;
511
	proc->recoveryConflictPending = false;
512

513 514
	proc->localDistribXactData.state = LOCALDISTRIBXACT_STATE_NONE;

515 516
	/* redundant, but just in case */
	proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
517 518
	proc->serializableIsoLevel = false;
	proc->inDropTransaction = false;
519 520 521 522

	/* Clear the subtransaction-XID cache too */
	proc->subxids.nxids = 0;
	proc->subxids.overflowed = false;
523 524 525 526 527 528 529 530 531 532 533

	/* For commit, inCommit and lxid are cleared in CommitTransaction after
	 * performing PT operations. It's done this way to correctly block
	 * checkpoint till CommitTransaction completes the persistent table
	 * updates.
	 */
	if (! commit)
	{
		proc->lxid = InvalidLocalTransactionId;
		proc->inCommit = false;
	}
534 535
}

536 537 538 539 540 541
/*
 * Clears the current transaction from PGPROC.
 *
 * Must be called while holding the ProcArrayLock.
 */
void
542
ClearTransactionFromPgProc_UnderLock(PGPROC *proc, bool commit)
543 544 545 546 547
{
	/*
	 * ProcArrayClearTransaction() doesn't take the lock, so we can just call it
	 * directly.
	 */
548
	ProcArrayClearTransaction(proc, commit);
549
}
550

551 552 553 554 555 556
/*
 * ProcArrayInitRecoveryInfo
 *
 * When trying to assemble our snapshot we only care about xids after this value.
 * See comments for LogStandbySnapshot().
 */
557 558 559
void
ProcArrayInitRecoveryInfo(TransactionId oldestActiveXid)
{
560 561
	latestObservedXid = oldestActiveXid;
	TransactionIdRetreat(latestObservedXid);
562 563 564 565 566 567 568 569 570 571
}

/*
 * ProcArrayApplyRecoveryInfo -- apply recovery info about xids
 *
 * Takes us through 3 states: Uninitialized, Pending and Ready.
 * Normal case is to go all the way to Ready straight away, though there
 * are atypical cases where we need to take it in steps.
 *
 * Use the data about running transactions on master to create the initial
 * state of KnownAssignedXids. We also use these records to regularly prune
 * KnownAssignedXids because we know it is possible that some transactions
 * with FATAL errors fail to write abort records, which could cause eventual
 * overflow.
 *
 * Raises ERROR ("too many xids to add into KnownAssignedXids") if the
 * combined set of xids would not fit into TOTAL_MAX_CACHED_SUBXIDS entries.
 *
 * See comments for LogStandbySnapshot().
 */
void
ProcArrayApplyRecoveryInfo(RunningTransactions running)
{
	TransactionId *xids;
	int			nxids;
	TransactionId nextXid;
	int			i;

	Assert(standbyState >= STANDBY_INITIALIZED);
	Assert(TransactionIdIsValid(running->nextXid));
	Assert(TransactionIdIsValid(running->oldestRunningXid));
	Assert(TransactionIdIsNormal(running->latestCompletedXid));

	/*
	 * Remove stale transactions, if any.
	 */
	ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid);
	StandbyReleaseOldLocks(running->oldestRunningXid);

	/*
	 * If our snapshot is already valid, nothing else to do...
	 */
	if (standbyState == STANDBY_SNAPSHOT_READY)
		return;

	/*
	 * If our initial RunningXactData had an overflowed snapshot then we knew
	 * we were missing some subxids from our snapshot. We can use this data as
	 * an initial snapshot, but we cannot yet mark it valid. We know that the
	 * missing subxids are equal to or earlier than nextXid. After we
	 * initialise we continue to apply changes during recovery, so once the
	 * oldestRunningXid is later than the nextXid from the initial snapshot we
	 * know that we no longer have missing information and can mark the
	 * snapshot as valid.
	 */
	if (standbyState == STANDBY_SNAPSHOT_PENDING)
	{
		if (TransactionIdPrecedes(standbySnapshotPendingXmin,
								  running->oldestRunningXid))
		{
			standbyState = STANDBY_SNAPSHOT_READY;
			elog(trace_recovery(DEBUG2),
				 "running xact data now proven complete");
			elog(trace_recovery(DEBUG2),
				 "recovery snapshots are now enabled");
		}
		else
			elog(trace_recovery(DEBUG2),
				 "recovery snapshot waiting for %u oldest active xid on standby is %u",
				 standbySnapshotPendingXmin,
				 running->oldestRunningXid);
		return;
	}

	Assert(standbyState == STANDBY_INITIALIZED);

	/*
	 * OK, we need to initialise from the RunningXactData record
	 */

	/*
	 * Remove all xids except xids later than the snapshot. We don't know
	 * exactly which ones that is until precisely now, so that is why we allow
	 * xids to be added only to remove most of them again here.
	 */
	ExpireOldKnownAssignedTransactionIds(running->nextXid);
	StandbyReleaseOldLocks(running->nextXid);

	/*
	 * Nobody else is running yet, but take locks anyhow
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	/*
	 * Combine the running xact data with already known xids, if any exist.
	 * KnownAssignedXids is sorted so we cannot just add new xids, we have to
	 * combine them first, sort them and then re-add to KnownAssignedXids.
	 *
	 * Some of the new xids are top-level xids and some are subtransactions.
	 * We don't call SubtransSetParent because it doesn't matter yet. If we
	 * aren't overflowed then all xids will fit in snapshot and so we don't
	 * need subtrans. If we later overflow, an xid assignment record will add
	 * xids to subtrans. If RunningXacts is overflowed then we don't have
	 * enough information to correctly update subtrans anyway.
	 */

	/*
	 * Allocate a temporary array so we can combine xids. The total of both
	 * arrays should never normally exceed TOTAL_MAX_CACHED_SUBXIDS.
	 */
	xids = palloc(sizeof(TransactionId) * TOTAL_MAX_CACHED_SUBXIDS);

	/*
	 * Get the remaining KnownAssignedXids. In most cases there won't be any
	 * at all since this exists only to catch a theoretical race condition.
	 */
	nxids = KnownAssignedXidsGet(xids, InvalidTransactionId);
	if (nxids > 0)
		KnownAssignedXidsDisplay(trace_recovery(DEBUG3));

	/*
	 * Now that we have a copy of any KnownAssignedXids, we can zero the
	 * array before re-inserting the combined snapshot.
	 */
	KnownAssignedXidsRemovePreceding(InvalidTransactionId);

	/*
	 * Add to the temp array any xids which have not already completed, taking
	 * care not to overflow in extreme cases.
	 */
	for (i = 0; i < running->xcnt; i++)
	{
		TransactionId xid = running->xids[i];

		/*
		 * The running-xacts snapshot can contain xids that were running at
		 * the time of the snapshot, yet complete before the snapshot was
		 * written to WAL. They're not running any more, so ignore them.
		 */
		if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
			continue;

		/*
		 * Test for overflow only after we have filtered out already complete
		 * transactions, but before storing: xids[] holds exactly
		 * TOTAL_MAX_CACHED_SUBXIDS entries, so writing first and checking
		 * afterwards would scribble one element past the allocation.
		 */
		if (nxids >= TOTAL_MAX_CACHED_SUBXIDS)
			elog(ERROR, "too many xids to add into KnownAssignedXids");

		xids[nxids++] = xid;
	}

	if (nxids > 0)
	{
		/*
		 * Sort the array so that we can add them safely into
		 * KnownAssignedXids.
		 */
		qsort(xids, nxids, sizeof(TransactionId), xidComparator);

		/*
		 * Re-initialise latestObservedXid to the highest xid we've seen.
		 */
		latestObservedXid = xids[nxids - 1];

		/*
		 * Add the sorted snapshot into KnownAssignedXids
		 */
		for (i = 0; i < nxids; i++)
		{
			TransactionId xid = xids[i];

			KnownAssignedXidsAdd(xid, xid, true);
		}

		KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
	}

	pfree(xids);

	/*
	 * Now we've got the running xids we need to set the global values that
	 * are used to track snapshots as they evolve further:
	 *
	 * * latestCompletedXid which will be the xmax for snapshots
	 * * lastOverflowedXid which shows whether snapshots overflow
	 * * nextXid
	 *
	 * If the snapshot overflowed, then we still initialise with what we know,
	 * but the recovery snapshot isn't fully valid yet because we know there
	 * are some subxids missing. We don't know the specific subxids that are
	 * missing, so conservatively assume the last one is latestObservedXid.
	 * If no missing subxids, try to clear lastOverflowedXid.
	 *
	 * If the snapshot didn't overflow it's still possible that an overflow
	 * occurred in the gap between taking snapshot and logging record, so we
	 * also need to check if lastOverflowedXid is already ahead of us.
	 */
	if (running->subxid_overflow)
	{
		standbyState = STANDBY_SNAPSHOT_PENDING;

		standbySnapshotPendingXmin = latestObservedXid;
		if (TransactionIdFollows(latestObservedXid,
								 procArray->lastOverflowedXid))
			procArray->lastOverflowedXid = latestObservedXid;
	}
	else if (TransactionIdFollows(procArray->lastOverflowedXid,
								  latestObservedXid))
	{
		standbyState = STANDBY_SNAPSHOT_PENDING;

		standbySnapshotPendingXmin = procArray->lastOverflowedXid;
	}
	else
	{
		standbyState = STANDBY_SNAPSHOT_READY;

		standbySnapshotPendingXmin = InvalidTransactionId;
		if (TransactionIdFollows(running->oldestRunningXid,
								 procArray->lastOverflowedXid))
			procArray->lastOverflowedXid = InvalidTransactionId;
	}

	/*
	 * If a transaction completed in the gap between taking and logging the
	 * snapshot then latestCompletedXid may already be higher than the value
	 * from the snapshot, so check before we use the incoming value.
	 */
	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
							  running->latestCompletedXid))
		ShmemVariableCache->latestCompletedXid = running->latestCompletedXid;

	/* nextXid must be beyond any observed xid */
	nextXid = latestObservedXid;
	TransactionIdAdvance(nextXid);
	if (TransactionIdFollows(nextXid, ShmemVariableCache->nextXid))
		ShmemVariableCache->nextXid = nextXid;

	Assert(TransactionIdIsNormal(ShmemVariableCache->latestCompletedXid));
	Assert(TransactionIdIsValid(ShmemVariableCache->nextXid));

	LWLockRelease(ProcArrayLock);

	elog(trace_recovery(DEBUG2), "running transaction data initialized");
	KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
	if (standbyState == STANDBY_SNAPSHOT_READY)
		elog(trace_recovery(DEBUG2), "recovery snapshots are now enabled");
	else
		ereport(LOG,
				(errmsg("consistent state delayed because recovery snapshot incomplete")));
}

811 812 813 814
/*
 * ProcArrayApplyXidAssignment
 *		Process an XLOG_XACT_ASSIGNMENT WAL record
 */
815 816 817 818 819
void
ProcArrayApplyXidAssignment(TransactionId topxid,
							int nsubxids, TransactionId *subxids)
{
	TransactionId max_xid;
B
Bruce Momjian 已提交
820
	int			i;
821

822
	Assert(standbyState >= STANDBY_INITIALIZED);
823 824 825 826 827 828 829 830 831 832 833 834 835 836

	max_xid = TransactionIdLatest(topxid, nsubxids, subxids);

	/*
	 * Mark all the subtransactions as observed.
	 *
	 * NOTE: This will fail if the subxid contains too many previously
	 * unobserved xids to fit into known-assigned-xids. That shouldn't happen
	 * as the code stands, because xid-assignment records should never contain
	 * more than PGPROC_MAX_CACHED_SUBXIDS entries.
	 */
	RecordKnownAssignedTransactionIds(max_xid);

	/*
B
Bruce Momjian 已提交
837 838 839 840 841 842 843 844 845
	 * Notice that we update pg_subtrans with the top-level xid, rather than
	 * the parent xid. This is a difference between normal processing and
	 * recovery, yet is still correct in all cases. The reason is that
	 * subtransaction commit is not marked in clog until commit processing, so
	 * all aborted subtransactions have already been clearly marked in clog.
	 * As a result we are able to refer directly to the top-level
	 * transaction's state rather than skipping through all the intermediate
	 * states in the subtransaction tree. This should be the first time we
	 * have attempted to SubTransSetParent().
846 847 848 849 850 851 852 853 854 855
	 */
	for (i = 0; i < nsubxids; i++)
		SubTransSetParent(subxids[i], topxid, false);

	/*
	 * Uses same locking as transaction commit
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	/*
856
	 * Remove subxids from known-assigned-xacts.
857
	 */
858
	KnownAssignedXidsRemoveTree(InvalidTransactionId, nsubxids, subxids);
859 860

	/*
861
	 * Advance lastOverflowedXid to be at least the last of these subxids.
862 863 864 865 866 867
	 */
	if (TransactionIdPrecedes(procArray->lastOverflowedXid, max_xid))
		procArray->lastOverflowedXid = max_xid;

	LWLockRelease(ProcArrayLock);
}
868

869 870 871
/*
 * TransactionIdIsInProgress -- is given transaction running in some backend
 *
872
 * Aside from some shortcuts such as checking RecentXmin and our own Xid,
873
 * there are four possibilities for finding a running transaction:
874
 *
875
 * 1. The given Xid is a main transaction Id.  We will find this out cheaply
876 877
 * by looking at the PGPROC struct for each backend.
 *
878
 * 2. The given Xid is one of the cached subxact Xids in the PGPROC array.
879 880
 * We can find this out cheaply too.
 *
881 882 883 884 885 886 887 888
 * 3. In Hot Standby mode, we must search the KnownAssignedXids list to see
 * if the Xid is running on the master.
 *
 * 4. Search the SubTrans tree to find the Xid's topmost parent, and then
 * see if that is running according to PGPROC or KnownAssignedXids.  This is
 * the slowest way, but sadly it has to be done always if the others failed,
 * unless we see that the cached subxact sets are complete (none have
 * overflowed).
889
 *
890 891 892 893
 * ProcArrayLock has to be held while we do 1, 2, 3.  If we save the top Xids
 * while doing 1 and 3, we can release the ProcArrayLock while we do 4.
 * This buys back some concurrency (and we can't retrieve the main Xids from
 * PGPROC again anyway; see GetNewTransactionId).
894 895 896 897
 */
bool
TransactionIdIsInProgress(TransactionId xid)
{
898 899
	static TransactionId *xids = NULL;
	int			nxids = 0;
900
	ProcArrayStruct *arrayP = procArray;
901
	TransactionId topxid;
902 903 904 905
	int			i,
				j;

	/*
B
Bruce Momjian 已提交
906
	 * Don't bother checking a transaction older than RecentXmin; it could not
907 908 909
	 * possibly still be running.  (Note: in particular, this guarantees that
	 * we reject InvalidTransactionId, FrozenTransactionId, etc as not
	 * running.)
910 911 912 913 914 915 916
	 */
	if (TransactionIdPrecedes(xid, RecentXmin))
	{
		xc_by_recent_xmin_inc();
		return false;
	}

917 918 919 920 921 922 923 924 925 926 927
	/*
	 * We may have just checked the status of this transaction, so if it is
	 * already known to be completed, we can fall out without any access to
	 * shared memory.
	 */
	if (TransactionIdIsKnownCompleted(xid))
	{
		xc_by_known_xact_inc();
		return false;
	}

928 929 930 931 932 933 934 935 936 937 938
	/*
	 * Also, we can handle our own transaction (and subtransactions) without
	 * any access to shared memory.
	 */
	if (TransactionIdIsCurrentTransactionId(xid))
	{
		xc_by_my_xact_inc();
		return true;
	}

	/*
B
Bruce Momjian 已提交
939 940
	 * If not first time through, get workspace to remember main XIDs in. We
	 * malloc it permanently to avoid repeated palloc/pfree overhead.
941 942 943
	 */
	if (xids == NULL)
	{
944
		/*
B
Bruce Momjian 已提交
945 946 947
		 * In hot standby mode, reserve enough space to hold all xids in the
		 * known-assigned list. If we later finish recovery, we no longer need
		 * the bigger array, but we don't bother to shrink it.
948
		 */
949
		int			maxxids = RecoveryInProgress() ? TOTAL_MAX_CACHED_SUBXIDS : arrayP->maxProcs;
950 951

		xids = (TransactionId *) malloc(maxxids * sizeof(TransactionId));
952 953 954 955 956
		if (xids == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
	}
957 958 959

	LWLockAcquire(ProcArrayLock, LW_SHARED);

960 961 962 963 964 965 966 967 968 969 970 971
	/*
	 * Now that we have the lock, we can check latestCompletedXid; if the
	 * target Xid is after that, it's surely still running.
	 */
	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, xid))
	{
		LWLockRelease(ProcArrayLock);
		xc_by_latest_xid_inc();
		return true;
	}

	/* No shortcuts, gotta grovel through the array */
972 973
	for (i = 0; i < arrayP->numProcs; i++)
	{
974
		volatile PGPROC *proc = arrayP->procs[i];
975 976 977 978 979
		TransactionId pxid;

		/* Ignore my own proc --- dealt with it above */
		if (proc == MyProc)
			continue;
980 981

		/* Fetch xid just once - see GetNewTransactionId */
982
		pxid = proc->xid;
983 984 985 986 987 988 989 990 991

		if (!TransactionIdIsValid(pxid))
			continue;

		/*
		 * Step 1: check the main Xid
		 */
		if (TransactionIdEquals(pxid, xid))
		{
992
			LWLockRelease(ProcArrayLock);
993
			xc_by_main_xid_inc();
994
			return true;
995 996 997
		}

		/*
B
Bruce Momjian 已提交
998 999
		 * We can ignore main Xids that are younger than the target Xid, since
		 * the target could not possibly be their child.
1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013
		 */
		if (TransactionIdPrecedes(xid, pxid))
			continue;

		/*
		 * Step 2: check the cached child-Xids arrays
		 */
		for (j = proc->subxids.nxids - 1; j >= 0; j--)
		{
			/* Fetch xid just once - see GetNewTransactionId */
			TransactionId cxid = proc->subxids.xids[j];

			if (TransactionIdEquals(cxid, xid))
			{
1014
				LWLockRelease(ProcArrayLock);
1015
				xc_by_child_xid_inc();
1016
				return true;
1017 1018 1019 1020
			}
		}

		/*
1021
		 * Save the main Xid for step 4.  We only need to remember main Xids
B
Bruce Momjian 已提交
1022 1023 1024 1025
		 * that have uncached children.  (Note: there is no race condition
		 * here because the overflowed flag cannot be cleared, only set, while
		 * we hold ProcArrayLock.  So we can't miss an Xid that we need to
		 * worry about.)
1026 1027 1028 1029 1030
		 */
		if (proc->subxids.overflowed)
			xids[nxids++] = pxid;
	}

1031 1032 1033 1034
	/*
	 * Step 3: in hot standby mode, check the known-assigned-xids list.  XIDs
	 * in the list must be treated as running.
	 */
1035 1036 1037 1038 1039
	if (RecoveryInProgress())
	{
		/* none of the PGPROC entries should have XIDs in hot standby mode */
		Assert(nxids == 0);

1040
		if (KnownAssignedXidExists(xid))
1041 1042
		{
			LWLockRelease(ProcArrayLock);
1043
			xc_by_known_assigned_inc();
1044 1045 1046 1047
			return true;
		}

		/*
B
Bruce Momjian 已提交
1048
		 * If the KnownAssignedXids overflowed, we have to check pg_subtrans
B
Bruce Momjian 已提交
1049 1050 1051 1052
		 * too.  Fetch all xids from KnownAssignedXids that are lower than
		 * xid, since if xid is a subtransaction its parent will always have a
		 * lower value.  Note we will collect both main and subXIDs here, but
		 * there's no help for it.
1053 1054 1055 1056 1057
		 */
		if (TransactionIdPrecedesOrEquals(xid, procArray->lastOverflowedXid))
			nxids = KnownAssignedXidsGet(xids, xid);
	}

1058 1059 1060 1061
	LWLockRelease(ProcArrayLock);

	/*
	 * If none of the relevant caches overflowed, we know the Xid is not
1062
	 * running without even looking at pg_subtrans.
1063 1064
	 */
	if (nxids == 0)
1065 1066
	{
		xc_no_overflow_inc();
1067
		return false;
1068
	}
1069 1070

	/*
1071
	 * Step 4: have to check pg_subtrans.
1072
	 *
1073 1074 1075 1076
	 * At this point, we know it's either a subtransaction of one of the Xids
	 * in xids[], or it's not running.  If it's an already-failed
	 * subtransaction, we want to say "not running" even though its parent may
	 * still be running.  So first, check pg_clog to see if it's been aborted.
1077 1078 1079 1080
	 */
	xc_slow_answer_inc();

	if (TransactionIdDidAbort(xid))
1081
		return false;
1082 1083

	/*
B
Bruce Momjian 已提交
1084
	 * It isn't aborted, so check whether the transaction tree it belongs to
1085 1086
	 * is still running (or, more precisely, whether it was running when we
	 * held ProcArrayLock).
1087 1088 1089 1090 1091 1092 1093 1094
	 */
	topxid = SubTransGetTopmostTransaction(xid);
	Assert(TransactionIdIsValid(topxid));
	if (!TransactionIdEquals(topxid, xid))
	{
		for (i = 0; i < nxids; i++)
		{
			if (TransactionIdEquals(xids[i], topxid))
1095
				return true;
1096 1097 1098
		}
	}

1099
	return false;
1100 1101
}

1102 1103 1104 1105
/*
 * TransactionIdIsActive -- is xid the top-level XID of an active backend?
 *
 * This differs from TransactionIdIsInProgress in that it ignores prepared
1106 1107
 * transactions, as well as transactions running on the master if we're in
 * hot standby.  Also, we ignore subtransactions since that's not needed
1108 1109 1110 1111 1112 1113 1114 1115 1116 1117
 * for current uses.
 */
bool
TransactionIdIsActive(TransactionId xid)
{
	bool		result = false;
	ProcArrayStruct *arrayP = procArray;
	int			i;

	/*
B
Bruce Momjian 已提交
1118 1119
	 * Don't bother checking a transaction older than RecentXmin; it could not
	 * possibly still be running.
1120 1121 1122 1123 1124 1125 1126 1127
	 */
	if (TransactionIdPrecedes(xid, RecentXmin))
		return false;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (i = 0; i < arrayP->numProcs; i++)
	{
1128
		volatile PGPROC *proc = arrayP->procs[i];
1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150

		/* Fetch xid just once - see GetNewTransactionId */
		TransactionId pxid = proc->xid;

		if (!TransactionIdIsValid(pxid))
			continue;

		if (proc->pid == 0)
			continue;			/* ignore prepared transactions */

		if (TransactionIdEquals(pxid, xid))
		{
			result = true;
			break;
		}
	}

	LWLockRelease(ProcArrayLock);

	return result;
}

1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176
/*
 * Returns true if there are any UAO drop transaction active (except the current
 * one).
 *
 * If allDbs is TRUE then all backends are considered; if allDbs is FALSE
 * then only backends running in my own database are considered.
 */
bool
HasDropTransaction(bool allDbs)
{
	ProcArrayStruct *arrayP = procArray;
	bool result = false; /* Assumes */
	int			index;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
		volatile PGPROC *proc = arrayP->procs[index];
		if (proc->pid == 0)
			continue;			/* do not count prepared xacts */

		if (allDbs || proc->databaseId == MyDatabaseId)
		{
			if (proc->inDropTransaction && proc != MyProc)
			{
1177 1178 1179
				ereport((Debug_print_snapshot_dtm ? LOG : DEBUG3),
						(errmsg("Found drop transaction: database %d, pid %d, xid %d, xmin %d",
								proc->databaseId, proc->pid, proc->xid, proc->xmin)));
1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215
				result = true;
			}
		}
	}

	LWLockRelease(ProcArrayLock);

	return result;
}

/*
 * Returns true if there are any serializable backends (except the current
 * one), i.e. any other PGPROC with serializableIsoLevel set.
 *
 * If allDbs is TRUE then all backends are considered; if allDbs is FALSE
 * then only backends running in my own database are considered.
 *
 * Prepared transactions (pid == 0) are not counted.  The scan continues
 * after a hit, so every matching backend gets logged.
 */
bool
HasSerializableBackends(bool allDbs)
{
	ProcArrayStruct *arrayP = procArray;
	bool		result = false; /* Assumes */
	int			index;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
		volatile PGPROC *proc = arrayP->procs[index];

		if (proc->pid == 0)
			continue;			/* do not count prepared xacts */

		if (allDbs || proc->databaseId == MyDatabaseId)
		{
			if (proc->serializableIsoLevel && proc != MyProc)
			{
				/*
				 * Oid and TransactionId are unsigned; print them with %u so
				 * that large values do not show up as negative numbers.
				 */
				ereport((Debug_print_snapshot_dtm ? LOG : DEBUG3),
						(errmsg("Found serializable transaction: database %u, pid %d, xid %u, xmin %u",
								proc->databaseId, proc->pid, proc->xid, proc->xmin)));
				result = true;
			}
		}
	}

	LWLockRelease(ProcArrayLock);

	return result;
}

1229 1230 1231 1232 1233 1234 1235 1236 1237 1238
/*
 * GetOldestXmin -- returns oldest transaction that was running
 *					when any current transaction was started.
 *
 * If allDbs is TRUE then all backends are considered; if allDbs is FALSE
 * then only backends running in my own database are considered.
 *
 * This is used by VACUUM to decide which deleted tuples must be preserved
 * in a table.	allDbs = TRUE is needed for shared relations, but allDbs =
 * FALSE is sufficient for non-shared relations, since only backends in my
B
Bruce Momjian 已提交
1239
 * own database could ever see the tuples in them.	Also, we can ignore
1240 1241
 * concurrently running lazy VACUUMs because (a) they must be working on other
 * tables, and (b) they don't need to do snapshot-based lookups.
1242 1243
 *
 * This is also used to determine where to truncate pg_subtrans.  allDbs
1244
 * must be TRUE for that case, and ignoreVacuum FALSE.
1245
 *
1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257
 * Note: it's possible for the calculated value to move backwards on repeated
 * calls. The calculated value is conservative, so that anything older is
 * definitely not considered as running by anyone anymore, but the exact
 * value calculated depends on a number of things. For example, if allDbs is
 * TRUE and there are no transactions running in the current database,
 * GetOldestXmin() returns latestCompletedXid. If a transaction begins after
 * that, its xmin will include in-progress transactions in other databases
 * that started earlier, so another call will return an lower value. The
 * return value is also adjusted with vacuum_defer_cleanup_age, so increasing
 * that setting on the fly is an easy way to have GetOldestXmin() move
 * backwards.
 *
1258
 * Note: we include all currently running xids in the set of considered xids.
1259 1260
 * This ensures that if a just-started xact has not yet set its snapshot,
 * when it does set the snapshot it cannot set xmin less than what we compute.
1261
 * See notes in src/backend/access/transam/README.
1262 1263 1264 1265 1266 1267 1268
 *
 * GPDB: This also needs to deal with distributed snapshots. We keep track of
 * the oldest local XID that is still visible to any distributed snapshot,
 * in the DistributedLog subsystem. DistributedLog doesn't distinguish between
 * different databases, nor vacuums, however. So in GPDB, the 'allDbs' and
 * 'ignoreVacuum' arguments don't do much, because the value from the
 * distributed log will include everything.
1269 1270
 */
TransactionId
1271
GetOldestXmin(bool allDbs, bool ignoreVacuum)
1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300
{
	TransactionId result;

	result = GetLocalOldestXmin(allDbs, ignoreVacuum);

	/*
	 * In QD node, all distributed transactions have an entry in the proc array,
	 * so we're done.
	 */
	if (Gp_role != GP_ROLE_DISPATCH)
	{
		TransactionId distribOldestXmin;

		distribOldestXmin = DistributedLog_GetOldestXmin(result);

		if (TransactionIdIsValid(distribOldestXmin) &&
			TransactionIdPrecedes(distribOldestXmin, result))
			result = distribOldestXmin;
	}

	return result;
}

/*
 * This is the upstream version of GetOldestXmin(). It doesn't take
 * distributed transactions into account.
 *
 * If allDbs is TRUE then all backends are considered; if allDbs is FALSE
 * then only backends running in my own database are considered (not
 * permitted during recovery).
 *
 * NOTE(review): ignoreVacuum is accepted for interface compatibility but is
 * not referenced anywhere in this function body.
 */
TransactionId
GetLocalOldestXmin(bool allDbs, bool ignoreVacuum)
{
	ProcArrayStruct *arrayP = procArray;
	TransactionId result;
	int			index;

	/* Cannot look for individual databases during recovery */
	Assert(allDbs || !RecoveryInProgress());

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	/*
	 * We initialize the MIN() calculation with latestCompletedXid + 1. This
	 * is a lower bound for the XIDs that might appear in the ProcArray later,
	 * and so protects us against overestimating the result due to future
	 * additions.
	 */
	result = ShmemVariableCache->latestCompletedXid;
	Assert(TransactionIdIsNormal(result));
	TransactionIdAdvance(result);

	for (index = 0; index < arrayP->numProcs; index++)
	{
		volatile PGPROC *proc = arrayP->procs[index];

		if (allDbs || proc->databaseId == MyDatabaseId)
		{
			/* Fetch xid just once - see GetNewTransactionId */
			TransactionId xid = proc->xid;

			/* First consider the transaction's own Xid, if any */
			if (TransactionIdIsNormal(xid) &&
				TransactionIdPrecedes(xid, result))
				result = xid;

			/*
			 * Also consider the transaction's Xmin, if set.
			 *
			 * We must check both Xid and Xmin because a transaction might
			 * have an Xmin but not (yet) an Xid; conversely, if it has an
			 * Xid, that could determine some not-yet-set Xmin.
			 */
			xid = proc->xmin;	/* Fetch just once */
			if (TransactionIdIsNormal(xid) &&
				TransactionIdPrecedes(xid, result))
				result = xid;
		}
	}

	LWLockRelease(ProcArrayLock);

	/*
	 * Compute the cutoff XID, being careful not to generate a "permanent"
	 * XID.
	 *
	 * vacuum_defer_cleanup_age provides some additional "slop" for the
	 * benefit of hot standby queries on slave servers.  This is quick and
	 * dirty, and perhaps not all that useful unless the master has a
	 * predictable transaction rate, but it's what we've got.  Note that we
	 * are assuming vacuum_defer_cleanup_age isn't large enough to cause
	 * wraparound --- so guc.c should limit it to no more than the
	 * xidStopLimit threshold in varsup.c.
	 */
	result -= vacuum_defer_cleanup_age;
	if (!TransactionIdIsNormal(result))
		result = FirstNormalTransactionId;

	return result;
}

1370 1371 1372 1373 1374 1375 1376 1377 1378
void
updateSharedLocalSnapshot(DtxContextInfo *dtxContextInfo, Snapshot snapshot, char *debugCaller)
{
	int combocidSize;

	Assert(SharedLocalSnapshotSlot != NULL);

	Assert(snapshot != NULL);

1379 1380 1381 1382 1383 1384 1385
	ereport((Debug_print_full_dtm ? LOG : DEBUG5),
			(errmsg("updateSharedLocalSnapshot for DistributedTransactionContext = '%s' passed local snapshot (xmin: %u xmax: %u xcnt: %u) curcid: %d",
					DtxContextToString(DistributedTransactionContext),
					snapshot->xmin,
					snapshot->xmax,
					snapshot->xcnt,
					snapshot->curcid)));
1386 1387 1388

	LWLockAcquire(SharedLocalSnapshotSlot->slotLock, LW_EXCLUSIVE);

1389 1390 1391 1392 1393 1394 1395 1396
	SharedLocalSnapshotSlot->snapshot.xmin = snapshot->xmin;
	SharedLocalSnapshotSlot->snapshot.xmax = snapshot->xmax;
	SharedLocalSnapshotSlot->snapshot.xcnt = snapshot->xcnt;

	if (snapshot->xcnt > 0)
	{
		Assert(snapshot->xip != NULL);

1397 1398 1399
		ereport((Debug_print_full_dtm ? LOG : DEBUG5),
				(errmsg("updateSharedLocalSnapshot count of in-doubt ids %u",
						SharedLocalSnapshotSlot->snapshot.xcnt)));
1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412

		memcpy(SharedLocalSnapshotSlot->snapshot.xip, snapshot->xip, snapshot->xcnt * sizeof(TransactionId));
	}
	
	/* combocid stuff */
	combocidSize = ((usedComboCids < MaxComboCids) ? usedComboCids : MaxComboCids );

	SharedLocalSnapshotSlot->combocidcnt = combocidSize;	
	memcpy((void *)SharedLocalSnapshotSlot->combocids, comboCids,
		   combocidSize * sizeof(ComboCidKeyData));

	SharedLocalSnapshotSlot->snapshot.curcid = snapshot->curcid;

1413 1414 1415
	ereport((Debug_print_full_dtm ? LOG : DEBUG5),
			(errmsg("updateSharedLocalSnapshot: combocidsize is now %d max %d segmateSync %d->%d",
					combocidSize, MaxComboCids, SharedLocalSnapshotSlot->segmateSync, dtxContextInfo->segmateSync)));
1416

1417
	SetSharedTransactionId_writer();
1418 1419 1420 1421 1422 1423 1424 1425
	
	SharedLocalSnapshotSlot->QDcid = dtxContextInfo->curcid;
	SharedLocalSnapshotSlot->QDxid = dtxContextInfo->distributedXid;
		
	SharedLocalSnapshotSlot->ready = true;

	SharedLocalSnapshotSlot->segmateSync = dtxContextInfo->segmateSync;

1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445
	ereport((Debug_print_full_dtm ? LOG : DEBUG5),
			(errmsg("updateSharedLocalSnapshot for DistributedTransactionContext = '%s' setting shared local snapshot xid = %u (xmin: %u xmax: %u xcnt: %u) curcid: %d, QDxid = %u, QDcid = %u",
					DtxContextToString(DistributedTransactionContext),
					SharedLocalSnapshotSlot->xid,
					SharedLocalSnapshotSlot->snapshot.xmin,
					SharedLocalSnapshotSlot->snapshot.xmax,
					SharedLocalSnapshotSlot->snapshot.xcnt,
					SharedLocalSnapshotSlot->snapshot.curcid,
					SharedLocalSnapshotSlot->QDxid,
					SharedLocalSnapshotSlot->QDcid)));

	ereport((Debug_print_snapshot_dtm ? LOG : DEBUG5),
			(errmsg("[Distributed Snapshot #%u] *Writer Set Shared* gxid %u, currcid %d (gxid = %u, slot #%d, '%s', '%s')",
					QEDtxContextInfo.distributedSnapshot.distribSnapshotId,
					SharedLocalSnapshotSlot->QDxid,
					SharedLocalSnapshotSlot->QDcid,
					getDistributedTransactionId(),
					SharedLocalSnapshotSlot->slotid,
					debugCaller,
					DtxContextToString(DistributedTransactionContext))));
1446
	LWLockRelease(SharedLocalSnapshotSlot->slotLock);
1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466
}

static int
GetDistributedSnapshotMaxCount(void)
{
	switch (DistributedTransactionContext)
	{
	case DTX_CONTEXT_LOCAL_ONLY:
	case DTX_CONTEXT_QD_RETRY_PHASE_2:
	case DTX_CONTEXT_QE_FINISH_PREPARED:
		return 0;

	case DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE:
		return max_prepared_xacts;

	case DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER:
	case DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER:
	case DTX_CONTEXT_QE_AUTO_COMMIT_IMPLICIT:
	case DTX_CONTEXT_QE_ENTRY_DB_SINGLETON:
	case DTX_CONTEXT_QE_READER:
1467 1468
		if (QEDtxContextInfo.distributedSnapshot.distribSnapshotId != 0)
			return QEDtxContextInfo.distributedSnapshot.maxCount;
1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485
		else
			return max_prepared_xacts;		/* UNDONE: For now? */
	
	case DTX_CONTEXT_QE_PREPARED:
		elog(FATAL, "Unexpected segment distribute transaction context: '%s'",
			 DtxContextToString(DistributedTransactionContext));
		break;
	
	default:
		elog(FATAL, "Unrecognized DTX transaction context: %d",
			(int) DistributedTransactionContext);
		break;
	}

	return 0;
}

1486 1487 1488 1489
/*
 * Fill in the array of in-progress distributed XIDS in 'snapshot' from the
 * information that the QE sent us (if any).
 */
1490 1491 1492
static void
FillInDistributedSnapshot(Snapshot snapshot)
{
1493 1494 1495 1496
	ereport((Debug_print_full_dtm ? LOG : DEBUG5),
			(errmsg("FillInDistributedSnapshot DTX Context = '%s'",
					DtxContextToString(DistributedTransactionContext))));

1497 1498 1499 1500 1501 1502 1503 1504 1505
	switch (DistributedTransactionContext)
	{
	case DTX_CONTEXT_LOCAL_ONLY:
	case DTX_CONTEXT_QD_RETRY_PHASE_2:
	case DTX_CONTEXT_QE_FINISH_PREPARED:
		/*
		 * No distributed snapshot.
		 */
		snapshot->haveDistribSnapshot = false;
1506
		snapshot->distribSnapshotWithLocalMapping.ds.count = 0;
1507 1508 1509 1510
		break;

	case DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE:
		/*
1511 1512
		 * GetSnapshotData() should've acquired the distributed snapshot
		 * while holding ProcArrayLock, not here.
1513
		 */
1514 1515
		elog(ERROR, "FillInDistributedSnapshot called in context '%s'",
			 DtxContextToString(DistributedTransactionContext));
1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528
		break;

	case DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER:
	case DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER:
	case DTX_CONTEXT_QE_AUTO_COMMIT_IMPLICIT:
	case DTX_CONTEXT_QE_ENTRY_DB_SINGLETON:
	case DTX_CONTEXT_QE_READER:
		/*
		 * Copy distributed snapshot from the one sent by the QD.
		 */
		{
			DistributedSnapshot *ds = &QEDtxContextInfo.distributedSnapshot;

1529
			if (ds->distribSnapshotId != 0)
1530 1531 1532
			{
				snapshot->haveDistribSnapshot = true;

1533 1534
				Assert(ds->xminAllDistributedSnapshots);
				Assert(ds->xminAllDistributedSnapshots <= ds->xmin);
1535

1536
				DistributedSnapshot_Copy(&snapshot->distribSnapshotWithLocalMapping.ds, ds);
1537 1538 1539 1540
			}
			else
			{
				snapshot->haveDistribSnapshot = false;
1541
				snapshot->distribSnapshotWithLocalMapping.ds.count = 0;
1542 1543 1544
			}
		}
		break;
1545

1546 1547 1548 1549
	case DTX_CONTEXT_QE_PREPARED:
		elog(FATAL, "Unexpected segment distribute transaction context: '%s'",
			 DtxContextToString(DistributedTransactionContext));
		break;
1550

1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574
	default:
		elog(FATAL, "Unrecognized DTX transaction context: %d",
			(int) DistributedTransactionContext);
		break;
	}

	/*
	 * Nice that we may have collected it, but turn it off...
	 */
	if (Debug_disable_distributed_snapshot)
		snapshot->haveDistribSnapshot = false;
}

/*
 * QEDtxContextInfo and SharedLocalSnapshotSlot are both global.
 */
static bool
QEwriterSnapshotUpToDate(void)
{
	Assert(!Gp_is_writer);

	if (SharedLocalSnapshotSlot == NULL)
		elog(ERROR, "SharedLocalSnapshotSlot is NULL");

1575 1576
	LWLockAcquire(SharedLocalSnapshotSlot->slotLock, LW_SHARED);
	bool result = QEDtxContextInfo.distributedXid == SharedLocalSnapshotSlot->QDxid &&
1577 1578
		QEDtxContextInfo.curcid == SharedLocalSnapshotSlot->QDcid &&
		QEDtxContextInfo.segmateSync == SharedLocalSnapshotSlot->segmateSync &&
1579 1580
		SharedLocalSnapshotSlot->ready;
	LWLockRelease(SharedLocalSnapshotSlot->slotLock);
1581

1582
	return result;
1583 1584
}

G
Gang Xiong 已提交
1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785
void
getAllDistributedXactStatus(TMGALLXACTSTATUS **allDistributedXactStatus)
{
	TMGALLXACTSTATUS *all;
	int			count;
	ProcArrayStruct *arrayP = procArray;

	all = palloc(sizeof(TMGALLXACTSTATUS));
	all->next = 0;
	all->count = 0;
	all->statusArray = NULL;

	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
	count = arrayP->numProcs;
	if (count > 0)
	{
		int			i;

		all->statusArray =
			palloc(MAXALIGN(count * sizeof(TMGXACTSTATUS)));
		for (i = 0; i < count; i++)
		{
			PGPROC *proc = arrayP->procs[i];
			TMGXACT *gxact = &proc->gxact;

			all->statusArray[i].gxid = gxact->gxid;
			if (strlen(gxact->gid) >= TMGIDSIZE)
				elog(PANIC, "Distribute transaction identifier too long (%d)",
						(int) strlen(gxact->gid));
			memcpy(all->statusArray[i].gid, gxact->gid, TMGIDSIZE);
			all->statusArray[i].state = gxact->state;
			all->statusArray[i].sessionId = gxact->sessionId;
			all->statusArray[i].xminDistributedSnapshot = gxact->xminDistributedSnapshot;
		}

		all->count = count;
	}

	LWLockRelease(ProcArrayLock);

	*allDistributedXactStatus = all;
}

/*
 * Get check point information
 *
 * Whether DTM started or not, we must always store DTM information in
 * this checkpoint record.  A possible case to consider is we might have
 * in-progress global transactions in shared memory after postmaster reset,
 * and shutting down without performing DTM recovery.  The subsequent
 * recovery after this shutdown will read this checkpoint, so we would
 * lose the in-progress global transaction information if we didn't write it
 * here.  Note we will certainly read this global transaction information
 * even if this is a clean shutdown (i.e. not performing multi-pass recovery.)
 */
void
getDtxCheckPointInfo(char **result, int *result_size)
{
	TMGXACT_CHECKPOINT *gxact_checkpoint;
	TMGXACT_LOG *gxact_log_array;
	int			i;
	int			actual;
	ProcArrayStruct *arrayP = procArray;

	if (GpIdentity.segindex != MASTER_CONTENT_ID)
	{
		gxact_checkpoint = palloc(TMGXACT_CHECKPOINT_BYTES(0));
		gxact_checkpoint->committedCount = 0;
		*result = (char*) gxact_checkpoint;
		*result_size = TMGXACT_CHECKPOINT_BYTES(0);
		return;
	}

	gxact_checkpoint = palloc(TMGXACT_CHECKPOINT_BYTES(arrayP->numProcs + *shmNumCommittedGxacts));
	gxact_log_array = &gxact_checkpoint->committedGxactArray[0];

	actual = 0;
	for (i = 0; i < *shmNumCommittedGxacts; i++)
		gxact_log_array[actual++] = shmCommittedGxactArray[i++];

	SIMPLE_FAULT_INJECTOR(CheckPointDtxInfo);

	/*
	 * If a transaction inserted 'commit' record logically before the checkpoint
	 * REDO pointer, and it hasn't inserted the 'forget' record. we will see its
	 * 'TMGXACT->state' is between 'DTX_STATE_INSERTED_COMMITTED' and
	 * 'DTX_STATE_INSERTING_FORGET_COMMITTED'. such transactions should be included
	 * in the checkpoint record so that the second phase of 2PC can be executed
	 * during crash recovery.
	 *
	 * NOTE: the REDO pointer is obtained much earlier in CreateCheckpoint().
	 * It is possible to include transactions having their commit records
	 * *after* the REDO pointer in checkpoint record.  Second phase of 2PC for
	 * such transactions will be executed twice during crash recovery.
	 * Although redundant, this is not a problem.
	 */
	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (i = 0; i < arrayP->numProcs; i++)
	{
		TMGXACT_LOG *gxact_log;
		PGPROC  *proc = arrayP->procs[i];
		TMGXACT *gxact = &proc->gxact;

		if (!includeInCheckpointIsNeeded(gxact))
			continue;

		gxact_log = &gxact_log_array[actual];
		if (strlen(gxact->gid) >= TMGIDSIZE)
			elog(PANIC, "Distribute transaction identifier too long (%d)",
				 (int) strlen(gxact->gid));
		memcpy(gxact_log->gid, gxact->gid, TMGIDSIZE);
		gxact_log->gxid = gxact->gxid;

		elog((Debug_print_full_dtm ? LOG : DEBUG5),
			 "Add DTM checkpoint entry gid = %s.", gxact->gid);

		actual++;
	}

	LWLockRelease(ProcArrayLock);

	gxact_checkpoint->committedCount = actual;

	*result = (char *) gxact_checkpoint;
	*result_size = TMGXACT_CHECKPOINT_BYTES(actual);

	elog((Debug_print_full_dtm ? LOG : DEBUG5),
		 "Filled in DTM checkpoint information (count = %d).", actual);
}

/*
 * DistributedSnapshotMappedEntry_Compare: qsort comparator that orders
 * elements by the DistributedTransactionId found at the start of each one.
 *
 * Returns -1, 0 or 1 when the first id is respectively smaller than, equal
 * to, or greater than the second.
 */
static int
DistributedSnapshotMappedEntry_Compare(const void *p1, const void *p2)
{
	const DistributedTransactionId lhs = *(const DistributedTransactionId *) p1;
	const DistributedTransactionId rhs = *(const DistributedTransactionId *) p2;

	if (lhs < rhs)
		return -1;
	if (lhs > rhs)
		return 1;
	return 0;
}

/*
 * CreateDistributedSnapshot: build a distributed snapshot from the
 * distributed transactions currently visible in the proc array.
 *
 * The caller must already hold ProcArrayLock (asserted below).
 *
 * Raises ERROR if *shmNumCommittedGxacts is non-zero, or if more in-progress
 * distributed transactions are found than ds->maxCount has room for.
 *
 * On success the embedded DistributedSnapshot is filled in (xmin, xmax,
 * count, the sorted inProgressXidArray, snapshot id, timestamp and
 * xminAllDistributedSnapshots) and the local-xid cache of
 * distribSnapshotWithLocalMapping is reset to empty.  Always returns true.
 */
static bool
CreateDistributedSnapshot(DistributedSnapshotWithLocalMapping *distribSnapshotWithLocalMapping)
{
	int			i;
	int			count;
	DistributedTransactionId xmin;
	DistributedTransactionId xmax;
	DistributedSnapshotId distribSnapshotId;
	DistributedTransactionId globalXminDistributedSnapshots;
	DistributedSnapshot *ds;
	ProcArrayStruct *arrayP = procArray;

	Assert(LWLockHeldByMe(ProcArrayLock));
	if (*shmNumCommittedGxacts != 0)
		elog(ERROR, "Create distributed snapshot before DTM recovery finish");

	xmin = LastDistributedTransactionId;

	/*
	 * This is analogous to how GetSnapshotData() seeds its local xmax: the
	 * distributed xmax starts out as the value reported by
	 * getMaxDistributedXid() and may be raised by the loop below.
	 */
	xmax = getMaxDistributedXid();

	/*
	 * initialize for calculation with xmax, the calculation for this is on
	 * same lines as globalxmin for local snapshot.
	 */
	globalXminDistributedSnapshots = xmax;
	count = 0;
	ds = &distribSnapshotWithLocalMapping->ds;

	/*
	 * Gather up current in-progress global transactions for the distributed
	 * snapshot.
	 */
	for (i = 0; i < arrayP->numProcs; i++)
	{
		PGPROC	*proc = arrayP->procs[i];
		TMGXACT	*gxact_candidate = &proc->gxact;
		volatile DistributedTransactionId gxid;
		DistributedTransactionId dxid;

		/* just fetch once */
		gxid = gxact_candidate->gxid;
		if (gxid == InvalidDistributedTransactionId)
			continue;

		/*
		 * NB: We must include transactions in DTX_STATE_ACTIVE_NOT_DISTRIBUTED
		 * state. All transactions start in that state, even if they become
		 * distributed later on.
		 */
		Assert(gxact_candidate->state != DTX_STATE_NONE);

		/* Update globalXminDistributedSnapshots to be the smallest valid dxid */
		dxid = gxact_candidate->xminDistributedSnapshot;
		if ((dxid != InvalidDistributedTransactionId) &&
			dxid < globalXminDistributedSnapshots)
		{
			globalXminDistributedSnapshots = dxid;
		}

		/*
		 * Include the current distributed transaction in the min/max
		 * calculation.
		 */
		if (gxid < xmin)
		{
			xmin = gxid;
		}
		if (gxid > xmax)
		{
			xmax = gxid;
		}

		/*
		 * Our own gxid takes part in the min/max above but is not listed as
		 * in-progress in our own snapshot.
		 */
		if (proc == MyProc)
			continue;

		if (count >= ds->maxCount)
			elog(ERROR, "Too many distributed transactions for snapshot");

		ds->inProgressXidArray[count++] = gxid;

		elog((Debug_print_full_dtm ? LOG : DEBUG5),
			 "CreateDistributedSnapshot added inProgressDistributedXid = %u to snapshot",
			 gxid);
	}

	distribSnapshotId = pg_atomic_add_fetch_u32((pg_atomic_uint32 *)shmNextSnapshotId, 1);

	/*
	 * Above globalXminDistributedSnapshots was calculated based on lowest
	 * dxid in all snapshots but update it to also include actual process
	 * dxids.
	 */
	if (xmin < globalXminDistributedSnapshots)
		globalXminDistributedSnapshots = xmin;

	/*
	 * Sort the entry {distribXid} to support the QEs doing culls on their
	 * DistribToLocalXact sorted lists.
	 */
	qsort(
		  ds->inProgressXidArray,
		  count,
		  sizeof(DistributedTransactionId),
		  DistributedSnapshotMappedEntry_Compare);

	/*
	 * Copy the information we just captured under lock and then sorted into
	 * the distributed snapshot.
	 */
	ds->distribTransactionTimeStamp = *shmDistribTimeStamp;
	ds->xminAllDistributedSnapshots = globalXminDistributedSnapshots;
	ds->distribSnapshotId = distribSnapshotId;
	ds->xmin = xmin;
	ds->xmax = xmax;
	ds->count = count;

	/* Only ever move our advertised distributed xmin backwards here. */
	if (xmin < MyProc->gxact.xminDistributedSnapshot)
		MyProc->gxact.xminDistributedSnapshot = xmin;

	elog((Debug_print_full_dtm ? LOG : DEBUG5),
		 "CreateDistributedSnapshot distributed snapshot has xmin = %u, count = %u, xmax = %u.",
		 xmin, count, xmax);
	elog((Debug_print_snapshot_dtm ? LOG : DEBUG5),
		 "[Distributed Snapshot #%u] *Create* (gxid = %u, '%s')",
		 distribSnapshotId,
		 MyProc->gxact.gxid,
		 DtxContextToString(DistributedTransactionContext));

	/*
	 * At snapshot creation time, local xid cache is empty. Gets populated as
	 * reverse mapping takes place during visibility checks using this
	 * snapshot.
	 */
	distribSnapshotWithLocalMapping->currentLocalXidsCount = 0;
	distribSnapshotWithLocalMapping->minCachedLocalXid = InvalidTransactionId;
	distribSnapshotWithLocalMapping->maxCachedLocalXid = InvalidTransactionId;

	Assert(distribSnapshotWithLocalMapping->maxLocalXidsCount != 0);
	Assert(distribSnapshotWithLocalMapping->inProgressMappedLocalXids != NULL);

	/*
	 * memset() fills bytes, so passing InvalidTransactionId as the fill value
	 * only clears the cache correctly if that constant is zero.
	 * NOTE(review): presumably it is -- confirm against its definition.
	 */
	memset(distribSnapshotWithLocalMapping->inProgressMappedLocalXids,
		   InvalidTransactionId,
		   sizeof(TransactionId) * distribSnapshotWithLocalMapping->maxLocalXidsCount);

	return true;
}

1890 1891 1892 1893
/*----------
 * GetSnapshotData -- returns information about running transactions.
 *
 * The returned snapshot includes xmin (lowest still-running xact ID),
1894
 * xmax (highest completed xact ID + 1), and a list of running xact IDs
1895 1896 1897 1898 1899 1900 1901 1902
 * in the range xmin <= xid < xmax.  It is used as follows:
 *		All xact IDs < xmin are considered finished.
 *		All xact IDs >= xmax are considered still running.
 *		For an xact ID xmin <= xid < xmax, consult list to see whether
 *		it is considered running or not.
 * This ensures that the set of transactions seen as "running" by the
 * current xact will not change after it takes the snapshot.
 *
1903 1904 1905 1906 1907
 * All running top-level XIDs are included in the snapshot, except for lazy
 * VACUUM processes.  We also try to include running subtransaction XIDs,
 * but since PGPROC has only a limited cache area for subxact XIDs, full
 * information may not be available.  If we find any overflowed subxid arrays,
 * we have to mark the snapshot's subxid data as overflowed, and extra work
1908
 * *may* need to be done to determine what's running (see XidInMVCCSnapshot()
1909
 * in tqual.c).
1910 1911 1912
 *
 * We also update the following backend-global variables:
 *		TransactionXmin: the oldest xmin of any snapshot in use in the
1913
 *			current transaction (this is the same as MyProc->xmin).
1914 1915 1916
 *		RecentXmin: the xmin computed for the most recent snapshot.  XIDs
 *			older than this are known not running any more.
 *		RecentGlobalXmin: the global xmin (oldest TransactionXmin across all
1917
 *			running transactions, except those running LAZY VACUUM).  This is
T
Tom Lane 已提交
1918
 *			the same computation done by GetOldestXmin(true, true).
1919 1920 1921
 *
 * Note: this function should probably not be called with an argument that's
 * not statically allocated (see xip allocation below).
1922 1923
 */
Snapshot
1924
GetSnapshotData(Snapshot snapshot)
1925 1926 1927 1928 1929 1930 1931
{
	ProcArrayStruct *arrayP = procArray;
	TransactionId xmin;
	TransactionId xmax;
	TransactionId globalxmin;
	int			index;
	int			count = 0;
1932
	int			subcount = 0;
1933
	bool		suboverflowed = false;
1934 1935 1936 1937

	Assert(snapshot != NULL);

	/*
B
Bruce Momjian 已提交
1938 1939
	 * Allocating space for maxProcs xids is usually overkill; numProcs would
	 * be sufficient.  But it seems better to do the malloc while not holding
1940 1941
	 * the lock, so we can't look at numProcs.  Likewise, we allocate much
	 * more subxip storage than is probably needed.
1942 1943
	 *
	 * This does open a possibility for avoiding repeated malloc/free: since
B
Bruce Momjian 已提交
1944
	 * maxProcs does not change at runtime, we can simply reuse the previous
1945
	 * xip arrays if any.  (This relies on the fact that all callers pass
B
Bruce Momjian 已提交
1946
	 * static SnapshotData structs.)
1947 1948 1949 1950
	 */
	if (snapshot->xip == NULL)
	{
		/*
B
Bruce Momjian 已提交
1951 1952
		 * First call for this snapshot. Snapshot is same size whether or not
		 * we are in recovery, see later comments.
1953
		 */
1954 1955
		snapshot->xip = (TransactionId *)
			malloc(arrayP->maxProcs * sizeof(TransactionId));
1956
		if (snapshot->xip == NULL)
1957 1958 1959
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
1960

1961
		Assert(snapshot->subxip == NULL);
1962 1963 1964 1965
	}

	if (snapshot->subxip == NULL)
	{
1966
		snapshot->subxip = (TransactionId *)
1967
			malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
1968
		if (snapshot->subxip == NULL)
1969 1970 1971
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
1972 1973 1974 1975 1976
	}

	/*
	 * GP: Distributed snapshot.
	 */
1977 1978 1979 1980
	ereport((Debug_print_full_dtm ? LOG : DEBUG5),
			(errmsg("GetSnapshotData maxCount %d, inProgressEntryArray %p",
					snapshot->distribSnapshotWithLocalMapping.ds.maxCount,
					snapshot->distribSnapshotWithLocalMapping.ds.inProgressXidArray)));
1981 1982

	if (snapshot->distribSnapshotWithLocalMapping.ds.inProgressXidArray == NULL)
1983
	{
1984
		int maxCount = GetDistributedSnapshotMaxCount();
1985 1986
		if (maxCount > 0)
		{
1987 1988 1989 1990 1991 1992 1993
			snapshot->distribSnapshotWithLocalMapping.ds.inProgressXidArray =
				(DistributedTransactionId*)malloc(maxCount * sizeof(DistributedTransactionId));
			if (snapshot->distribSnapshotWithLocalMapping.ds.inProgressXidArray == NULL)
			{
				ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory")));
			}
			snapshot->distribSnapshotWithLocalMapping.ds.maxCount = maxCount;
1994

1995 1996 1997 1998 1999 2000 2001
			/*
			 * Allocate memory for local xid cache, currently allocating it
			 * same size as distributed, not necessary.
			 */
			snapshot->distribSnapshotWithLocalMapping.inProgressMappedLocalXids =
				(TransactionId*)malloc(maxCount * sizeof(TransactionId));
			if (snapshot->distribSnapshotWithLocalMapping.inProgressMappedLocalXids == NULL)
2002 2003 2004
			{
				ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory")));
			}
2005
			snapshot->distribSnapshotWithLocalMapping.maxLocalXidsCount = maxCount;
2006 2007 2008 2009
		}
	}

	/*
2010
	 * MPP Addition. if we are in EXECUTE mode and not the writer... then we
2011 2012 2013 2014 2015 2016 2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051
	 * want to just get the shared snapshot and make it our own.
	 *
	 * code for the writer is at the bottom of this function.
	 *
	 * NOTE: we could be dispatched and get here before the WRITER can set the
	 * shared snapshot.  if this happens we'll have to wait around, hopefully
	 * its never for a very long time.
	 *
	 */
	if (DistributedTransactionContext == DTX_CONTEXT_QE_READER ||
		DistributedTransactionContext == DTX_CONTEXT_QE_ENTRY_DB_SINGLETON)
	{
		/* the pg_usleep() call below is in units of us (microseconds), interconnect
		 * timeout is in seconds.  Start with 1 millisecond. */
		uint64		segmate_timeout_us;
		uint64		sleep_per_check_us = 1 * 1000;
		uint64	   	total_sleep_time_us = 0;
		uint64		warning_sleep_time_us = 0;

		segmate_timeout_us = (3 * (uint64)Max(interconnect_setup_timeout, 1) * 1000* 1000) / 4;

		/*
		 * Make a copy of the distributed snapshot information; this
		 * doesn't use the shared-snapshot-slot stuff it is just
		 * making copies from the QEDtxContextInfo structure sent by
		 * the QD.
		 */
		FillInDistributedSnapshot(snapshot);

		/*
		 * If we're a cursor-reader, we get out snapshot from the
		 * writer via a tempfile in the filesystem. Otherwise it is
		 * too easy for the writer to race ahead of cursor readers.
		 */
		if (QEDtxContextInfo.cursorContext)
		{
			readSharedLocalSnapshot_forCursor(snapshot);

			return snapshot;
		}

2052 2053 2054 2055 2056 2057
		ereport((Debug_print_snapshot_dtm ? LOG : DEBUG5),
				(errmsg("[Distributed Snapshot #%u] *Start Reader Match* gxid = %u and currcid %d (%s)",
						QEDtxContextInfo.distributedSnapshot.distribSnapshotId,
						QEDtxContextInfo.distributedXid,
						QEDtxContextInfo.curcid,
						DtxContextToString(DistributedTransactionContext))));
2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079 2080

		/*
		 * This is the second phase of the handshake we started in
		 * StartTransaction().  Here we get a "good" snapshot from our
		 * writer. In the process it is possible that we will change
		 * our transaction's xid (see phase-one in StartTransaction()).
		 *
		 * Here we depend on the absolute correctness of our
		 * writer-gang's info. We need the segmateSync to match *as
		 * well* as the distributed-xid since the QD may send multiple
		 * statements with the same distributed-xid/cid but
		 * *different* local-xids (MPP-3228). The dispatcher will
		 * distinguish such statements by the segmateSync.
		 *
		 * I believe that we still want the older sync mechanism ("ready" flag).
		 * since it tells the code in TransactionIdIsCurrentTransactionId() that the
		 * writer may be changing the local-xid (otherwise it would be possible for
		 * cursor reader gangs to get confused).
		 */
		for (;;)
		{
			if (QEwriterSnapshotUpToDate())
			{
2081 2082
				LWLockAcquire(SharedLocalSnapshotSlot->slotLock, LW_SHARED);

2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096
				/*
				 * YAY we found it.  set the contents of the
				 * SharedLocalSnapshot to this and move on.
				 */
				snapshot->xmin = SharedLocalSnapshotSlot->snapshot.xmin;
				snapshot->xmax = SharedLocalSnapshotSlot->snapshot.xmax;
				snapshot->xcnt = SharedLocalSnapshotSlot->snapshot.xcnt;

				/* We now capture our current view of the xip/combocid arrays */
				memcpy(snapshot->xip, SharedLocalSnapshotSlot->snapshot.xip, snapshot->xcnt * sizeof(TransactionId));
				memset(snapshot->xip + snapshot->xcnt, 0, (arrayP->maxProcs - snapshot->xcnt) * sizeof(TransactionId));

				snapshot->curcid = SharedLocalSnapshotSlot->snapshot.curcid;

2097 2098
				snapshot->subxcnt = -1;

2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113
				/* combocid */
				if (usedComboCids != SharedLocalSnapshotSlot->combocidcnt)
				{
					if (usedComboCids == 0)
					{
						MemoryContext oldCtx =  MemoryContextSwitchTo(TopTransactionContext);
						comboCids = palloc(SharedLocalSnapshotSlot->combocidcnt * sizeof(ComboCidKeyData));
						MemoryContextSwitchTo(oldCtx);
					}
					else
						repalloc(comboCids, SharedLocalSnapshotSlot->combocidcnt * sizeof(ComboCidKeyData));
				}
				memcpy(comboCids, (char *)SharedLocalSnapshotSlot->combocids, SharedLocalSnapshotSlot->combocidcnt * sizeof(ComboCidKeyData));
				usedComboCids = ((SharedLocalSnapshotSlot->combocidcnt < MaxComboCids) ? SharedLocalSnapshotSlot->combocidcnt : MaxComboCids);

2114 2115 2116 2117 2118
				uint32 segmateSync = SharedLocalSnapshotSlot->segmateSync;
				uint32 comboCidCnt = SharedLocalSnapshotSlot->combocidcnt;

				LWLockRelease(SharedLocalSnapshotSlot->slotLock);

2119 2120 2121 2122
				ereport((Debug_print_snapshot_dtm ? LOG : DEBUG5),
						(errmsg("Reader qExec usedComboCids: %d shared %d segmateSync %d",
								usedComboCids, comboCidCnt, segmateSync)));

2123 2124
				SetSharedTransactionId_reader(SharedLocalSnapshotSlot->xid,
											  SharedLocalSnapshotSlot->snapshot.curcid);
2125

2126 2127 2128
				ereport((Debug_print_snapshot_dtm ? LOG : DEBUG5),
						(errmsg("Reader qExec setting shared local snapshot to: xmin: %d xmax: %d curcid: %d",
								snapshot->xmin, snapshot->xmax, snapshot->curcid)));
2129

2130 2131 2132
				ereport((Debug_print_snapshot_dtm ? LOG : DEBUG5),
						(errmsg("GetSnapshotData(): READER currentcommandid %d curcid %d segmatesync %d",
								GetCurrentCommandId(false), snapshot->curcid, segmateSync)));
2133

X
xiong-gang 已提交
2134 2135 2136
				if (TransactionIdPrecedes(snapshot->xmin, TransactionXmin))
					TransactionXmin = snapshot->xmin;

2137 2138 2139 2140 2141
				return snapshot;
			}
			else
			{
				/*
2142
				 * didn't find it. we'll sleep for a small amount of time and
2143 2144
				 * then try again.
				 *
2145
				 * TODO: is there a semaphore or something better we can do here.
2146 2147 2148 2149 2150 2151 2152 2153
				 */
				pg_usleep(sleep_per_check_us);

				CHECK_FOR_INTERRUPTS();

				warning_sleep_time_us += sleep_per_check_us;
				total_sleep_time_us += sleep_per_check_us;

2154 2155
				LWLockAcquire(SharedLocalSnapshotSlot->slotLock, LW_SHARED);

2156 2157 2158 2159 2160 2161
				if (total_sleep_time_us >= segmate_timeout_us)
				{
					ereport(ERROR,
							(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
							 errmsg("GetSnapshotData timed out waiting for Writer to set the shared snapshot."),
							 errdetail("We are waiting for the shared snapshot to have XID: %d but the value "
2162
									   "is currently: %d."
2163 2164 2165 2166 2167
									   " waiting for cid to be %d but is currently %d.  ready=%d."
									   "DistributedTransactionContext = %s. "
									   " Our slotindex is: %d \n"
									   "Dump of all sharedsnapshots in shmem: %s",
									   QEDtxContextInfo.distributedXid, SharedLocalSnapshotSlot->QDxid,
2168
									   QEDtxContextInfo.curcid,
2169
									   SharedLocalSnapshotSlot->QDcid, SharedLocalSnapshotSlot->ready,
2170 2171 2172 2173 2174 2175 2176 2177
									   DtxContextToString(DistributedTransactionContext),
									   SharedLocalSnapshotSlot->slotindex, SharedSnapshotDump())));
				}
				else if (warning_sleep_time_us > 1000 * 1000)
				{
					/*
					 * Every second issue warning.
					 */
2178 2179 2180 2181 2182 2183 2184 2185 2186 2187 2188 2189 2190 2191 2192 2193 2194 2195 2196 2197 2198 2199 2200 2201
					ereport((Debug_print_snapshot_dtm ? LOG : DEBUG5),
							(errmsg("[Distributed Snapshot #%u] *No Match* gxid %u = %u and currcid %d = %d (%s)",
									QEDtxContextInfo.distributedSnapshot.distribSnapshotId,
									QEDtxContextInfo.distributedXid,
									SharedLocalSnapshotSlot->QDxid,
									QEDtxContextInfo.curcid,
									SharedLocalSnapshotSlot->QDcid,
									DtxContextToString(DistributedTransactionContext))));


					ereport(LOG,
							(errmsg("GetSnapshotData did not find shared local snapshot information. "
									"We are waiting for the shared snapshot to have XID: %d/%u but the value "
									"is currently: %d/%u."
									" waiting for cid to be %d but is currently %d.  ready=%d."
									" Our slotindex is: %d \n"
									"DistributedTransactionContext = %s.",
									QEDtxContextInfo.distributedXid, QEDtxContextInfo.segmateSync,
									SharedLocalSnapshotSlot->QDxid, SharedLocalSnapshotSlot->segmateSync,
									QEDtxContextInfo.curcid,
									SharedLocalSnapshotSlot->QDcid,
									SharedLocalSnapshotSlot->ready,
									SharedLocalSnapshotSlot->slotindex,
									DtxContextToString(DistributedTransactionContext))));
2202 2203 2204
					warning_sleep_time_us = 0;
				}

2205
				LWLockRelease(SharedLocalSnapshotSlot->slotLock);
2206 2207 2208
				/* UNDONE: Back-off from checking every millisecond... */
			}
		}
2209 2210
	}

2211 2212 2213 2214
	/* We must not be a reader. */
	Assert(DistributedTransactionContext != DTX_CONTEXT_QE_READER);
	Assert(DistributedTransactionContext != DTX_CONTEXT_QE_ENTRY_DB_SINGLETON);

2215
	/*
B
Bruce Momjian 已提交
2216
	 * It is sufficient to get shared lock on ProcArrayLock, even if we are
2217
	 * going to set MyProc->xmin.
2218
	 */
2219
	LWLockAcquire(ProcArrayLock, LW_SHARED);
2220

2221 2222 2223 2224
	/* xmax is always latestCompletedXid + 1 */
	xmax = ShmemVariableCache->latestCompletedXid;
	Assert(TransactionIdIsNormal(xmax));
	TransactionIdAdvance(xmax);
2225

2226 2227
	/* initialize xmin calculation with xmax */
	globalxmin = xmin = xmax;
2228

2229 2230 2231
	ereport((Debug_print_full_dtm ? LOG : DEBUG5),
			(errmsg("GetSnapshotData setting globalxmin and xmin to %u",
					xmin)));
2232

2233 2234 2235 2236 2237 2238 2239 2240 2241 2242
	/*
	 * Get the distributed snapshot if needed and copy it into the field 
	 * called distribSnapshotWithLocalMapping in the snapshot structure.
	 *
	 * For a distributed transaction:
	 *   => The corrresponding distributed snapshot is made up of distributed
	 *      xids from the DTM that are considered in-progress will be kept in
	 *      the snapshot structure separately from any local in-progress xact.
	 *
	 *      The MVCC function XidInSnapshot is used to evaluate whether
2243 2244
	 *      a tuple is visible through a snapshot. Only committed xids are
	 *      given to XidInSnapshot for evaluation. XidInSnapshot will first
2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263
	 *      determine if the committed tuple is for a distributed transaction.  
	 *      If the xact is distributed it will be evaluated only against the
	 *      distributed snapshot and not the local snapshot.
	 *
	 *      Otherwise, when the committed transaction being evaluated is local,
	 *      then it will be evaluated only against the local portion of the
	 *      snapshot.
	 *
	 * For a local transaction:
	 *   => Only the local portion of the snapshot: xmin, xmax, xcnt,
	 *      in-progress (xip), etc, will be filled in.
	 *
	 *      Note that in-progress distributed transactions that have reached
	 *      this database instance and are active will be represented in the
	 *      local in-progress (xip) array with the distributed transaction's
	 *      local xid.
	 *
	 * In summary: This 2 snapshot scheme (optional distributed, required local)
	 * handles late arriving distributed transactions properly since that work
2264
	 * is only evaluated against the distributed snapshot. And, the scheme
2265 2266 2267 2268
	 * handles local transaction work seeing distributed work properly by
	 * including distributed transactions in the local snapshot via their
	 * local xids.
	 */
2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280
	if (DistributedTransactionContext == DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE)
	{
		snapshot->haveDistribSnapshot = CreateDistributedSnapshot(&snapshot->distribSnapshotWithLocalMapping);

		ereport((Debug_print_full_dtm ? LOG : DEBUG5),
				(errmsg("Got distributed snapshot from DistributedSnapshotWithLocalXids_Create = %s",
						(snapshot->haveDistribSnapshot ? "true" : "false"))));

		/* Nice that we may have collected it, but turn it off... */
		if (Debug_disable_distributed_snapshot)
			snapshot->haveDistribSnapshot = false;
	}
2281 2282

	/*
2283
	 * If we're in recovery then snapshot data comes from a different place,
B
Bruce Momjian 已提交
2284 2285
	 * so decide which route we take before grab the lock. It is possible for
	 * recovery to end before we finish taking snapshot, and for newly
2286 2287 2288
	 * assigned transaction ids to be added to the procarray. Xmax cannot
	 * change while we hold ProcArrayLock, so those newly added transaction
	 * ids would be filtered away, so we need not be concerned about them.
2289
	 */
2290 2291
	snapshot->takenDuringRecovery = RecoveryInProgress();

2292
	if (!snapshot->takenDuringRecovery)
2293 2294
	{
		/*
B
Bruce Momjian 已提交
2295 2296
		 * Spin over procArray checking xid, xmin, and subxids.  The goal is
		 * to gather all active xids, find the lowest xmin, and try to record
2297 2298 2299 2300
		 * subxids. During recovery no xids will be assigned, so all normal
		 * backends can be ignored, nor are there any VACUUMs running. All
		 * prepared transaction xids are held in KnownAssignedXids, so these
		 * will be seen without needing to loop through procs here.
2301
		 */
2302
		for (index = 0; index < arrayP->numProcs; index++)
2303
		{
2304 2305
			volatile PGPROC *proc = arrayP->procs[index];
			TransactionId xid;
2306

2307
#if 0 /* Upstream code not applicable to GPDB, why explained in vacuumStatement_Relation */
2308 2309
			/* Ignore procs running LAZY VACUUM */
			if (proc->vacuumFlags & PROC_IN_VACUUM)
2310
				continue;
2311
#endif
2312

2313
			/* Update globalxmin to be the smallest valid xmin */
B
Bruce Momjian 已提交
2314
			xid = proc->xmin;	/* fetch just once */
2315 2316 2317
			if (TransactionIdIsNormal(xid) &&
				TransactionIdPrecedes(xid, globalxmin))
				globalxmin = xid;
2318

2319 2320
			/* Fetch xid just once - see GetNewTransactionId */
			xid = proc->xid;
2321

2322
			/*
B
Bruce Momjian 已提交
2323 2324 2325 2326
			 * If the transaction has been assigned an xid < xmax we add it to
			 * the snapshot, and update xmin if necessary.	There's no need to
			 * store XIDs >= xmax, since we'll treat them as running anyway.
			 * We don't bother to examine their subxids either.
2327
			 *
B
Bruce Momjian 已提交
2328 2329
			 * We don't include our own XID (if any) in the snapshot, but we
			 * must include it into xmin.
2330 2331
			 */
			if (TransactionIdIsNormal(xid))
2332
			{
2333 2334 2335 2336 2337 2338 2339
				if (TransactionIdFollowsOrEquals(xid, xmax))
					continue;
				if (proc != MyProc)
					snapshot->xip[count++] = xid;
				if (TransactionIdPrecedes(xid, xmin))
					xmin = xid;
			}
2340

2341
			/*
B
Bruce Momjian 已提交
2342 2343 2344 2345 2346
			 * Save subtransaction XIDs if possible (if we've already
			 * overflowed, there's no point).  Note that the subxact XIDs must
			 * be later than their parent, so no need to check them against
			 * xmin.  We could filter against xmax, but it seems better not to
			 * do that much work while holding the ProcArrayLock.
2347 2348
			 *
			 * The other backend can add more subxids concurrently, but cannot
B
Bruce Momjian 已提交
2349 2350 2351 2352
			 * remove any.	Hence it's important to fetch nxids just once.
			 * Should be safe to use memcpy, though.  (We needn't worry about
			 * missing any xids added concurrently, because they must postdate
			 * xmax.)
2353 2354 2355 2356 2357 2358 2359 2360
			 *
			 * Again, our own XIDs are not included in the snapshot.
			 */
			if (!suboverflowed && proc != MyProc)
			{
				if (proc->subxids.overflowed)
					suboverflowed = true;
				else
2361
				{
2362 2363 2364 2365 2366 2367 2368 2369 2370
					int			nxids = proc->subxids.nxids;

					if (nxids > 0)
					{
						memcpy(snapshot->subxip + subcount,
							   (void *) proc->subxids.xids,
							   nxids * sizeof(TransactionId));
						subcount += nxids;
					}
2371 2372 2373
				}
			}
		}
2374
	}
2375
	else
2376 2377
	{
		/*
2378 2379
		 * We're in hot standby, so get XIDs from KnownAssignedXids.
		 *
2380 2381 2382 2383 2384
		 * We store all xids directly into subxip[]. Here's why:
		 *
		 * In recovery we don't know which xids are top-level and which are
		 * subxacts, a design choice that greatly simplifies xid processing.
		 *
B
Bruce Momjian 已提交
2385 2386 2387 2388
		 * It seems like we would want to try to put xids into xip[] only, but
		 * that is fairly small. We would either need to make that bigger or
		 * to increase the rate at which we WAL-log xid assignment; neither is
		 * an appealing choice.
2389 2390 2391 2392
		 *
		 * We could try to store xids into xip[] first and then into subxip[]
		 * if there are too many xids. That only works if the snapshot doesn't
		 * overflow because we do not search subxip[] in that case. A simpler
B
Bruce Momjian 已提交
2393 2394
		 * way is to just store all xids in the subxact array because this is
		 * by far the bigger array. We just leave the xip array empty.
2395 2396 2397 2398 2399
		 *
		 * Either way we need to change the way XidInMVCCSnapshot() works
		 * depending upon when the snapshot was taken, or change normal
		 * snapshot processing so it matches.
		 */
2400 2401
		subcount = KnownAssignedXidsGetAndSetXmin(snapshot->subxip, &xmin,
												  xmax);
2402

2403
		if (TransactionIdPrecedesOrEquals(xmin, procArray->lastOverflowedXid))
2404 2405
			suboverflowed = true;
	}
2406

2407
	if (!TransactionIdIsValid(MyProc->xmin))
2408 2409 2410
	{
		/* Not that these values are not set atomically. However,
		 * each of these assignments is itself assumed to be atomic. */
2411
		MyProc->xmin = TransactionXmin = xmin;
2412 2413 2414 2415 2416
	}
	if (IsXactIsoLevelSerializable)
	{
		MyProc->serializableIsoLevel = true;

2417 2418 2419
		ereport((Debug_print_snapshot_dtm ? LOG : DEBUG3),
				(errmsg("Got serializable snapshot: database %d, pid %d, xid %d, xmin %d",
						MyProc->databaseId, MyProc->pid, MyProc->xid, MyProc->xmin)));
2420
	}
2421 2422 2423 2424

	LWLockRelease(ProcArrayLock);

	/*
B
Bruce Momjian 已提交
2425 2426 2427
	 * Update globalxmin to include actual process xids.  This is a slightly
	 * different way of computing it than GetOldestXmin uses, but should give
	 * the same result.
2428 2429 2430 2431
	 */
	if (TransactionIdPrecedes(xmin, globalxmin))
		globalxmin = xmin;

2432
	if (DistributedTransactionContext == DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE)
2433
	{
2434
		DistributedLog_AdvanceOldestXminOnQD(globalxmin);
2435 2436
	}
	else
2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461 2462
	{
		/*
		 * Fill in the distributed snapshot information we received from the
		 * the QD.  Unless we are the QD, in which case we already created a
		 * new distributed snapshot above.
		 *
		 * (We do this after releasing ProcArrayLock, to reduce contention.)
		 */
		FillInDistributedSnapshot(snapshot);

		/*
		 * In computing RecentGlobalXmin, also take distributed snapshots into
		 * account.
		 */
		if (snapshot->haveDistribSnapshot)
		{
			DistributedSnapshot *ds = &snapshot->distribSnapshotWithLocalMapping.ds;

			globalxmin =
				DistributedLog_AdvanceOldestXmin(globalxmin,
												 ds->distribTransactionTimeStamp,
												 ds->xminAllDistributedSnapshots);
		}
		else
			globalxmin = DistributedLog_GetOldestXmin(globalxmin);
	}
2463

2464
	/* Update global variables too */
2465 2466 2467
	RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
	if (!TransactionIdIsNormal(RecentGlobalXmin))
		RecentGlobalXmin = FirstNormalTransactionId;
2468 2469 2470 2471 2472
	RecentXmin = xmin;

	snapshot->xmin = xmin;
	snapshot->xmax = xmax;
	snapshot->xcnt = count;
2473
	snapshot->subxcnt = subcount;
2474
	snapshot->suboverflowed = suboverflowed;
2475

2476
	snapshot->curcid = GetCurrentCommandId(false);
2477

2478
	/*
2479 2480
	 * This is a new snapshot, so set both refcounts are zero, and mark it as
	 * not copied in persistent memory.
2481 2482 2483 2484 2485
	 */
	snapshot->active_count = 0;
	snapshot->regd_count = 0;
	snapshot->copied = false;

2486
	/*
2487 2488
	 * MPP Addition. If we are the chief then we'll save our local snapshot
	 * into the shared snapshot. Note: we need to use the shared local
2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499
	 * snapshot for the "Local Implicit using Distributed Snapshot" case, too.
	 */
	
	if ((DistributedTransactionContext == DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER ||
		 DistributedTransactionContext == DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER ||
		 DistributedTransactionContext == DTX_CONTEXT_QE_AUTO_COMMIT_IMPLICIT) &&
		SharedLocalSnapshotSlot != NULL)
	{
		updateSharedLocalSnapshot(&QEDtxContextInfo, snapshot, "GetSnapshotData");
	}

2500 2501 2502
	ereport((Debug_print_snapshot_dtm ? LOG : DEBUG5),
			(errmsg("GetSnapshotData(): WRITER currentcommandid %d curcid %d segmatesync %d",
					GetCurrentCommandId(false), snapshot->curcid, QEDtxContextInfo.segmateSync)));
2503

2504 2505 2506
	return snapshot;
}

2507
/*
2508
 * GetRunningTransactionData -- returns information about running transactions.
2509
 *
2510
 * Similar to GetSnapshotData but returns more information. We include
2511
 * all PGPROCs with an assigned TransactionId, even VACUUM processes.
2512
 *
2513 2514
 * The returned data structure is statically allocated; caller should not
 * modify it, and must not assume it is valid past the next call.
2515
 *
2516 2517 2518 2519 2520 2521
 * This is never executed during recovery so there is no need to look at
 * KnownAssignedXids.
 *
 * We don't worry about updating other counters, we want to keep this as
 * simple as possible and leave GetSnapshotData() as the primary code for
 * that bookkeeping.
2522
 */
2523 2524 2525
RunningTransactions
GetRunningTransactionData(void)
{
2526 2527 2528
	/* result workspace */
	static RunningTransactionsData CurrentRunningXactsData;

2529
	ProcArrayStruct *arrayP = procArray;
2530
	RunningTransactions CurrentRunningXacts = &CurrentRunningXactsData;
2531 2532 2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544 2545 2546
	TransactionId latestCompletedXid;
	TransactionId oldestRunningXid;
	TransactionId *xids;
	int			index;
	int			count;
	int			subcount;
	bool		suboverflowed;

	Assert(!RecoveryInProgress());

	/*
	 * Allocating space for maxProcs xids is usually overkill; numProcs would
	 * be sufficient.  But it seems better to do the malloc while not holding
	 * the lock, so we can't look at numProcs.  Likewise, we allocate much
	 * more subxip storage than is probably needed.
	 *
2547
	 * Should only be allocated in bgwriter, since only ever executed during
B
Bruce Momjian 已提交
2548
	 * checkpoints.
2549 2550 2551 2552 2553 2554 2555 2556 2557 2558 2559 2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 2575 2576 2577
	 */
	if (CurrentRunningXacts->xids == NULL)
	{
		/*
		 * First call
		 */
		CurrentRunningXacts->xids = (TransactionId *)
			malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
		if (CurrentRunningXacts->xids == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
	}

	xids = CurrentRunningXacts->xids;

	count = subcount = 0;
	suboverflowed = false;

	/*
	 * Ensure that no xids enter or leave the procarray while we obtain
	 * snapshot.
	 */
	LWLockAcquire(ProcArrayLock, LW_SHARED);
	LWLockAcquire(XidGenLock, LW_SHARED);

	latestCompletedXid = ShmemVariableCache->latestCompletedXid;

	oldestRunningXid = ShmemVariableCache->nextXid;
B
Bruce Momjian 已提交
2578

2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 2598 2599 2600 2601 2602 2603
	/*
	 * Spin over procArray collecting all xids and subxids.
	 */
	for (index = 0; index < arrayP->numProcs; index++)
	{
		volatile PGPROC *proc = arrayP->procs[index];
		TransactionId xid;
		int			nxids;

		/* Fetch xid just once - see GetNewTransactionId */
		xid = proc->xid;

		/*
		 * We don't need to store transactions that don't have a TransactionId
		 * yet because they will not show as running on a standby server.
		 */
		if (!TransactionIdIsValid(xid))
			continue;

		xids[count++] = xid;

		if (TransactionIdPrecedes(xid, oldestRunningXid))
			oldestRunningXid = xid;

		/*
B
Bruce Momjian 已提交
2604 2605
		 * Save subtransaction XIDs. Other backends can't add or remove
		 * entries while we're holding XidGenLock.
2606 2607 2608 2609 2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629
		 */
		nxids = proc->subxids.nxids;
		if (nxids > 0)
		{
			memcpy(&xids[count], (void *) proc->subxids.xids,
				   nxids * sizeof(TransactionId));
			count += nxids;
			subcount += nxids;

			if (proc->subxids.overflowed)
				suboverflowed = true;

			/*
			 * Top-level XID of a transaction is always greater than any of
			 * its subxids, so we don't need to check if any of the subxids
			 * are smaller than oldestRunningXid
			 */
		}
	}

	CurrentRunningXacts->xcnt = count;
	CurrentRunningXacts->subxid_overflow = suboverflowed;
	CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid;
	CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
2630
	CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
2631 2632 2633 2634

	LWLockRelease(XidGenLock);
	LWLockRelease(ProcArrayLock);

2635 2636 2637 2638
	Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid));
	Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid));
	Assert(TransactionIdIsNormal(CurrentRunningXacts->latestCompletedXid));

2639 2640 2641
	return CurrentRunningXacts;
}

2642
/*
2643 2644
 * GetVirtualXIDsDelayingChkpt -- Get the VXIDs of transactions that are
 * delaying checkpoint because they have critical actions in progress.
2645
 *
2646 2647
 * Constructs an array of VXIDs of transactions that are currently in commit
 * critical sections, as shown by having inCommit set in their PGXACT.
2648
 *
2649 2650
 * Returns a palloc'd array that should be freed by the caller.
 * *nvxids is the number of valid entries.
2651 2652 2653 2654 2655
 *
 * Note that because backends set or clear inCommit without holding any lock,
 * the result is somewhat indeterminate, but we don't really care.  Even in
 * a multiprocessor with delayed writes to shared memory, it should be certain
 * that setting of inCommit will propagate to shared memory when the backend
2656
 * takes a lock, so we cannot fail to see a virtual xact as inCommit if
2657 2658 2659
 * it's already inserted its commit record.  Whether it takes a little while
 * for clearing of inCommit to propagate is unimportant for correctness.
 */
2660 2661
VirtualTransactionId *
GetVirtualXIDsDelayingChkpt(int *nvxids)
2662
{
2663
	VirtualTransactionId *vxids;
2664
	ProcArrayStruct *arrayP = procArray;
2665
	int			count = 0;
B
Bruce Momjian 已提交
2666
	int			index;
2667

2668 2669 2670
	/* allocate what's certainly enough result space */
	vxids = (VirtualTransactionId *)
		palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs);
2671 2672 2673 2674 2675

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
B
Bruce Momjian 已提交
2676 2677
		volatile PGPROC *proc = arrayP->procs[index];

2678 2679 2680
		if (proc->inCommit)
		{
			VirtualTransactionId vxid;
2681

2682 2683 2684 2685
			GET_VXID_FROM_PGPROC(vxid, *proc);
			if (VirtualTransactionIdIsValid(vxid))
				vxids[count++] = vxid;
		}
2686 2687 2688 2689
	}

	LWLockRelease(ProcArrayLock);

2690 2691
	*nvxids = count;
	return vxids;
2692 2693 2694
}

/*
2695
 * HaveVirtualXIDsDelayingChkpt -- Are any of the specified VXIDs delaying?
2696
 *
2697 2698
 * This is used with the results of GetVirtualXIDsDelayingChkpt to see if any
 * of the specified VXIDs are still in critical sections of code.
2699
 *
2700
 * Note: this is O(N^2) in the number of vxacts that are/were delaying, but
2701 2702 2703
 * those numbers should be small enough for it not to be a problem.
 */
bool
2704
HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids)
2705
{
B
Bruce Momjian 已提交
2706
	bool		result = false;
2707
	ProcArrayStruct *arrayP = procArray;
B
Bruce Momjian 已提交
2708
	int			index;
2709 2710 2711 2712 2713

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
B
Bruce Momjian 已提交
2714
		volatile PGPROC *proc = arrayP->procs[index];
2715
		VirtualTransactionId vxid;
B
Bruce Momjian 已提交
2716

2717
		GET_VXID_FROM_PGPROC(vxid, *proc);
2718

2719
		if (proc->inCommit && VirtualTransactionIdIsValid(vxid))
2720
		{
B
Bruce Momjian 已提交
2721
			int			i;
2722

2723
			for (i = 0; i < nvxids; i++)
2724
			{
2725
				if (VirtualTransactionIdEquals(vxid, vxids[i]))
2726 2727 2728 2729 2730 2731 2732 2733 2734 2735 2736 2737 2738 2739 2740
				{
					result = true;
					break;
				}
			}
			if (result)
				break;
		}
	}

	LWLockRelease(ProcArrayLock);

	return result;
}

2741 2742 2743 2744
/*
 * MPP: Special code to update the command id in the SharedLocalSnapshot
 * when we are in SERIALIZABLE isolation mode.
 */
2745 2746
void
UpdateSerializableCommandId(CommandId curcid)
2747 2748 2749 2750
{
	if ((DistributedTransactionContext == DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER ||
		 DistributedTransactionContext == DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER) &&
		 SharedLocalSnapshotSlot != NULL &&
2751
		 FirstSnapshotSet)
2752 2753 2754
	{
		int combocidSize;

2755 2756
		LWLockAcquire(SharedLocalSnapshotSlot->slotLock, LW_EXCLUSIVE);

2757 2758
		if (SharedLocalSnapshotSlot->QDxid != QEDtxContextInfo.distributedXid)
		{
2759 2760 2761 2762 2763 2764
			ereport((Debug_print_snapshot_dtm ? LOG : DEBUG5),
					(errmsg("[Distributed Snapshot #%u] *Can't Update Serializable Command Id* QDxid = %u (gxid = %u, '%s')",
							QEDtxContextInfo.distributedSnapshot.distribSnapshotId,
							SharedLocalSnapshotSlot->QDxid,
							getDistributedTransactionId(),
							DtxContextToString(DistributedTransactionContext))));
2765
			LWLockRelease(SharedLocalSnapshotSlot->slotLock);
2766 2767 2768
			return;
		}

2769
		ereport((Debug_print_snapshot_dtm ? LOG : DEBUG5),
2770
				(errmsg("[Distributed Snapshot #%u] *Update Serializable Command Id* segment currcid = %d, QDcid = %d, TransactionSnapshot currcid = %d, Shared currcid = %d (gxid = %u, '%s')",
2771 2772 2773
						QEDtxContextInfo.distributedSnapshot.distribSnapshotId,
						QEDtxContextInfo.curcid,
						SharedLocalSnapshotSlot->QDcid,
2774
						curcid,
2775 2776 2777 2778 2779 2780 2781
						SharedLocalSnapshotSlot->snapshot.curcid,
						getDistributedTransactionId(),
						DtxContextToString(DistributedTransactionContext))));

		ereport((Debug_print_snapshot_dtm ? LOG : DEBUG5),
				(errmsg("serializable writer updating combocid: used combocids %d shared %d",
						usedComboCids, SharedLocalSnapshotSlot->combocidcnt)));
2782 2783 2784 2785 2786 2787 2788

		combocidSize = ((usedComboCids < MaxComboCids) ? usedComboCids : MaxComboCids );

		SharedLocalSnapshotSlot->combocidcnt = combocidSize;	
		memcpy((void *)SharedLocalSnapshotSlot->combocids, comboCids,
			   combocidSize * sizeof(ComboCidKeyData));

2789
		SharedLocalSnapshotSlot->snapshot.curcid = curcid;
2790 2791 2792
		SharedLocalSnapshotSlot->QDcid = QEDtxContextInfo.curcid;
		SharedLocalSnapshotSlot->segmateSync = QEDtxContextInfo.segmateSync;

2793
		LWLockRelease(SharedLocalSnapshotSlot->slotLock);
2794 2795 2796
	}
}

2797 2798
/*
 * BackendPidGetProc -- get a backend's PGPROC given its PID
2799 2800 2801 2802
 *
 * Returns NULL if not found.  Note that it is up to the caller to be
 * sure that the question remains meaningful for long enough for the
 * answer to be used ...
2803
 */
2804
PGPROC *
2805 2806 2807 2808 2809 2810
BackendPidGetProc(int pid)
{
	PGPROC	   *result = NULL;
	ProcArrayStruct *arrayP = procArray;
	int			index;

2811 2812 2813
	if (pid == 0)				/* never match dummy PGPROCs */
		return NULL;

2814 2815 2816 2817 2818 2819 2820 2821 2822 2823 2824 2825 2826 2827 2828 2829 2830 2831
	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
		PGPROC	   *proc = arrayP->procs[index];

		if (proc->pid == pid)
		{
			result = proc;
			break;
		}
	}

	LWLockRelease(ProcArrayLock);

	return result;
}

T
Tatsuo Ishii 已提交
2832 2833 2834 2835 2836 2837
/*
 * BackendXidGetPid -- get a backend's pid given its XID
 *
 * Returns 0 if not found or it's a prepared transaction.  Note that
 * it is up to the caller to be sure that the question remains
 * meaningful for long enough for the answer to be used ...
 *
 * Only main transaction Ids are considered.  This function is mainly
 * useful for determining what backend owns a lock.
 *
 * Beware that not every xact has an XID assigned.  However, as long as you
 * only call this using an XID found on disk, you're safe.
 */
int
BackendXidGetPid(TransactionId xid)
{
	ProcArrayStruct *arrayP = procArray;
	int			ownerPid = 0;
	int			i;

	/* never match invalid xid */
	if (!TransactionIdIsValid(xid))
		return 0;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (i = 0; i < arrayP->numProcs; i++)
	{
		volatile PGPROC *proc = arrayP->procs[i];

		if (!TransactionIdEquals(proc->xid, xid))
			continue;

		/* first entry with this XID decides the answer, even if its pid is 0 */
		ownerPid = proc->pid;
		break;
	}

	LWLockRelease(ProcArrayLock);

	return ownerPid;
}

2873 2874 2875 2876 2877 2878 2879 2880 2881
/*
 * IsBackendPid -- is a given pid a running backend
 */
bool
IsBackendPid(int pid)
{
	return (BackendPidGetProc(pid) != NULL);
}

2882 2883 2884 2885

/*
 * GetCurrentVirtualXIDs -- returns an array of currently active VXIDs.
 *
2886
 * The array is palloc'd. The number of valid entries is returned into *nvxids.
2887
 *
2888 2889 2890 2891 2892 2893 2894 2895
 * The arguments allow filtering the set of VXIDs returned.  Our own process
 * is always skipped.  In addition:
 *	If limitXmin is not InvalidTransactionId, skip processes with
 *		xmin > limitXmin.
 *	If excludeXmin0 is true, skip processes with xmin = 0.
 *	If allDbs is false, skip processes attached to other databases.
 *	If excludeVacuum isn't zero, skip processes for which
 *		(vacuumFlags & excludeVacuum) is not zero.
2896
 *
2897 2898 2899 2900 2901
 * Note: the purpose of the limitXmin and excludeXmin0 parameters is to
 * allow skipping backends whose oldest live snapshot is no older than
 * some snapshot we have.  Since we examine the procarray with only shared
 * lock, there are race conditions: a backend could set its xmin just after
 * we look.  Indeed, on multiprocessors with weak memory ordering, the
2902
 * other backend could have set its xmin *before* we look.	We know however
2903 2904 2905 2906 2907
 * that such a backend must have held shared ProcArrayLock overlapping our
 * own hold of ProcArrayLock, else we would see its xmin update.  Therefore,
 * any snapshot the other backend is taking concurrently with our scan cannot
 * consider any transactions as still running that we think are committed
 * (since backends must hold ProcArrayLock exclusive to commit).
2908 2909
 */
VirtualTransactionId *
2910 2911 2912
GetCurrentVirtualXIDs(TransactionId limitXmin, bool excludeXmin0,
					  bool allDbs, int excludeVacuum,
					  int *nvxids)
2913 2914 2915 2916 2917 2918
{
	VirtualTransactionId *vxids;
	ProcArrayStruct *arrayP = procArray;
	int			count = 0;
	int			index;

2919
	/* allocate what's certainly enough result space */
2920
	vxids = (VirtualTransactionId *)
2921
		palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs);
2922 2923 2924 2925 2926

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
B
Bruce Momjian 已提交
2927
		volatile PGPROC *proc = arrayP->procs[index];
2928 2929 2930 2931

		if (proc == MyProc)
			continue;

2932 2933 2934
		if (excludeVacuum & proc->vacuumFlags)
			continue;

2935
		if (allDbs || proc->databaseId == MyDatabaseId)
2936
		{
2937
			/* Fetch xmin just once - might change on us */
2938 2939
			TransactionId pxmin = proc->xmin;

2940 2941 2942
			if (excludeXmin0 && !TransactionIdIsValid(pxmin))
				continue;

2943
			/*
2944 2945
			 * InvalidTransactionId precedes all other XIDs, so a proc that
			 * hasn't set xmin yet will not be rejected by this test.
2946 2947
			 */
			if (!TransactionIdIsValid(limitXmin) ||
2948
				TransactionIdPrecedesOrEquals(pxmin, limitXmin))
2949 2950
			{
				VirtualTransactionId vxid;
2951

2952 2953 2954 2955
				GET_VXID_FROM_PGPROC(vxid, *proc);
				if (VirtualTransactionIdIsValid(vxid))
					vxids[count++] = vxid;
			}
2956 2957 2958 2959 2960
		}
	}

	LWLockRelease(ProcArrayLock);

2961
	*nvxids = count;
2962 2963 2964
	return vxids;
}

2965 2966 2967 2968 2969 2970
/*
 * GetConflictingVirtualXIDs -- returns an array of currently active VXIDs.
 *
 * Usage is limited to conflict resolution during recovery on standby servers.
 * limitXmin is supplied as either latestRemovedXid, or InvalidTransactionId
 * in cases where we cannot accurately determine a value for latestRemovedXid.
2971
 *
2972 2973 2974
 * If limitXmin is InvalidTransactionId then we want to kill everybody,
 * so we're not worried if they have a snapshot or not, nor does it really
 * matter what type of lock we hold.
2975
 *
2976 2977 2978 2979 2980 2981 2982 2983 2984 2985
 * All callers that are checking xmins always now supply a valid and useful
 * value for limitXmin. The limitXmin is always lower than the lowest
 * numbered KnownAssignedXid that is not already a FATAL error. This is
 * because we only care about cleanup records that are cleaning up tuple
 * versions from committed transactions. In that case they will only occur
 * at the point where the record is less than the lowest running xid. That
 * allows us to say that if any backend takes a snapshot concurrently with
 * us then the conflict assessment made here would never include the snapshot
 * that is being derived. So we take LW_SHARED on the ProcArray and allow
 * concurrent snapshots when limitXmin is valid. We might think about adding
B
Bruce Momjian 已提交
2986
 *	 Assert(limitXmin < lowest(KnownAssignedXids))
2987 2988
 * but that would not be true in the case of FATAL errors lagging in array,
 * but we already know those are bogus anyway, so we skip that test.
2989
 *
2990
 * If dbOid is valid we skip backends attached to other databases.
2991 2992 2993 2994 2995
 *
 * Be careful to *not* pfree the result from this function. We reuse
 * this array sufficiently often that we use malloc for the result.
 */
VirtualTransactionId *
2996
GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
2997 2998 2999 3000 3001 3002 3003 3004
{
	static VirtualTransactionId *vxids;
	ProcArrayStruct *arrayP = procArray;
	int			count = 0;
	int			index;

	/*
	 * If not first time through, get workspace to remember main XIDs in. We
B
Bruce Momjian 已提交
3005 3006
	 * malloc it permanently to avoid repeated palloc/pfree overhead. Allow
	 * result space, remembering room for a terminator.
3007 3008 3009 3010 3011 3012 3013 3014 3015 3016 3017
	 */
	if (vxids == NULL)
	{
		vxids = (VirtualTransactionId *)
			malloc(sizeof(VirtualTransactionId) * (arrayP->maxProcs + 1));
		if (vxids == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
	}

3018
	LWLockAcquire(ProcArrayLock, LW_SHARED);
3019 3020 3021 3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034

	for (index = 0; index < arrayP->numProcs; index++)
	{
		volatile PGPROC *proc = arrayP->procs[index];

		/* Exclude prepared transactions */
		if (proc->pid == 0)
			continue;

		if (!OidIsValid(dbOid) ||
			proc->databaseId == dbOid)
		{
			/* Fetch xmin just once - can't change on us, but good coding */
			TransactionId pxmin = proc->xmin;

			/*
B
Bruce Momjian 已提交
3035 3036 3037
			 * We ignore an invalid pxmin because this means that backend has
			 * no snapshot and cannot get another one while we hold exclusive
			 * lock.
3038
			 */
3039 3040
			if (!TransactionIdIsValid(limitXmin) ||
				(TransactionIdIsValid(pxmin) && !TransactionIdFollows(pxmin, limitXmin)))
3041 3042 3043 3044 3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055 3056 3057 3058 3059 3060 3061 3062 3063 3064 3065
			{
				VirtualTransactionId vxid;

				GET_VXID_FROM_PGPROC(vxid, *proc);
				if (VirtualTransactionIdIsValid(vxid))
					vxids[count++] = vxid;
			}
		}
	}

	LWLockRelease(ProcArrayLock);

	/* add the terminator */
	vxids[count].backendId = InvalidBackendId;
	vxids[count].localTransactionId = InvalidLocalTransactionId;

	return vxids;
}

/*
 * CancelVirtualTransaction - used in recovery conflict processing
 *
 * Returns pid of the process signaled, or 0 if not found.
 */
pid_t
3066
CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode)
3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079 3080 3081 3082 3083
{
	ProcArrayStruct *arrayP = procArray;
	int			index;
	pid_t		pid = 0;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
		VirtualTransactionId procvxid;
		PGPROC	   *proc = arrayP->procs[index];

		GET_VXID_FROM_PGPROC(procvxid, *proc);

		if (procvxid.backendId == vxid.backendId &&
			procvxid.localTransactionId == vxid.localTransactionId)
		{
3084
			proc->recoveryConflictPending = true;
3085
			pid = proc->pid;
3086 3087 3088
			if (pid != 0)
			{
				/*
B
Bruce Momjian 已提交
3089 3090
				 * Kill the pid if it's still here. If not, that's what we
				 * wanted so ignore any errors.
3091 3092 3093
				 */
				(void) SendProcSignal(pid, sigmode, vxid.backendId);
			}
3094 3095 3096 3097 3098 3099 3100 3101
			break;
		}
	}

	LWLockRelease(ProcArrayLock);

	return pid;
}
3102

3103 3104 3105 3106 3107
/*
 * CountActiveBackends --- count backends (other than myself) that are in
 *		active transactions.  This is used as a heuristic to decide if
 *		a pre-XLOG-flush delay is worthwhile during commit.
 *
3108 3109
 * Do not count backends that are blocked waiting for locks, since they are
 * not going to get to run until someone else commits.
3110 3111 3112 3113 3114 3115 3116 3117 3118 3119
 */
int
CountActiveBackends(void)
{
	ProcArrayStruct *arrayP = procArray;
	int			count = 0;
	int			index;

	/*
	 * Note: for speed, we don't acquire ProcArrayLock.  This is a little bit
B
Bruce Momjian 已提交
3120 3121
	 * bogus, but since we are only testing fields for zero or nonzero, it
	 * should be OK.  The result is only used for heuristic purposes anyway...
3122 3123 3124
	 */
	for (index = 0; index < arrayP->numProcs; index++)
	{
3125 3126 3127 3128 3129 3130 3131 3132 3133
		volatile PGPROC *proc = arrayP->procs[index];

		/*
		 * Since we're not holding a lock, need to check that the pointer is
		 * valid. Someone holding the lock could have incremented numProcs
		 * already, but not yet inserted a valid pointer to the array.
		 *
		 * If someone just decremented numProcs, 'proc' could also point to a
		 * PGPROC entry that's no longer in the array. It still points to a
3134 3135 3136
		 * PGPROC struct, though, because freed PGPPROC entries just go to the
		 * free list and are recycled. Its contents are nonsense in that case,
		 * but that's acceptable for this function.
3137 3138 3139
		 */
		if (proc == NULL)
			continue;
3140 3141 3142

		if (proc == MyProc)
			continue;			/* do not count myself */
3143 3144 3145
		if (proc->pid == 0)
			continue;			/* do not count prepared xacts */
		if (proc->xid == InvalidTransactionId)
3146
			continue;			/* do not count if no XID assigned */
3147 3148 3149 3150 3151 3152 3153 3154
		if (proc->waitLock != NULL)
			continue;			/* do not count if blocked on a lock */
		count++;
	}

	return count;
}

3155 3156 3157 3158 3159 3160 3161 3162 3163 3164 3165 3166 3167 3168
/*
 * CountDBBackends --- count backends that are using specified database
 *
 * Prepared transactions (pid == 0) are not counted.  If databaseid is not
 * a valid OID, every remaining entry is counted.
 */
int
CountDBBackends(Oid databaseid)
{
	ProcArrayStruct *arrayP = procArray;
	int			nbackends = 0;
	int			i;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (i = 0; i < arrayP->numProcs; i++)
	{
		volatile PGPROC *proc = arrayP->procs[i];

		/* do not count prepared xacts */
		if (proc->pid == 0)
			continue;

		if (OidIsValid(databaseid) && proc->databaseId != databaseid)
			continue;

		nbackends++;
	}

	LWLockRelease(ProcArrayLock);

	return nbackends;
}

3183 3184 3185 3186
/*
 * CancelDBBackends --- cancel backends that are using specified database
 */
void
3187
CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending)
3188 3189 3190
{
	ProcArrayStruct *arrayP = procArray;
	int			index;
3191
	pid_t		pid = 0;
3192 3193 3194 3195 3196 3197 3198 3199

	/* tell all backends to die */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	for (index = 0; index < arrayP->numProcs; index++)
	{
		volatile PGPROC *proc = arrayP->procs[index];

3200
		if (databaseid == InvalidOid || proc->databaseId == databaseid)
3201
		{
3202 3203 3204 3205
			VirtualTransactionId procvxid;

			GET_VXID_FROM_PGPROC(procvxid, *proc);

3206
			proc->recoveryConflictPending = conflictPending;
3207 3208 3209 3210
			pid = proc->pid;
			if (pid != 0)
			{
				/*
B
Bruce Momjian 已提交
3211 3212
				 * Kill the pid if it's still here. If not, that's what we
				 * wanted so ignore any errors.
3213
				 */
3214
				(void) SendProcSignal(pid, sigmode, procvxid.backendId);
3215
			}
3216 3217 3218 3219 3220 3221
		}
	}

	LWLockRelease(ProcArrayLock);
}

3222 3223 3224 3225 3226 3227 3228 3229 3230 3231 3232 3233 3234 3235
/*
 * CountUserBackends --- count backends that are used by specified user
 *
 * Prepared transactions (pid == 0) are not counted.
 */
int
CountUserBackends(Oid roleid)
{
	ProcArrayStruct *arrayP = procArray;
	int			nbackends = 0;
	int			i;

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (i = 0; i < arrayP->numProcs; i++)
	{
		volatile PGPROC *proc = arrayP->procs[i];

		/* skip prepared xacts, then count entries owned by the role */
		if (proc->pid != 0 && proc->roleId == roleid)
			nbackends++;
	}

	LWLockRelease(ProcArrayLock);

	return nbackends;
}

3249
/*
3250
 * CountOtherDBBackends -- check for other backends running in the given DB
3251 3252 3253 3254 3255 3256 3257 3258 3259
 *
 * If there are other backends in the DB, we will wait a maximum of 5 seconds
 * for them to exit.  Autovacuum backends are encouraged to exit early by
 * sending them SIGTERM, but normal user backends are just waited for.
 *
 * The current backend is always ignored; it is caller's responsibility to
 * check whether the current backend uses the given DB, if it's important.
 *
 * Returns TRUE if there are (still) other backends in the DB, FALSE if not.
3260 3261
 * Also, *nbackends and *nprepared are set to the number of other backends
 * and prepared transactions in the DB, respectively.
3262 3263 3264 3265 3266 3267 3268 3269 3270 3271 3272
 *
 * This function is used to interlock DROP DATABASE and related commands
 * against there being any active backends in the target DB --- dropping the
 * DB while active backends remain would be a Bad Thing.  Note that we cannot
 * detect here the possibility of a newly-started backend that is trying to
 * connect to the doomed database, so additional interlocking is needed during
 * backend startup.  The caller should normally hold an exclusive lock on the
 * target DB before calling this, which is one reason we mustn't wait
 * indefinitely.
 */
bool
3273
CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
3274 3275
{
	ProcArrayStruct *arrayP = procArray;
3276 3277

#define MAXAUTOVACPIDS	10		/* max autovacs to SIGTERM per iteration */
3278
	int			autovac_pids[MAXAUTOVACPIDS];
3279 3280 3281 3282 3283
	int			tries;

	/* 50 tries with 100ms sleep between tries makes 5 sec total wait */
	for (tries = 0; tries < 50; tries++)
	{
3284
		int			nautovacs = 0;
3285 3286 3287 3288 3289
		bool		found = false;
		int			index;

		CHECK_FOR_INTERRUPTS();

3290 3291
		*nbackends = *nprepared = 0;

3292 3293 3294 3295 3296 3297 3298 3299 3300 3301 3302 3303 3304
		LWLockAcquire(ProcArrayLock, LW_SHARED);

		for (index = 0; index < arrayP->numProcs; index++)
		{
			volatile PGPROC *proc = arrayP->procs[index];

			if (proc->databaseId != databaseId)
				continue;
			if (proc == MyProc)
				continue;

			found = true;

3305 3306
			if (proc->pid == 0)
				(*nprepared)++;
3307 3308
			else
			{
3309 3310 3311 3312
				(*nbackends)++;
				if ((proc->vacuumFlags & PROC_IS_AUTOVACUUM) &&
					nautovacs < MAXAUTOVACPIDS)
					autovac_pids[nautovacs++] = proc->pid;
3313 3314 3315
			}
		}

3316 3317
		LWLockRelease(ProcArrayLock);

3318 3319 3320
		if (!found)
			return false;		/* no conflicting backends, so done */

3321
		/*
3322 3323 3324 3325
		 * Send SIGTERM to any conflicting autovacuums before sleeping. We
		 * postpone this step until after the loop because we don't want to
		 * hold ProcArrayLock while issuing kill(). We have no idea what might
		 * block kill() inside the kernel...
3326 3327 3328 3329 3330
		 */
		for (index = 0; index < nautovacs; index++)
			(void) kill(autovac_pids[index], SIGTERM);	/* ignore any error */

		/* sleep, then try again */
3331 3332 3333 3334 3335 3336
		pg_usleep(100 * 1000L); /* 100ms */
	}

	return true;				/* timed out, still conflicts */
}

3337 3338 3339 3340 3341 3342 3343 3344 3345 3346 3347 3348 3349

#define XidCacheRemove(i) \
	do { \
		MyProc->subxids.xids[i] = MyProc->subxids.xids[MyProc->subxids.nxids - 1]; \
		MyProc->subxids.nxids--; \
	} while (0)

/*
 * XidCacheRemoveRunningXids
 *
 * Remove a bunch of TransactionIds from the list of known-running
 * subtransactions for my backend.	Both the specified xid and those in
 * the xids[] array (of length nxids) are removed from the subxids cache.
3350
 * latestXid must be the latest XID among the group.
3351 3352
 */
void
3353 3354 3355
XidCacheRemoveRunningXids(TransactionId xid,
						  int nxids, const TransactionId *xids,
						  TransactionId latestXid)
3356 3357 3358 3359
{
	int			i,
				j;

3360
	Assert(TransactionIdIsValid(xid));
3361 3362 3363

	/*
	 * We must hold ProcArrayLock exclusively in order to remove transactions
3364 3365 3366 3367
	 * from the PGPROC array.  (See src/backend/access/transam/README.)  It's
	 * possible this could be relaxed since we know this routine is only used
	 * to abort subtransactions, but pending closer analysis we'd best be
	 * conservative.
3368 3369 3370 3371
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	/*
B
Bruce Momjian 已提交
3372 3373 3374
	 * Under normal circumstances xid and xids[] will be in increasing order,
	 * as will be the entries in subxids.  Scan backwards to avoid O(N^2)
	 * behavior when removing a lot of xids.
3375 3376 3377 3378 3379 3380 3381 3382 3383 3384 3385 3386 3387
	 */
	for (i = nxids - 1; i >= 0; i--)
	{
		TransactionId anxid = xids[i];

		for (j = MyProc->subxids.nxids - 1; j >= 0; j--)
		{
			if (TransactionIdEquals(MyProc->subxids.xids[j], anxid))
			{
				XidCacheRemove(j);
				break;
			}
		}
B
Bruce Momjian 已提交
3388

3389
		/*
B
Bruce Momjian 已提交
3390 3391 3392 3393 3394
		 * Ordinarily we should have found it, unless the cache has
		 * overflowed. However it's also possible for this routine to be
		 * invoked multiple times for the same subtransaction, in case of an
		 * error during AbortSubTransaction.  So instead of Assert, emit a
		 * debug warning.
3395 3396 3397 3398 3399 3400 3401 3402 3403 3404 3405 3406 3407 3408 3409 3410 3411
		 */
		if (j < 0 && !MyProc->subxids.overflowed)
			elog(WARNING, "did not find subXID %u in MyProc", anxid);
	}

	for (j = MyProc->subxids.nxids - 1; j >= 0; j--)
	{
		if (TransactionIdEquals(MyProc->subxids.xids[j], xid))
		{
			XidCacheRemove(j);
			break;
		}
	}
	/* Ordinarily we should have found it, unless the cache has overflowed */
	if (j < 0 && !MyProc->subxids.overflowed)
		elog(WARNING, "did not find subXID %u in MyProc", xid);

3412 3413 3414 3415 3416
	/* Also advance global latestCompletedXid while holding the lock */
	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
							  latestXid))
		ShmemVariableCache->latestCompletedXid = latestXid;

3417 3418 3419 3420 3421 3422 3423 3424 3425 3426 3427 3428
	LWLockRelease(ProcArrayLock);
}

#ifdef XIDCACHE_DEBUG

/*
 * DisplayXidCache -- print stats about effectiveness of XID cache
 *
 * Writes the xc_* counters to stderr as a single line.  Only compiled when
 * XIDCACHE_DEBUG is defined.
 */
static void
DisplayXidCache(void)
{
	fprintf(stderr,
			"XidCache: xmin: %ld, known: %ld, myxact: %ld, "
			"latest: %ld, mainxid: %ld, childxid: %ld, "
			"knownassigned: %ld, nooflo: %ld, slow: %ld\n",
			xc_by_recent_xmin, xc_by_known_xact, xc_by_my_xact,
			xc_by_latest_xid, xc_by_main_xid, xc_by_child_xid,
			xc_by_known_assigned, xc_no_overflow, xc_slow_answer);
}
#endif   /* XIDCACHE_DEBUG */
3441 3442 3443 3444 3445 3446 3447 3448 3449 3450 3451 3452 3453 3454 3455 3456 3457 3458 3459 3460 3461 3462 3463 3464 3465 3466 3467 3468 3469 3470 3471 3472 3473 3474 3475 3476 3477 3478 3479 3480 3481 3482 3483 3484 3485 3486 3487 3488 3489 3490 3491 3492 3493 3494 3495 3496 3497 3498 3499 3500 3501 3502 3503 3504 3505 3506 3507 3508 3509 3510 3511 3512 3513 3514 3515 3516 3517 3518 3519 3520 3521

PGPROC *
FindProcByGpSessionId(long gp_session_id)
{
	/* Find the guy who should manage our locks */
	ProcArrayStruct *arrayP = procArray;
	int			index;

	Assert(gp_session_id > 0);
		
	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (index = 0; index < arrayP->numProcs; index++)
	{
		PGPROC	   *proc = arrayP->procs[index];
			
		if (proc->pid == MyProc->pid)
			continue;
				
		if (!proc->mppIsWriter)
			continue;
				
		if (proc->mppSessionId == gp_session_id)
		{
			LWLockRelease(ProcArrayLock);
			return proc;
		}
	}
		
	LWLockRelease(ProcArrayLock);
	return NULL;
}

/*
 * FindAndSignalProcess
 *		Find the PGPROC entry in procArray that carries the given sessionId
 *		and commandId, and send the corresponding process SIGINT.
 *
 * Returns true if a matching process was found and the signal was delivered.
 * Returns false if no such entry exists or if kill() failed.
 *
 * Entries with pid == 0 are never signalled.  The process array can contain
 * PGPROCs that represent prepared transactions rather than live backends,
 * and those have pid == 0.  kill() interprets a pid of 0 (and hence -0) as
 * "every process in the caller's process group", so signalling such an entry
 * would interrupt the wrong processes.
 *
 * The scan stops at the first signalable match, whether or not kill()
 * succeeds.  When gp_cancel_query_print_log is set, a NOTICE is emitted after
 * the lock has been released.
 */
bool
FindAndSignalProcess(int sessionId, int commandId)
{
	bool		queryCancelled = false;
	int			pid = 0;

	Assert(sessionId > 0 && commandId > 0);

	LWLockAcquire(ProcArrayLock, LW_SHARED);

	for (int index = 0; index < procArray->numProcs; index++)
	{
		PGPROC	   *proc = procArray->procs[index];

		/* Not a live process; kill(0, ...) would hit our own process group */
		if (proc->pid == 0)
			continue;

		if (proc->mppSessionId == sessionId &&
			proc->queryCommandId == commandId)
		{
			/* If we have setsid(), signal the backend's whole process group */
#ifdef HAVE_SETSID
			if (kill(-proc->pid, SIGINT) == 0)
#else
			if (kill(proc->pid, SIGINT) == 0)
#endif
			{
				pid = proc->pid;
				queryCancelled = true;
			}

			break;
		}
	}

	LWLockRelease(ProcArrayLock);

	if (gp_cancel_query_print_log && queryCancelled)
	{
		elog(NOTICE, "sent an interrupt to process %d", pid);
	}

	return queryCancelled;
}
3522

3523
/* ----------------------------------------------
 *		KnownAssignedTransactions sub-module
 * ----------------------------------------------
 */

/*
 * In Hot Standby mode, we maintain a list of transactions that are (or were)
3530 3531 3532 3533
 * running in the master at the current point in WAL.  These XIDs must be
 * treated as running by standby transactions, even though they are not in
 * the standby server's PGPROC array.
 *
 * We record all XIDs that we know have been assigned.  That includes all the
3535 3536 3537 3538 3539 3540 3541 3542
 * XIDs seen in WAL records, plus all unobserved XIDs that we can deduce have
 * been assigned.  We can deduce the existence of unobserved XIDs because we
 * know XIDs are assigned in sequence, with no gaps.  The KnownAssignedXids
 * list expands as new XIDs are observed or inferred, and contracts when
 * transaction completion records arrive.
 *
 * During hot standby we do not fret too much about the distinction between
 * top-level XIDs and subtransaction XIDs. We store both together in the
 * KnownAssignedXids list.  In backends, this is copied into snapshots in
3544 3545 3546 3547 3548 3549 3550 3551 3552 3553 3554 3555 3556 3557 3558 3559 3560 3561 3562 3563 3564 3565 3566 3567 3568 3569 3570 3571 3572 3573 3574 3575 3576 3577 3578 3579 3580 3581
 * GetSnapshotData(), taking advantage of the fact that XidInMVCCSnapshot()
 * doesn't care about the distinction either.  Subtransaction XIDs are
 * effectively treated as top-level XIDs and in the typical case pg_subtrans
 * links are *not* maintained (which does not affect visibility).
 *
 * We have room in KnownAssignedXids and in snapshots to hold maxProcs *
 * (1 + PGPROC_MAX_CACHED_SUBXIDS) XIDs, so every master transaction must
 * report its subtransaction XIDs in a WAL XLOG_XACT_ASSIGNMENT record at
 * least every PGPROC_MAX_CACHED_SUBXIDS.  When we receive one of these
 * records, we mark the subXIDs as children of the top XID in pg_subtrans,
 * and then remove them from KnownAssignedXids.  This prevents overflow of
 * KnownAssignedXids and snapshots, at the cost that status checks for these
 * subXIDs will take a slower path through TransactionIdIsInProgress().
 * This means that KnownAssignedXids is not necessarily complete for subXIDs,
 * though it should be complete for top-level XIDs; this is the same situation
 * that holds with respect to the PGPROC entries in normal running.
 *
 * When we throw away subXIDs from KnownAssignedXids, we need to keep track of
 * that, similarly to tracking overflow of a PGPROC's subxids array.  We do
 * that by remembering the lastOverflowedXID, ie the last thrown-away subXID.
 * As long as that is within the range of interesting XIDs, we have to assume
 * that subXIDs are missing from snapshots.  (Note that subXID overflow occurs
 * on primary when 65th subXID arrives, whereas on standby it occurs when 64th
 * subXID arrives - that is not an error.)
 *
 * Should a backend on primary somehow disappear before it can write an abort
 * record, then we just leave those XIDs in KnownAssignedXids. They actually
 * aborted but we think they were running; the distinction is irrelevant
 * because either way any changes done by the transaction are not visible to
 * backends in the standby.  We prune KnownAssignedXids when
 * XLOG_XACT_RUNNING_XACTS arrives, to forestall possible overflow of the
 * array due to such dead XIDs.
 */

/*
 * RecordKnownAssignedTransactionIds
 *		Record the given XID in KnownAssignedXids, as well as any preceding
 *		unobserved XIDs.
 *
 * This should be run for *every* WAL record type apart from
 * XLOG_XACT_RUNNING_XACTS (since that initialises the first snapshot so that
 * this function can be called).  It must be called for each record after
 * StartupCLog() et al have run, because it calls ExtendCLOG() and
 * ExtendSUBTRANS().
 *
 * Called during recovery in analogy with and in place of
 * GetNewTransactionId().
 *
 * If xid does not follow latestObservedXid there is nothing to do.
 * Otherwise every XID in (latestObservedXid, xid] is treated as newly
 * assigned: clog/subtrans are extended for it, it is added to
 * KnownAssignedXids, latestObservedXid is moved up to xid, and
 * ShmemVariableCache->nextXid is set to the XID after xid.
 */
void
RecordKnownAssignedTransactionIds(TransactionId xid)
{
	TransactionId first_new_xid;
	TransactionId cursor;

	Assert(standbyState >= STANDBY_INITIALIZED);
	Assert(TransactionIdIsValid(latestObservedXid));
	Assert(TransactionIdIsValid(xid));

	elog(trace_recovery(DEBUG4), "record known xact %u latestObservedXid %u",
		 xid, latestObservedXid);

	/*
	 * A newly observed xid is frequently *not* the next one in sequence, so
	 * the intervening xids must be treated as running too.  An xid that is
	 * not ahead of latestObservedXid needs no work at all.
	 */
	if (!TransactionIdFollows(xid, latestObservedXid))
		return;

	/* First XID we have not yet accounted for */
	first_new_xid = latestObservedXid;
	TransactionIdAdvance(first_new_xid);

	/*
	 * Extend clog and subtrans like GetNewTransactionId() does during normal
	 * operation, one XID at a time.  The typical case requires almost no
	 * activity.
	 */
	cursor = first_new_xid;
	while (TransactionIdPrecedesOrEquals(cursor, xid))
	{
		ExtendCLOG(cursor);
		ExtendSUBTRANS(cursor);

		TransactionIdAdvance(cursor);
	}

	/* Add the whole new range onto the KnownAssignedXids array */
	KnownAssignedXidsAdd(first_new_xid, xid, false);

	/* Only now can latestObservedXid move forward */
	latestObservedXid = xid;

	/* ShmemVariableCache->nextXid must be beyond any observed xid */
	cursor = latestObservedXid;
	TransactionIdAdvance(cursor);
	ShmemVariableCache->nextXid = cursor;
}

3644 3645 3646
/*
 * ExpireTreeKnownAssignedTransactionIds
 *		Remove the given XIDs from KnownAssignedXids.
3647 3648
 *
 * Called during recovery in analogy with and in place of ProcArrayEndTransaction()
3649
 */
3650 3651
void
ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
B
Bruce Momjian 已提交
3652
							   TransactionId *subxids, TransactionId max_xid)
3653
{
3654
	Assert(standbyState >= STANDBY_INITIALIZED);
3655 3656 3657 3658 3659 3660

	/*
	 * Uses same locking as transaction commit
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

3661
	KnownAssignedXidsRemoveTree(xid, nsubxids, subxids);
3662

3663 3664 3665
	/* As in ProcArrayEndTransaction, advance latestCompletedXid */
	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
							  max_xid))
3666 3667 3668 3669 3670
		ShmemVariableCache->latestCompletedXid = max_xid;

	LWLockRelease(ProcArrayLock);
}

3671 3672 3673 3674
/*
 * ExpireAllKnownAssignedTransactionIds
 *		Remove all entries in KnownAssignedXids
 */
3675 3676 3677 3678
void
ExpireAllKnownAssignedTransactionIds(void)
{
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3679
	KnownAssignedXidsRemovePreceding(InvalidTransactionId);
3680 3681 3682
	LWLockRelease(ProcArrayLock);
}

3683 3684 3685 3686
/*
 * ExpireOldKnownAssignedTransactionIds
 *		Remove KnownAssignedXids entries preceding the given XID
 */
3687 3688 3689 3690
void
ExpireOldKnownAssignedTransactionIds(TransactionId xid)
{
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3691
	KnownAssignedXidsRemovePreceding(xid);
3692 3693 3694
	LWLockRelease(ProcArrayLock);
}

3695

3696 3697 3698
/*
 * Private module functions to manipulate KnownAssignedXids
 *
3699 3700 3701 3702 3703 3704 3705 3706 3707 3708 3709 3710 3711 3712 3713 3714 3715 3716 3717 3718 3719 3720 3721 3722 3723 3724 3725 3726 3727 3728 3729 3730 3731 3732 3733 3734 3735 3736 3737 3738 3739 3740 3741 3742 3743 3744 3745
 * There are 5 main uses of the KnownAssignedXids data structure:
 *
 *	* backends taking snapshots - all valid XIDs need to be copied out
 *	* backends seeking to determine presence of a specific XID
 *	* startup process adding new known-assigned XIDs
 *	* startup process removing specific XIDs as transactions end
 *	* startup process pruning array when special WAL records arrive
 *
 * This data structure is known to be a hot spot during Hot Standby, so we
 * go to some lengths to make these operations as efficient and as concurrent
 * as possible.
 *
 * The XIDs are stored in an array in sorted order --- TransactionIdPrecedes
 * order, to be exact --- to allow binary search for specific XIDs.  Note:
 * in general TransactionIdPrecedes would not provide a total order, but
 * we know that the entries present at any instant should not extend across
 * a large enough fraction of XID space to wrap around (the master would
 * shut down for fear of XID wrap long before that happens).  So it's OK to
 * use TransactionIdPrecedes as a binary-search comparator.
 *
 * It's cheap to maintain the sortedness during insertions, since new known
 * XIDs are always reported in XID order; we just append them at the right.
 *
 * To keep individual deletions cheap, we need to allow gaps in the array.
 * This is implemented by marking array elements as valid or invalid using
 * the parallel boolean array KnownAssignedXidsValid[].  A deletion is done
 * by setting KnownAssignedXidsValid[i] to false, *without* clearing the
 * XID entry itself.  This preserves the property that the XID entries are
 * sorted, so we can do binary searches easily.  Periodically we compress
 * out the unused entries; that's much cheaper than having to compress the
 * array immediately on every deletion.
 *
 * The actually valid items in KnownAssignedXids[] and KnownAssignedXidsValid[]
 * are those with indexes tail <= i < head; items outside this subscript range
 * have unspecified contents.  When head reaches the end of the array, we
 * force compression of unused entries rather than wrapping around, since
 * allowing wraparound would greatly complicate the search logic.  We maintain
 * an explicit tail pointer so that pruning of old XIDs can be done without
 * immediately moving the array contents.  In most cases only a small fraction
 * of the array contains valid entries at any instant.
 *
 * Although only the startup process can ever change the KnownAssignedXids
 * data structure, we still need interlocking so that standby backends will
 * not observe invalid intermediate states.  The convention is that backends
 * must hold shared ProcArrayLock to examine the array.  To remove XIDs from
 * the array, the startup process must hold ProcArrayLock exclusively, for
 * the usual transactional reasons (compare commit/abort of a transaction
 * during normal running).  Compressing unused entries out of the array
3747 3748 3749 3750 3751 3752
 * likewise requires exclusive lock.  To add XIDs to the array, we just insert
 * them into slots to the right of the head pointer and then advance the head
 * pointer.  This wouldn't require any lock at all, except that on machines
 * with weak memory ordering we need to be careful that other processors
 * see the array element changes before they see the head pointer change.
 * We handle this by using a spinlock to protect reads and writes of the
 * head/tail pointers.  (We could dispense with the spinlock if we were to
3754 3755 3756 3757 3758 3759 3760 3761 3762 3763 3764 3765 3766 3767 3768 3769 3770 3771 3772 3773 3774 3775 3776 3777
 * create suitable memory access barrier primitives and use those instead.)
 * The spinlock must be taken to read or write the head/tail pointers unless
 * the caller holds ProcArrayLock exclusively.
 *
 * Algorithmic analysis:
 *
 * If we have a maximum of M slots, with N XIDs currently spread across
 * S elements then we have N <= S <= M always.
 *
 *	* Adding a new XID is O(1) and needs little locking (unless compression
 *		must happen)
 *	* Compressing the array is O(S) and requires exclusive lock
 *	* Removing an XID is O(logS) and requires exclusive lock
 *	* Taking a snapshot is O(S) and requires shared lock
 *	* Checking for an XID is O(logS) and requires shared lock
 *
 * In comparison, using a hash table for KnownAssignedXids would mean that
 * taking snapshots would be O(M). If we can maintain S << M then the
 * sorted array technique will deliver significantly faster snapshots.
 * If we try to keep S too small then we will spend too much time compressing,
 * so there is an optimal point for any workload mix. We use a heuristic to
 * decide when to compress the array, though trimming also helps reduce
 * frequency of compressing. The heuristic requires us to track the number of
 * currently valid XIDs in the array.
3778 3779
 */

3780

3781
/*
3782 3783 3784 3785 3786
 * Compress KnownAssignedXids by shifting valid data down to the start of the
 * array, removing any gaps.
 *
 * A compression step is forced if "force" is true, otherwise we do it
 * only if a heuristic indicates it's a good time to do it.
3787
 *
3788
 * Caller must hold ProcArrayLock in exclusive mode.
3789 3790
 */
static void
3791
KnownAssignedXidsCompress(bool force)
3792
{
3793 3794
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
B
Bruce Momjian 已提交
3795 3796 3797 3798
	int			head,
				tail;
	int			compress_index;
	int			i;
3799

3800 3801 3802 3803 3804
	/* no spinlock required since we hold ProcArrayLock exclusively */
	head = pArray->headKnownAssignedXids;
	tail = pArray->tailKnownAssignedXids;

	if (!force)
3805
	{
3806
		/*
B
Bruce Momjian 已提交
3807 3808
		 * If we can choose how much to compress, use a heuristic to avoid
		 * compressing too often or not often enough.
3809
		 *
B
Bruce Momjian 已提交
3810 3811 3812 3813 3814
		 * Heuristic is if we have a large enough current spread and less than
		 * 50% of the elements are currently in use, then compress. This
		 * should ensure we compress fairly infrequently. We could compress
		 * less often though the virtual array would spread out more and
		 * snapshots would become more expensive.
3815
		 */
B
Bruce Momjian 已提交
3816
		int			nelements = head - tail;
3817

3818 3819 3820 3821
		if (nelements < 4 * PROCARRAY_MAXPROCS ||
			nelements < 2 * pArray->numKnownAssignedXids)
			return;
	}
3822

3823
	/*
B
Bruce Momjian 已提交
3824 3825
	 * We compress the array by reading the valid values from tail to head,
	 * re-aligning data to 0th element.
3826 3827 3828 3829 3830
	 */
	compress_index = 0;
	for (i = tail; i < head; i++)
	{
		if (KnownAssignedXidsValid[i])
3831
		{
3832 3833 3834
			KnownAssignedXids[compress_index] = KnownAssignedXids[i];
			KnownAssignedXidsValid[compress_index] = true;
			compress_index++;
3835
		}
3836
	}
3837

3838 3839 3840
	pArray->tailKnownAssignedXids = 0;
	pArray->headKnownAssignedXids = compress_index;
}
3841

3842 3843 3844 3845 3846 3847 3848 3849
/*
 * KnownAssignedXidsAdd
 *		Append the xids from from_xid to to_xid, inclusive, at the head of
 *		the KnownAssignedXids array.
 *
 * If exclusive_lock is true then the caller already holds ProcArrayLock in
 * exclusive mode, so no extra locking is needed here.  Otherwise the caller
 * holds no lock, and we must maintain sufficient interlocks against
 * concurrent readers.  (Only the startup process ever calls this, so there
 * are no concurrent writers to worry about.)
 *
 * Raises ERROR if the new xids are not beyond the last stored xid, or if
 * they do not fit in the array even after compression.
 */
static void
KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
					 bool exclusive_lock)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
	TransactionId cur_xid;
	int			head;
	int			tail;
	int			nxids;
	int			stop;

	Assert(TransactionIdPrecedesOrEquals(from_xid, to_xid));

	/*
	 * Work out how many array slots are needed.  Normally plain subtraction
	 * does it; in the unusual case where the range crosses the XID wrap
	 * point, count the hard way by stepping through it.
	 */
	if (to_xid >= from_xid)
	{
		nxids = to_xid - from_xid + 1;
	}
	else
	{
		nxids = 1;
		cur_xid = from_xid;
		while (TransactionIdPrecedes(cur_xid, to_xid))
		{
			nxids++;
			TransactionIdAdvance(cur_xid);
		}
	}

	/*
	 * Only the startup process modifies the head/tail pointers, so no lock
	 * is needed to read them here.
	 */
	head = pArray->headKnownAssignedXids;
	tail = pArray->tailKnownAssignedXids;

	Assert(head >= 0 && head <= pArray->maxKnownAssignedXids);
	Assert(tail >= 0 && tail < pArray->maxKnownAssignedXids);

	/*
	 * Insertions must occur in TransactionId sequence.  Even if the last
	 * existing element is marked invalid, it must still hold a correctly
	 * sequenced XID value, so it is safe to compare against it.
	 */
	if (head > tail &&
		TransactionIdFollowsOrEquals(KnownAssignedXids[head - 1], from_xid))
	{
		KnownAssignedXidsDisplay(LOG);
		elog(ERROR, "out-of-order XID insertion in KnownAssignedXids");
	}

	/* Not enough room to the right of head?  Compress out free space. */
	if (head + nxids > pArray->maxKnownAssignedXids)
	{
		/* compression requires the exclusive lock */
		if (!exclusive_lock)
		{
			LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
		}

		KnownAssignedXidsCompress(true);

		/* re-read head; the tail pointer no longer matters to us */
		head = pArray->headKnownAssignedXids;

		if (!exclusive_lock)
		{
			LWLockRelease(ProcArrayLock);
		}

		/* If it still won't fit then we're out of memory */
		if (head + nxids > pArray->maxKnownAssignedXids)
			elog(ERROR, "too many KnownAssignedXids");
	}

	/* Fill the slots starting at head with consecutive xids */
	cur_xid = from_xid;
	stop = head + nxids;
	while (head < stop)
	{
		KnownAssignedXids[head] = cur_xid;
		KnownAssignedXidsValid[head] = true;
		TransactionIdAdvance(cur_xid);
		head++;
	}

	/* Adjust count of number of valid entries */
	pArray->numKnownAssignedXids += nxids;

	/*
	 * Publish the new head pointer.  The spinlock is taken not because the
	 * store is likely to be non-atomic, but to ensure that other processors
	 * see the array updates above before they see the head pointer change.
	 * When ProcArrayLock is already held exclusively the spinlock is
	 * unnecessary.
	 */
	if (!exclusive_lock)
	{
		SpinLockAcquire(&pArray->known_assigned_xids_lck);
		pArray->headKnownAssignedXids = head;
		SpinLockRelease(&pArray->known_assigned_xids_lck);
	}
	else
	{
		pArray->headKnownAssignedXids = head;
	}
}

/*
 * KnownAssignedXidsSearch
 *
 * Look for a specific xid in KnownAssignedXids and, if "remove" is true,
 * mark it invalid.  Returns true if a valid entry for the xid was found,
 * false if it is absent or already marked invalid.
 *
 * Caller must hold ProcArrayLock in shared or exclusive mode.
 * Exclusive lock must be held for remove = true.
 */
static bool
KnownAssignedXidsSearch(TransactionId xid, bool remove)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
	int			lo;
	int			hi;
	int			head;
	int			tail;
	int			hit = -1;

	/*
	 * With remove = true we hold ProcArrayLock exclusively, so no spinlock
	 * is needed.  Otherwise take the spinlock to ensure we see up-to-date
	 * array contents.
	 */
	if (!remove)
	{
		SpinLockAcquire(&pArray->known_assigned_xids_lck);
	}
	tail = pArray->tailKnownAssignedXids;
	head = pArray->headKnownAssignedXids;
	if (!remove)
	{
		SpinLockRelease(&pArray->known_assigned_xids_lck);
	}

	/*
	 * Standard binary search over [tail, head).  The KnownAssignedXidsValid
	 * array can be ignored here, since even invalid entries contain sorted
	 * XIDs.
	 */
	lo = tail;
	hi = head - 1;
	while (lo <= hi)
	{
		int			mid = (lo + hi) / 2;
		TransactionId probe = KnownAssignedXids[mid];

		if (xid == probe)
		{
			hit = mid;
			break;
		}

		if (TransactionIdPrecedes(xid, probe))
			hi = mid - 1;
		else
			lo = mid + 1;
	}

	/* not in array, or in array but invalid */
	if (hit < 0 || !KnownAssignedXidsValid[hit])
		return false;

	if (!remove)
		return true;

	KnownAssignedXidsValid[hit] = false;

	pArray->numKnownAssignedXids--;
	Assert(pArray->numKnownAssignedXids >= 0);

	/*
	 * If the tail element was the one removed, advance the tail pointer over
	 * it and any invalid elements that follow.  This will speed future
	 * searches.
	 */
	if (hit == tail)
	{
		for (tail = tail + 1; tail < head; tail++)
		{
			if (KnownAssignedXidsValid[tail])
				break;
		}

		if (tail < head)
		{
			pArray->tailKnownAssignedXids = tail;
		}
		else
		{
			/* Array is empty, so we can reset both pointers */
			pArray->headKnownAssignedXids = 0;
			pArray->tailKnownAssignedXids = 0;
		}
	}

	return true;
}

/*
4062
 * Is the specified XID present in KnownAssignedXids[]?
4063
 *
4064
 * Caller must hold ProcArrayLock in shared or exclusive mode.
4065 4066
 */
static bool
4067
KnownAssignedXidExists(TransactionId xid)
4068
{
4069
	Assert(TransactionIdIsValid(xid));
B
Bruce Momjian 已提交
4070

4071
	return KnownAssignedXidsSearch(xid, false);
4072 4073 4074
}

/*
 * KnownAssignedXidsRemove
 *		Remove the specified XID from KnownAssignedXids[].
 *
 * Removing an XID that is not present is silently accepted.
 *
 * Caller must hold ProcArrayLock in exclusive mode.
 */
static void
KnownAssignedXidsRemove(TransactionId xid)
{
	Assert(TransactionIdIsValid(xid));

	elog(trace_recovery(DEBUG4), "remove KnownAssignedXid %u", xid);

	/*
	 * A missing XID cannot be treated as an error.  Subxact IDs are removed
	 * on purpose while processing XLOG_XACT_ASSIGNMENT, to avoid array
	 * overflow, and the same XIDs are then removed a second time when the
	 * top-level xact commits or aborts.
	 *
	 * Tracking such XIDs to tell this case apart from real errors might be
	 * possible, but it would be complicated and probably not worth it.  So
	 * the search result is deliberately discarded.
	 */
	(void) KnownAssignedXidsSearch(xid, true);
}

/*
4100 4101
 * KnownAssignedXidsRemoveTree
 *		Remove xid (if it's not InvalidTransactionId) and all the subxids.
4102
 *
4103
 * Caller must hold ProcArrayLock in exclusive mode.
4104
 */
4105 4106 4107
static void
KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
							TransactionId *subxids)
4108
{
B
Bruce Momjian 已提交
4109
	int			i;
4110

4111 4112 4113 4114 4115 4116 4117 4118
	if (TransactionIdIsValid(xid))
		KnownAssignedXidsRemove(xid);

	for (i = 0; i < nsubxids; i++)
		KnownAssignedXidsRemove(subxids[i]);

	/* Opportunistically compress the array */
	KnownAssignedXidsCompress(false);
4119 4120 4121
}

/*
4122 4123
 * Prune KnownAssignedXids up to, but *not* including xid. If xid is invalid
 * then clear the whole table.
4124
 *
4125
 * Caller must hold ProcArrayLock in exclusive mode.
4126
 */
4127 4128
static void
KnownAssignedXidsRemovePreceding(TransactionId removeXid)
4129
{
4130 4131
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
B
Bruce Momjian 已提交
4132 4133 4134 4135
	int			count = 0;
	int			head,
				tail,
				i;
4136

4137
	if (!TransactionIdIsValid(removeXid))
4138
	{
4139 4140 4141 4142 4143
		elog(trace_recovery(DEBUG4), "removing all KnownAssignedXids");
		pArray->numKnownAssignedXids = 0;
		pArray->headKnownAssignedXids = pArray->tailKnownAssignedXids = 0;
		return;
	}
4144

4145
	elog(trace_recovery(DEBUG4), "prune KnownAssignedXids to %u", removeXid);
4146

4147
	/*
B
Bruce Momjian 已提交
4148 4149
	 * Mark entries invalid starting at the tail.  Since array is sorted, we
	 * can stop as soon as we reach a entry >= removeXid.
4150 4151 4152 4153 4154 4155 4156 4157
	 */
	tail = pArray->tailKnownAssignedXids;
	head = pArray->headKnownAssignedXids;

	for (i = tail; i < head; i++)
	{
		if (KnownAssignedXidsValid[i])
		{
B
Bruce Momjian 已提交
4158
			TransactionId knownXid = KnownAssignedXids[i];
4159 4160 4161 4162 4163 4164 4165 4166 4167 4168

			if (TransactionIdFollowsOrEquals(knownXid, removeXid))
				break;

			if (!StandbyTransactionIdIsPrepared(knownXid))
			{
				KnownAssignedXidsValid[i] = false;
				count++;
			}
		}
4169 4170
	}

4171 4172 4173 4174 4175 4176 4177 4178 4179 4180 4181 4182 4183 4184 4185 4186 4187 4188 4189 4190 4191 4192 4193 4194
	pArray->numKnownAssignedXids -= count;
	Assert(pArray->numKnownAssignedXids >= 0);

	/*
	 * Advance the tail pointer if we've marked the tail item invalid.
	 */
	for (i = tail; i < head; i++)
	{
		if (KnownAssignedXidsValid[i])
			break;
	}
	if (i >= head)
	{
		/* Array is empty, so we can reset both pointers */
		pArray->headKnownAssignedXids = 0;
		pArray->tailKnownAssignedXids = 0;
	}
	else
	{
		pArray->tailKnownAssignedXids = i;
	}

	/* Opportunistically compress the array */
	KnownAssignedXidsCompress(false);
4195 4196 4197
}

/*
4198 4199 4200 4201 4202
 * KnownAssignedXidsGet - Get an array of xids by scanning KnownAssignedXids.
 * We filter out anything >= xmax.
 *
 * Returns the number of XIDs stored into xarray[].  Caller is responsible
 * that array is large enough.
4203
 *
4204
 * Caller must hold ProcArrayLock in (at least) shared mode.
4205
 */
4206 4207
static int
KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax)
4208
{
4209
	TransactionId xtmp = InvalidTransactionId;
4210

4211 4212
	return KnownAssignedXidsGetAndSetXmin(xarray, &xtmp, xmax);
}
4213

4214 4215 4216 4217 4218 4219 4220 4221 4222 4223 4224 4225 4226
/*
 * KnownAssignedXidsGetAndSetXmin
 *		As KnownAssignedXidsGet, plus *xmin is reduced to the lowest xid
 *		value seen if it is not already lower.
 *
 * Returns the number of XIDs stored into xarray[].  An invalid xmax
 * disables the upper-bound filter.
 *
 * Caller must hold ProcArrayLock in (at least) shared mode.
 */
static int
KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
							   TransactionId xmax)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
	int			nstored = 0;
	int			head;
	int			tail;
	int			pos;

	/*
	 * Fetch head just once, since it may change while we loop.  We can stop
	 * once we reach the initially seen head, since an xid cannot enter and
	 * then leave the array while we hold ProcArrayLock.  Newly-added xids
	 * might be missed, but they should be >= xmax and so irrelevant anyway.
	 *
	 * Must take spinlock to ensure we see up-to-date array contents.
	 */
	SpinLockAcquire(&pArray->known_assigned_xids_lck);
	tail = pArray->tailKnownAssignedXids;
	head = pArray->headKnownAssignedXids;
	SpinLockRelease(&pArray->known_assigned_xids_lck);

	for (pos = tail; pos < head; pos++)
	{
		TransactionId knownXid;

		/* Skip any gaps in the array */
		if (!KnownAssignedXidsValid[pos])
			continue;

		knownXid = KnownAssignedXids[pos];

		/*
		 * Update xmin if required.  Only the first valid XID needs to be
		 * checked, since the array is sorted.
		 */
		if (nstored == 0 && TransactionIdPrecedes(knownXid, *xmin))
			*xmin = knownXid;

		/*
		 * Filter out anything >= xmax, again relying on the sorted property
		 * of the array.
		 */
		if (TransactionIdIsValid(xmax) &&
			TransactionIdFollowsOrEquals(knownXid, xmax))
			break;

		/* Add knownXid into output array */
		xarray[nstored] = knownXid;
		nstored++;
	}

	return nstored;
}

/*
 * Display KnownAssignedXids to provide debug trail
 *
4279 4280 4281 4282 4283 4284
 * Currently this is only called within startup process, so we need no
 * special locking.
 *
 * Note this is pretty expensive, and much of the expense will be incurred
 * even if the elog message will get discarded.  It's not currently called
 * in any performance-critical places, however, so no need to be tenser.
4285
 */
T
Tom Lane 已提交
4286
static void
4287 4288
KnownAssignedXidsDisplay(int trace_level)
{
4289 4290
	/* use volatile pointer to prevent code rearrangement */
	volatile ProcArrayStruct *pArray = procArray;
B
Bruce Momjian 已提交
4291 4292 4293 4294 4295
	StringInfoData buf;
	int			head,
				tail,
				i;
	int			nxids = 0;
4296

4297 4298
	tail = pArray->tailKnownAssignedXids;
	head = pArray->headKnownAssignedXids;
4299 4300 4301

	initStringInfo(&buf);

4302 4303 4304 4305 4306 4307 4308 4309
	for (i = tail; i < head; i++)
	{
		if (KnownAssignedXidsValid[i])
		{
			nxids++;
			appendStringInfo(&buf, "[%u]=%u ", i, KnownAssignedXids[i]);
		}
	}
4310

4311 4312 4313 4314 4315 4316
	elog(trace_level, "%d KnownAssignedXids (num=%u tail=%u head=%u) %s",
		 nxids,
		 pArray->numKnownAssignedXids,
		 pArray->tailKnownAssignedXids,
		 pArray->headKnownAssignedXids,
		 buf.data);
4317 4318 4319

	pfree(buf.data);
}