/*-------------------------------------------------------------------------
 *
 * varsup.c
 *	  postgres OID & XID variables support routines
 *
 * Copyright (c) 2000-2010, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/access/transam/varsup.c,v 1.91 2010/02/26 02:00:34 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
13

V
Vadim B. Mikheev 已提交
14
#include "postgres.h"
M
-Wall'd  
Marc G. Fournier 已提交
15

16
#include "access/clog.h"
17
#include "access/subtrans.h"
V
Vadim B. Mikheev 已提交
18
#include "access/transam.h"
19
#include "access/xact.h"
20
#include "commands/dbcommands.h"
21
#include "miscadmin.h"
22 23
#include "postmaster/autovacuum.h"
#include "storage/pmsignal.h"
24
#include "storage/proc.h"
25
#include "utils/builtins.h"
26
#include "utils/guc.h"
27
#include "utils/syscache.h"
28

29
#include "access/distributedlog.h"
30
#include "cdb/cdbvars.h"
31 32


33
/* Number of OIDs to prefetch (preallocate) per XLOG write */
T
Tom Lane 已提交
34 35
#define VAR_OID_PREFETCH		8192

V
Vadim B. Mikheev 已提交
36
/* pointer to "variable cache" in shared memory (set up by shmem.c) */
37
VariableCache ShmemVariableCache = NULL;
38

39 40
int xid_stop_limit;
int xid_warn_limit;
41 42

/*
43
 * Allocate the next XID for a new transaction or subtransaction.
44 45
 *
 * The new XID is also stored into MyProc before returning.
46 47 48
 *
 * Note: when this is called, we are actually already inside a valid
 * transaction, since XIDs are now not allocated until the transaction
B
Bruce Momjian 已提交
49
 * does something.	So it is safe to do a database lookup if we want to
50
 * issue a warning about XID wrap.
51
 */
52
TransactionId
53
GetNewTransactionId(bool isSubXact)
54
{
55
	TransactionId xid;
56

V
Vadim B. Mikheev 已提交
57
	/*
B
Bruce Momjian 已提交
58 59
	 * During bootstrap initialization, we return the special bootstrap
	 * transaction id.
60
	 */
61
	if (IsBootstrapProcessingMode())
62 63
	{
		Assert(!isSubXact);
64
		return BootstrapTransactionId;
65
	}
66

67 68 69 70
	/* safety check, we should never get this far in a HS slave */
	if (RecoveryInProgress())
		elog(ERROR, "cannot assign TransactionIds during recovery");

71
	LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
T
Tom Lane 已提交
72

73
	xid = ShmemVariableCache->nextXid;
74

75
	/*----------
B
Bruce Momjian 已提交
76 77
	 * Check to see if it's safe to assign another XID.  This protects against
	 * catastrophic data loss due to XID wraparound.  The basic rules are:
78 79 80 81 82 83 84
	 *
	 * If we're past xidVacLimit, start trying to force autovacuum cycles.
	 * If we're past xidWarnLimit, start issuing warnings.
	 * If we're past xidStopLimit, refuse to execute transactions, unless
	 * we are running in a standalone backend (which gives an escape hatch
	 * to the DBA who somehow got past the earlier defenses).
	 *----------
85
	 */
86
	if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidVacLimit))
87
	{
88
		/*
89 90 91
		 * For safety's sake, we release XidGenLock while sending signals,
		 * warnings, etc.  This is not so much because we care about
		 * preserving concurrency in this situation, as to avoid any
B
Bruce Momjian 已提交
92 93
		 * possibility of deadlock while doing get_database_name(). First,
		 * copy all the shared values we'll need in this path.
94 95 96 97
		 */
		TransactionId xidWarnLimit = ShmemVariableCache->xidWarnLimit;
		TransactionId xidStopLimit = ShmemVariableCache->xidStopLimit;
		TransactionId xidWrapLimit = ShmemVariableCache->xidWrapLimit;
B
Bruce Momjian 已提交
98
		Oid			oldest_datoid = ShmemVariableCache->oldestXidDB;
99 100 101

		LWLockRelease(XidGenLock);

102
		/*
B
Bruce Momjian 已提交
103 104 105
		 * To avoid swamping the postmaster with signals, we issue the autovac
		 * request only once per 64K transaction starts.  This still gives
		 * plenty of chances before we get into real trouble.
106
		 */
107
		if (IsUnderPostmaster && (xid % 65536) == 0)
108 109
			SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER);

110
		if (IsUnderPostmaster &&
111 112
			TransactionIdFollowsOrEquals(xid, xidStopLimit))
		{
B
Bruce Momjian 已提交
113
			char	   *oldest_datname = get_database_name(oldest_datoid);
114

115 116 117 118 119
			 /*
			  * In GPDB, don't say anything about old prepared transactions, because the system
			  * only uses prepared transactions internally. PREPARE TRANSACTION is not available
			  * to users.
			  */
120

121 122 123 124 125 126
			/* complain even if that DB has disappeared */
			if (oldest_datname)
				ereport(ERROR,
						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
						 errmsg("database is not accepting commands to avoid wraparound data loss in database \"%s\"",
								oldest_datname),
127 128
						 errhint("Shutdown Greenplum Database. Lower the xid_stop_limit GUC. Execute a database-wide VACUUM in that database. Reset the xid_stop_limit GUC."
								 )));
129 130 131 132 133
			else
				ereport(ERROR,
						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
						 errmsg("database is not accepting commands to avoid wraparound data loss in database with OID %u",
								oldest_datoid),
134 135
						 errhint("Shutdown Greenplum Database. Lower the xid_stop_limit GUC. Execute a database-wide VACUUM in that database. Reset the xid_stop_limit GUC."
								 )));
136 137 138
		}
		else if (TransactionIdFollowsOrEquals(xid, xidWarnLimit))
		{
B
Bruce Momjian 已提交
139
			char	   *oldest_datname = get_database_name(oldest_datoid);
140

141 142 143 144 145 146
			 /*
			  * In GPDB, don't say anything about old prepared transactions, because the system
			  * only uses prepared transactions internally. PREPARE TRANSACTION is not available
			  * to users.
			  */

147 148 149 150 151 152
			/* complain even if that DB has disappeared */
			if (oldest_datname)
				ereport(WARNING,
						(errmsg("database \"%s\" must be vacuumed within %u transactions",
								oldest_datname,
								xidWrapLimit - xid),
153 154
						 errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database."
								 )));
155 156 157 158 159
			else
				ereport(WARNING,
						(errmsg("database with OID %u must be vacuumed within %u transactions",
								oldest_datoid,
								xidWrapLimit - xid),
160 161
						 errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database."
								 )));
162 163 164 165 166
		}

		/* Re-acquire lock and start over */
		LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
		xid = ShmemVariableCache->nextXid;
167 168
	}

169
	/*
B
Bruce Momjian 已提交
170
	 * If we are allocating the first XID of a new page of the commit log,
B
Bruce Momjian 已提交
171 172 173 174
	 * zero out that commit-log page before returning. We must do this while
	 * holding XidGenLock, else another xact could acquire and commit a later
	 * XID before we zero the page.  Fortunately, a page of the commit log
	 * holds 32K or more transactions, so we don't have to do this very often.
175 176
	 *
	 * Extend pg_subtrans too.
177 178
	 */
	ExtendCLOG(xid);
179
	ExtendSUBTRANS(xid);
180
	DistributedLog_Extend(xid);
181

182
	/*
B
Bruce Momjian 已提交
183 184 185 186
	 * Now advance the nextXid counter.  This must not happen until after we
	 * have successfully completed ExtendCLOG() --- if that routine fails, we
	 * want the next incoming transaction to try it again.	We cannot assign
	 * more XIDs until there is CLOG space for them.
187 188 189
	 */
	TransactionIdAdvance(ShmemVariableCache->nextXid);

190 191 192
	/*
	 * To aid testing, you can set the debug_burn_xids GUC, to consume XIDs
	 * faster. If set, we bump the XID counter to the next value divisible by
193
	 * 4096, minus one. The idea is to skip over "boring" XID ranges, but
194 195 196 197 198 199 200 201
	 * still step through XID wraparound, CLOG page boundaries etc. one XID
	 * at a time.
	 */
	if (Debug_burn_xids)
	{
		TransactionId xx;
		uint32		r;

202 203 204 205 206 207
		/*
		 * Based on the minimum of ENTRIES_PER_PAGE (DistributedLog),
		 * SUBTRANS_XACTS_PER_PAGE, CLOG_XACTS_PER_PAGE.
		 */
		const uint32      page_extend_limit = 4 * 1024;

208 209
		xx = ShmemVariableCache->nextXid;

210 211
		r = xx % page_extend_limit;
		if (r > 1 && r < (page_extend_limit - 1))
212
		{
213
			xx += page_extend_limit - r - 1;
214 215 216 217
			ShmemVariableCache->nextXid = xx;
		}
	}

218
	/*
219
	 * We must store the new XID into the shared ProcArray before releasing
B
Bruce Momjian 已提交
220 221 222
	 * XidGenLock.	This ensures that every active XID older than
	 * latestCompletedXid is present in the ProcArray, which is essential for
	 * correct OldestXmin tracking; see src/backend/access/transam/README.
223
	 *
224
	 * XXX by storing xid into MyProc without acquiring ProcArrayLock, we are
225
	 * relying on fetch/store of an xid to be atomic, else other backends
226
	 * might see a partially-set xid here.	But holding both locks at once
227 228 229 230 231
	 * would be a nasty concurrency hit.  So for now, assume atomicity.
	 *
	 * Note that readers of PGPROC xid fields should be careful to fetch the
	 * value only once, rather than assume they can read a value multiple
	 * times and get the same answer each time.
232
	 *
233 234
	 * The same comments apply to the subxact xid count and overflow fields.
	 *
B
Bruce Momjian 已提交
235 236 237
	 * A solution to the atomic-store problem would be to give each PGPROC its
	 * own spinlock used only for fetching/storing that PGPROC's xid and
	 * related fields.
238
	 *
239 240
	 * If there's no room to fit a subtransaction XID into PGPROC, set the
	 * cache-overflowed flag instead.  This forces readers to look in
B
Bruce Momjian 已提交
241 242 243 244
	 * pg_subtrans to map subtransaction XIDs up to top-level XIDs. There is a
	 * race-condition window, in that the new XID will not appear as running
	 * until its parent link has been placed into pg_subtrans. However, that
	 * will happen before anyone could possibly have a reason to inquire about
245
	 * the status of the XID, so it seems OK.  (Snapshots taken during this
B
Bruce Momjian 已提交
246 247
	 * window *will* include the parent XID, so they will deliver the correct
	 * answer later on when someone does have a reason to inquire.)
248
	 */
249
	{
250 251
		/*
		 * Use volatile pointer to prevent code rearrangement; other backends
B
Bruce Momjian 已提交
252 253 254 255
		 * could be examining my subxids info concurrently, and we don't want
		 * them to see an invalid intermediate state, such as incrementing
		 * nxids before filling the array entry.  Note we are assuming that
		 * TransactionId and int fetch/store are atomic.
256 257 258
		 */
		volatile PGPROC *myproc = MyProc;

259
		if (!isSubXact)
260
			myproc->xid = xid;
261 262
		else
		{
B
Bruce Momjian 已提交
263
			int			nxids = myproc->subxids.nxids;
264 265

			if (nxids < PGPROC_MAX_CACHED_SUBXIDS)
266
			{
267 268
				myproc->subxids.xids[nxids] = xid;
				myproc->subxids.nxids = nxids + 1;
269 270
			}
			else
271
				myproc->subxids.overflowed = true;
272 273
		}
	}
274

275
	LWLockRelease(XidGenLock);
276 277

	return xid;
278 279
}

280
/*
T
Tom Lane 已提交
281
 * Read nextXid but don't allocate it.
282
 */
283 284
TransactionId
ReadNewTransactionId(void)
285
{
286
	TransactionId xid;
287

288
	LWLockAcquire(XidGenLock, LW_SHARED);
289
	xid = ShmemVariableCache->nextXid;
290
	LWLockRelease(XidGenLock);
291 292

	return xid;
293 294
}

295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320
/*
 * Get the last safe XID, i.e. the oldest XID that might exist in any
 * database of our cluster.
 */
TransactionId
GetTransactionIdLimit(void)
{
	TransactionId xid;

	LWLockAcquire(XidGenLock, LW_SHARED);
	xid = ShmemVariableCache->oldestXid;
	LWLockRelease(XidGenLock);

	if (!TransactionIdIsNormal(xid))
	{
		/*
		 * shouldn't happen, but since this value is used in the computation
		 * of oldest xmin, which determines which tuples be safely vacuumed
		 * away, let's be paranoid.
		 */
		elog(ERROR, "invalid oldestXid limit: %u", xid);
	}

	return xid;
}

321 322
/*
 * Determine the last safe XID to allocate given the currently oldest
323
 * datfrozenxid (ie, the oldest XID that might exist in any database
324
 * of our cluster), and the OID of the (or a) database with that value.
325 326
 */
void
327
SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
328
{
329
	TransactionId xidVacLimit;
330 331 332 333 334
	TransactionId xidWarnLimit;
	TransactionId xidStopLimit;
	TransactionId xidWrapLimit;
	TransactionId curXid;

335
	Assert(TransactionIdIsNormal(oldest_datfrozenxid));
336 337 338

	/*
	 * The place where we actually get into deep trouble is halfway around
339 340 341 342
	 * from the oldest potentially-existing XID.  (This calculation is
	 * probably off by one or two counts, because the special XIDs reduce the
	 * size of the loop a little bit.  But we throw in plenty of slop below,
	 * so it doesn't matter.)
343
	 */
344
	xidWrapLimit = oldest_datfrozenxid + (MaxTransactionId >> 1);
345 346 347 348
	if (xidWrapLimit < FirstNormalTransactionId)
		xidWrapLimit += FirstNormalTransactionId;

	/*
B
Bruce Momjian 已提交
349
	 * We'll refuse to continue assigning XIDs in interactive mode once we get
350 351 352
	 * within xid_stop_limit transactions of data loss.  This leaves lots of
	 * room for the DBA to fool around fixing things in a standalone backend,
	 * while not being significant compared to total XID space. (Note that since
B
Bruce Momjian 已提交
353 354
	 * vacuuming requires one transaction per table cleaned, we had better be
	 * sure there's lots of XIDs left...)
355
	 */
356
	xidStopLimit = xidWrapLimit - (TransactionId)xid_stop_limit;
357 358 359 360
	if (xidStopLimit < FirstNormalTransactionId)
		xidStopLimit -= FirstNormalTransactionId;

	/*
361 362 363 364 365 366 367 368
	 * We'll start complaining loudly when we get within xid_warn_limit
	 * transactions of the stop point.	This is kind of arbitrary, but if
	 * you let your gas gauge get down to 1% of full, would you be looking for
	 * the next gas station?  We need to be fairly liberal about this number
	 * because there are lots of scenarios where most transactions are done by
	 * automatic clients that won't pay attention to warnings. (No, we're not
	 * gonna make this configurable.  If you know enough to configure it, you
	 * know enough to not get in this kind of trouble in the first place.)
369
	 */
370
	xidWarnLimit = xidStopLimit  - (TransactionId)xid_warn_limit;
371 372 373
	if (xidWarnLimit < FirstNormalTransactionId)
		xidWarnLimit -= FirstNormalTransactionId;

374
	/*
375 376
	 * We'll start trying to force autovacuums when oldest_datfrozenxid gets
	 * to be more than autovacuum_freeze_max_age transactions old.
377
	 *
378 379
	 * Note: guc.c ensures that autovacuum_freeze_max_age is in a sane range,
	 * so that xidVacLimit will be well before xidWarnLimit.
380 381 382 383 384
	 *
	 * Note: autovacuum_freeze_max_age is a PGC_POSTMASTER parameter so that
	 * we don't have to worry about dealing with on-the-fly changes in its
	 * value.  It doesn't look practical to update shared state from a GUC
	 * assign hook (too many processes would try to execute the hook,
385 386
	 * resulting in race conditions as well as crashes of those not connected
	 * to shared memory).  Perhaps this can be improved someday.
387
	 */
388 389 390
	xidVacLimit = oldest_datfrozenxid + autovacuum_freeze_max_age;
	if (xidVacLimit < FirstNormalTransactionId)
		xidVacLimit += FirstNormalTransactionId;
391

392 393
	/* Grab lock for just long enough to set the new limit values */
	LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
394 395
	ShmemVariableCache->oldestXid = oldest_datfrozenxid;
	ShmemVariableCache->xidVacLimit = xidVacLimit;
396 397 398
	ShmemVariableCache->xidWarnLimit = xidWarnLimit;
	ShmemVariableCache->xidStopLimit = xidStopLimit;
	ShmemVariableCache->xidWrapLimit = xidWrapLimit;
399
	ShmemVariableCache->oldestXidDB = oldest_datoid;
400 401 402 403
	curXid = ShmemVariableCache->nextXid;
	LWLockRelease(XidGenLock);

	/* Log the info */
404
	ereport(DEBUG1,
B
Bruce Momjian 已提交
405 406
			(errmsg("transaction ID wrap limit is %u, limited by database with OID %u",
					xidWrapLimit, oldest_datoid)));
407 408 409 410 411 412 413 414

	/*
	 * If past the autovacuum force point, immediately signal an autovac
	 * request.  The reason for this is that autovac only processes one
	 * database per invocation.  Once it's finished cleaning up the oldest
	 * database, it'll call here, and we'll signal the postmaster to start
	 * another iteration immediately if there are still any old databases.
	 */
415
	if (TransactionIdFollowsOrEquals(curXid, xidVacLimit) &&
416
		IsUnderPostmaster && !InRecovery)
417
		SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER);
418

419
	/* Give an immediate warning if past the wrap warn point */
420
	if (TransactionIdFollowsOrEquals(curXid, xidWarnLimit) && !InRecovery)
421
	{
422
		char	   *oldest_datname;
423 424

		/*
425 426 427 428 429 430 431
		 * We can be called when not inside a transaction, for example
		 * during StartupXLOG().  In such a case we cannot do database
		 * access, so we must just report the oldest DB's OID.
		 *
		 * Note: it's also possible that get_database_name fails and returns
		 * NULL, for example because the database just got dropped.  We'll
		 * still warn, even though the warning might now be unnecessary.
432
		 */
433 434 435 436 437
		if (IsTransactionState())
			oldest_datname = get_database_name(oldest_datoid);
		else
			oldest_datname = NULL;

438 439
		if (oldest_datname)
			ereport(WARNING,
B
Bruce Momjian 已提交
440 441 442 443 444
			(errmsg("database \"%s\" must be vacuumed within %u transactions",
					oldest_datname,
					xidWrapLimit - curXid),
			 errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
					 "You might also need to commit or roll back old prepared transactions.")));
445 446 447 448 449 450 451 452 453 454 455 456
		else
			ereport(WARNING,
					(errmsg("database with OID %u must be vacuumed within %u transactions",
							oldest_datoid,
							xidWrapLimit - curXid),
					 errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
							 "You might also need to commit or roll back old prepared transactions.")));
	}
}


/*
457
 * ForceTransactionIdLimitUpdate -- does the XID wrap-limit data need updating?
458 459 460 461
 *
 * We primarily check whether oldestXidDB is valid.  The cases we have in
 * mind are that that database was dropped, or the field was reset to zero
 * by pg_resetxlog.  In either case we should force recalculation of the
B
Bruce Momjian 已提交
462
 * wrap limit.	Also do it if oldestXid is old enough to be forcing
463 464
 * autovacuums or other actions; this ensures we update our state as soon
 * as possible once extra overhead is being incurred.
465 466
 */
bool
467
ForceTransactionIdLimitUpdate(void)
468
{
469 470
	TransactionId nextXid;
	TransactionId xidVacLimit;
471 472 473 474 475
	TransactionId oldestXid;
	Oid			oldestXidDB;

	/* Locking is probably not really necessary, but let's be careful */
	LWLockAcquire(XidGenLock, LW_SHARED);
476 477
	nextXid = ShmemVariableCache->nextXid;
	xidVacLimit = ShmemVariableCache->xidVacLimit;
478 479 480 481 482
	oldestXid = ShmemVariableCache->oldestXid;
	oldestXidDB = ShmemVariableCache->oldestXidDB;
	LWLockRelease(XidGenLock);

	if (!TransactionIdIsNormal(oldestXid))
483
		return true;			/* shouldn't happen, but just in case */
484 485 486
	if (!TransactionIdIsValid(xidVacLimit))
		return true;			/* this shouldn't happen anymore either */
	if (TransactionIdFollowsOrEquals(nextXid, xidVacLimit))
487
		return true;			/* past VacLimit, don't delay updating */
488
	if (!SearchSysCacheExists1(DATABASEOID, ObjectIdGetDatum(oldestXidDB)))
489 490
		return true;			/* could happen, per comments above */
	return false;
491 492
}

493
/*
494
 * Requires OidGenLock to be held by caller.
495
 */
496 497
static Oid
GetNewObjectIdUnderLock(void)
498
{
499
	Oid result;
500

501 502 503 504
	/* safety check, we should never get this far in a HS slave */
	if (RecoveryInProgress())
		elog(ERROR, "cannot assign OIDs during recovery");

505
	Assert(LWLockHeldByMe(OidGenLock));
506

507 508 509
	/*
	 * Check for wraparound of the OID counter.  We *must* not return 0
	 * (InvalidOid); and as long as we have to check that, it seems a good
510
	 * idea to skip over everything below FirstNormalObjectId too. (This
511 512 513
	 * basically just avoids lots of collisions with bootstrap-assigned OIDs
	 * right after a wrap occurs, so as to avoid a possibly large number of
	 * iterations in GetNewOid.)  Note we are relying on unsigned comparison.
514
	 *
515 516
	 * During initdb, we start the OID generator at FirstBootstrapObjectId, so
	 * we only enforce wrapping to that point when in bootstrap or standalone
B
Bruce Momjian 已提交
517 518 519
	 * mode.  The first time through this routine after normal postmaster
	 * start, the counter will be forced up to FirstNormalObjectId. This
	 * mechanism leaves the OIDs between FirstBootstrapObjectId and
520 521
	 * FirstNormalObjectId available for automatic assignment during initdb,
	 * while ensuring they will never conflict with user-assigned OIDs.
522
	 */
523
	if (ShmemVariableCache->nextOid < ((Oid) FirstNormalObjectId))
524
	{
525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540
		if (IsPostmasterEnvironment)
		{
			/* wraparound in normal environment */
			ShmemVariableCache->nextOid = FirstNormalObjectId;
			ShmemVariableCache->oidCount = 0;
		}
		else
		{
			/* we may be bootstrapping, so don't enforce the full range */
			if (ShmemVariableCache->nextOid < ((Oid) FirstBootstrapObjectId))
			{
				/* wraparound in standalone environment? */
				ShmemVariableCache->nextOid = FirstBootstrapObjectId;
				ShmemVariableCache->oidCount = 0;
			}
		}
541 542
	}

T
Tom Lane 已提交
543
	/* If we run out of logged for use oids then we must log more */
V
Vadim B. Mikheev 已提交
544
	if (ShmemVariableCache->oidCount == 0)
545
	{
V
Vadim B. Mikheev 已提交
546 547
		XLogPutNextOid(ShmemVariableCache->nextOid + VAR_OID_PREFETCH);
		ShmemVariableCache->oidCount = VAR_OID_PREFETCH;
548 549
	}

550
	result = ShmemVariableCache->nextOid;
551

V
Vadim B. Mikheev 已提交
552 553 554
	(ShmemVariableCache->nextOid)++;
	(ShmemVariableCache->oidCount)--;

555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578
	return result;
}

/*
 * GetNewObjectId -- allocate a new OID
 *
 * OIDs are generated by a cluster-wide counter. Since they are only
 * 32 bits wide, counter wraparound will occur eventually, and
 * therefore it is unwise to assume they are unique unless precautions
 * are taken to make them so. Hence, this routine should generally not
 * be used directly. The only direct callers should be GetNewOid() and
 * GetNewOidWithIndex() in catalog/catalog.c. It's also called from
 * cdb_sync_oid_to_segments() in cdb/cdboidsync.c to synchronize the
 * OID counter on the QD with its QEs.
 *
 * Takes and releases OidGenLock around the allocation.
 */
Oid
GetNewObjectId(void)
{
	Oid			result;

	LWLockAcquire(OidGenLock, LW_EXCLUSIVE);

	result = GetNewObjectIdUnderLock();

	LWLockRelease(OidGenLock);

	return result;
}
H
Heikki Linnakangas 已提交
583 584

/*
 * AdvanceObjectId -- advance object id counter for QE nodes
 *
 * The QD provides the preassigned OID to the QE nodes which will be
 * used as the relation's OID. QE nodes do not use this OID as the
 * relfilenode value anymore so the OID counter is not
 * incremented. This function forcefully increments the QE node's OID
 * counter to be about the same as the OID provided by the QD node.
 *
 * OIDs are allocated (and discarded) one at a time, under OidGenLock, until
 * an allocated value exceeds newOid.
 *
 * NOTE(review): if newOid is the largest value an Oid can hold, no allocated
 * OID can ever exceed it and this loop would not terminate -- confirm that
 * callers never pass such a value.
 */
void
AdvanceObjectId(Oid newOid)
{
	LWLockAcquire(OidGenLock, LW_EXCLUSIVE);

	/* burn OIDs until the counter has moved past newOid */
	while (GetNewObjectIdUnderLock() <= newOid)
		;

	LWLockRelease(OidGenLock);
}

/*
 * Requires RelfilenodeGenLock to be held by caller.
 */
static Oid
GetNewSegRelfilenodeUnderLock(void)
{
	Oid result;

	Assert(LWLockHeldByMe(RelfilenodeGenLock));

	if (ShmemVariableCache->nextRelfilenode < ((Oid) FirstNormalObjectId) &&
		IsPostmasterEnvironment)
	{
		/* wraparound in normal environment */
		ShmemVariableCache->nextRelfilenode = FirstNormalObjectId;
		ShmemVariableCache->relfilenodeCount = 0;
	}

	if (ShmemVariableCache->relfilenodeCount == 0)
	{
		XLogPutNextRelfilenode(ShmemVariableCache->nextRelfilenode + VAR_OID_PREFETCH);
		ShmemVariableCache->relfilenodeCount = VAR_OID_PREFETCH;
	}

	result = ShmemVariableCache->nextRelfilenode;

	(ShmemVariableCache->nextRelfilenode)++;
	(ShmemVariableCache->relfilenodeCount)--;

	return result;
}

/*
 * GetNewSegRelfilenode -- allocate a new relfilenode value
H
Heikki Linnakangas 已提交
637
 *
638 639 640 641
 * Similar to GetNewObjectId but for relfilenodes. This function has its own
 * separate counter and is used to allocate relfilenode values instead of
 * trying to use the newly generated OIDs (QD) or preassigned OIDs (QE) as the
 * relfilenode.
H
Heikki Linnakangas 已提交
642
 */
643 644
Oid
GetNewSegRelfilenode(void)
H
Heikki Linnakangas 已提交
645
{
646
	Oid result;
H
Heikki Linnakangas 已提交
647

648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674
	LWLockAcquire(RelfilenodeGenLock, LW_EXCLUSIVE);

	result = GetNewSegRelfilenodeUnderLock();

	LWLockRelease(RelfilenodeGenLock);

	return result;
}

/*
 * Only used for GP_ROLE_DISPATCH and GP_ROLE_UTILITY to make sure
 * sequence relation has OID same as relfilenode. This is required due
 * to sequence server doing direct opens to filesystem assuming relfilenode is
 * the same as OID.
 *
 * Note: Delete this function and its calls if sequence relations
 * change to where we no longer have to assume relfilenode is the same
 * as OID.
 */
Oid
GetNewSequenceRelationObjectId(void)
{
	Oid currentOidCount;
	Oid currentRelfilenodeCount;
	Oid result = InvalidOid;

	Assert(Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_UTILITY);
H
Heikki Linnakangas 已提交
675 676

	LWLockAcquire(OidGenLock, LW_EXCLUSIVE);
677 678 679 680
	LWLockAcquire(RelfilenodeGenLock, LW_EXCLUSIVE);

	currentOidCount = ShmemVariableCache->nextOid;
	currentRelfilenodeCount = ShmemVariableCache->nextRelfilenode;
H
Heikki Linnakangas 已提交
681

682
	if (currentOidCount >= currentRelfilenodeCount)
H
Heikki Linnakangas 已提交
683
	{
684 685 686 687 688 689 690
		result = GetNewObjectIdUnderLock();
		while(GetNewSegRelfilenodeUnderLock() <= result);
	}
	else if (currentOidCount < currentRelfilenodeCount)
	{
		result = GetNewSegRelfilenodeUnderLock();
		while(GetNewObjectIdUnderLock() <= result);
H
Heikki Linnakangas 已提交
691 692 693
	}

	LWLockRelease(OidGenLock);
694
	LWLockRelease(RelfilenodeGenLock);
H
Heikki Linnakangas 已提交
695

696
	return result;
H
Heikki Linnakangas 已提交
697
}