/*-------------------------------------------------------------------------
 *
 * sequence.c
 *	  PostgreSQL sequences support code.
 *
 * Portions Copyright (c) 2005-2008, Greenplum inc.
 * Portions Copyright (c) 2012-Present Pivotal Software, Inc.
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/commands/sequence.c,v 1.160 2009/06/11 14:48:56 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
17
#include "postgres.h"
18

19
#include "access/heapam.h"
20
#include "access/bufmask.h"
21 22
#include "access/transam.h"
#include "access/xact.h"
23
#include "access/xlogutils.h"
24
#include "catalog/dependency.h"
25
#include "catalog/heap.h"
26
#include "catalog/namespace.h"
27
#include "catalog/pg_type.h"
28
#include "commands/defrem.h"
29
#include "commands/sequence.h"
30
#include "commands/tablecmds.h"
B
Bruce Momjian 已提交
31
#include "miscadmin.h"
32
#include "storage/smgr.h"               /* RelationCloseSmgr -> smgrclose */
33
#include "nodes/makefuncs.h"
34 35
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
36
#include "storage/proc.h"
37
#include "utils/acl.h"
B
Bruce Momjian 已提交
38
#include "utils/builtins.h"
39
#include "utils/formatting.h"
40
#include "utils/lsyscache.h"
41
#include "utils/resowner.h"
42
#include "utils/syscache.h"
43

44
#include "cdb/cdbdisp_query.h"
H
Heikki Linnakangas 已提交
45
#include "cdb/cdbdoublylinked.h"
46 47 48 49 50 51 52 53 54
#include "cdb/cdbsrlz.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbmotion.h"
#include "cdb/ml_ipc.h"

#include "cdb/cdbpersistentfilesysobj.h"

#include "postmaster/seqserver.h"

55

V
Vadim B. Mikheev 已提交
56
/*
 * We don't want to log each fetching of a value from a sequence,
 * so we pre-log a few fetches in advance. In the event of a
 * crash we can lose (skip over) as many values as we pre-logged.
 */
B
Bruce Momjian 已提交
61
#define SEQ_LOG_VALS	32
62

63 64 65 66 67
/*
 * The "special area" of a sequence's buffer page looks like this.
 */
#define SEQ_MAGIC	  0x1717

68 69
typedef struct sequence_magic
{
70
	uint32		magic;
71
} sequence_magic;
72

73 74 75 76 77 78
/*
 * We store a SeqTable item for every sequence we have touched in the current
 * session.  This is needed to hold onto nextval/currval state.  (We can't
 * rely on the relcache, since it's only, well, a cache, and may decide to
 * discard entries.)
 *
B
Bruce Momjian 已提交
79
 * XXX We use linear search to find pre-existing SeqTable entries.	This is
80 81 82
 * good when only a small number of sequences are touched in a session, but
 * would suck with many different sequences.  Perhaps use a hashtable someday.
 */
83 84
typedef struct SeqTableData
{
85 86
	struct SeqTableData *next;	/* link to next SeqTable object */
	Oid			relid;			/* pg_class OID of this sequence */
87
	LocalTransactionId lxid;	/* xact in which we last did a seq op */
88
	bool		last_valid;		/* do we have a valid "last" value? */
89 90 91 92
	int64		last;			/* value last returned by nextval */
	int64		cached;			/* last value already cached for nextval */
	/* if last != cached, we have not used up all the cached values */
	int64		increment;		/* copy of sequence's increment field */
93
	/* note that increment is zero until we first do read_seq_tuple() */
94
} SeqTableData;
95 96 97

typedef SeqTableData *SeqTable;

98
static SeqTable seqtab = NULL;	/* Head of list of SeqTable items */
99

100 101 102 103 104
/*
 * last_used_seq is updated by nextval() to point to the last used
 * sequence.
 */
static SeqTableData *last_used_seq = NULL;
105

106
static int64 nextval_internal(Oid relid);
107
static Relation open_share_lock(SeqTable seq);
108
static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel);
109 110
static Form_pg_sequence read_seq_tuple(SeqTable elm, Relation rel,
			   Buffer *buf, HeapTuple seqtuple);
111
static void init_params(List *options, bool isInit,
B
Bruce Momjian 已提交
112
			Form_pg_sequence new, List **owned_by);
113
static void do_setval(Oid relid, int64 next, bool iscalled);
114 115
static void process_owned_by(Relation seqrel, List *owned_by);

116
static void
117 118
cdb_sequence_nextval(SeqTable elm,
					 Relation   seqrel,
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354
                     int64     *plast,
                     int64     *pcached,
                     int64     *pincrement,
                     bool      *seq_overflow);
static void
cdb_sequence_nextval_proxy(Relation seqrel,
                           int64   *plast,
                           int64   *pcached,
                           int64   *pincrement,
                           bool    *poverflow);

/*
 * Hash key of the sequence persistent-info cache: the sequence's
 * RelFileNode.
 */
typedef struct SequencePersistentInfoCacheEntryKey
{
	RelFileNode				relFileNode;
} SequencePersistentInfoCacheEntryKey;

/*
 * One cache entry: the persistent TID and serial number remembered for a
 * sequence, plus the links that place the entry on the LRU list.
 */
typedef struct SequencePersistentInfoCacheEntryData
{
	/* hash key; must be the first field */
	SequencePersistentInfoCacheEntryKey	key;

	/* cached persistent TID for the sequence */
	ItemPointerData		persistentTid;

	/* cached persistent serial number for the sequence */
	int64				persistentSerialNum;

	/* membership in sequencePersistentInfoCacheLruListHead */
	DoubleLinks			lruLinks;

} SequencePersistentInfoCacheEntryData;
typedef SequencePersistentInfoCacheEntryData *SequencePersistentInfoCacheEntry;

/* The cache hash table; created lazily on first lookup or insert. */
static HTAB *sequencePersistentInfoCacheTable = NULL;

/* LRU list of cache entries; hits and new entries are placed at the front. */
static DoublyLinkedHead	sequencePersistentInfoCacheLruListHead;

/* Number of entries currently on the LRU list. */
static int sequencePersistentInfoCacheLruCount = 0;

/* When the count exceeds this limit, the entry at the list tail is evicted. */
static int sequencePersistentInfoCacheLruLimit = 100;

/*
 * Sequence_PersistentInfoCacheTableInit
 *
 * Create the hash table that caches persistent information for sequences
 * and initialize the head of its LRU list.
 */
static void
Sequence_PersistentInfoCacheTableInit(void)
{
	HASHCTL		ctl;

	/* Describe the key and entry layout; keys are hashed with tag_hash. */
	MemSet(&ctl, 0, sizeof(ctl));
	ctl.keysize = sizeof(SequencePersistentInfoCacheEntryKey);
	ctl.entrysize = sizeof(SequencePersistentInfoCacheEntryData);
	ctl.hash = tag_hash;

	sequencePersistentInfoCacheTable =
		hash_create("Sequence Persistent Info",
					10,
					&ctl,
					(HASH_ELEM | HASH_FUNCTION));

	DoublyLinkedHead_Init(&sequencePersistentInfoCacheLruListHead);
}

/*
 * Sequence_CheckPersistentInfoCache
 *
 * Look up relFileNode in the persistent-info cache, creating the cache
 * first if it does not exist yet.
 *
 * On a hit, the cached TID and serial number are copied into
 * *persistentTid and *persistentSerialNum, the entry is moved to the front
 * of the LRU list, and true is returned.  On a miss, false is returned and
 * the output arguments are left untouched.
 */
static bool
Sequence_CheckPersistentInfoCache(RelFileNode *relFileNode,
								  ItemPointer persistentTid,
								  int64 *persistentSerialNum)
{
	SequencePersistentInfoCacheEntryKey lookupKey;
	SequencePersistentInfoCacheEntry hit;
	bool		isCached;

	if (sequencePersistentInfoCacheTable == NULL)
		Sequence_PersistentInfoCacheTableInit();

	/* Zero the whole key first so the hash sees deterministic bytes. */
	MemSet(&lookupKey, 0, sizeof(SequencePersistentInfoCacheEntryKey));
	lookupKey.relFileNode = *relFileNode;

	hit = (SequencePersistentInfoCacheEntry)
		hash_search(sequencePersistentInfoCacheTable,
					(void *) &lookupKey,
					HASH_FIND,
					&isCached);
	if (!isCached)
		return false;

	*persistentTid = hit->persistentTid;
	*persistentSerialNum = hit->persistentSerialNum;

	/* LRU: unlink the entry and re-insert it at the head of the list. */
	DoubleLinks_Remove(
		offsetof(SequencePersistentInfoCacheEntryData, lruLinks),
		&sequencePersistentInfoCacheLruListHead,
		hit);
	DoublyLinkedHead_AddFirst(
		offsetof(SequencePersistentInfoCacheEntryData, lruLinks),
		&sequencePersistentInfoCacheLruListHead,
		hit);

	return true;
}

/*
 * Sequence_AddPersistentInfoCache
 *
 * Insert the persistent TID and serial number for relFileNode into the
 * cache (creating the cache if needed) and put the new entry at the front
 * of the LRU list.  The key is asserted not to be present already.
 *
 * If the insertion pushes the entry count above
 * sequencePersistentInfoCacheLruLimit, the entry at the tail of the LRU
 * list is unlinked and removed from the hash table.
 */
static void
Sequence_AddPersistentInfoCache(RelFileNode *relFileNode,
								ItemPointer persistentTid,
								int64 persistentSerialNum)
{
	SequencePersistentInfoCacheEntryKey newKey;
	SequencePersistentInfoCacheEntry added;
	SequencePersistentInfoCacheEntry victim;
	bool		alreadyPresent;

	if (sequencePersistentInfoCacheTable == NULL)
		Sequence_PersistentInfoCacheTableInit();

	MemSet(&newKey, 0, sizeof(SequencePersistentInfoCacheEntryKey));
	newKey.relFileNode = *relFileNode;

	added = (SequencePersistentInfoCacheEntry)
		hash_search(sequencePersistentInfoCacheTable,
					(void *) &newKey,
					HASH_ENTER,
					&alreadyPresent);
	Assert (!alreadyPresent);

	added->persistentTid = *persistentTid;
	added->persistentSerialNum = persistentSerialNum;

	DoubleLinks_Init(&added->lruLinks);

	/* LRU: a new entry starts out as the most recently used one. */
	DoublyLinkedHead_AddFirst(
		offsetof(SequencePersistentInfoCacheEntryData, lruLinks),
		&sequencePersistentInfoCacheLruListHead,
		added);

	sequencePersistentInfoCacheLruCount++;

	/* Nothing more to do while the cache is within its size limit. */
	if (sequencePersistentInfoCacheLruCount <= sequencePersistentInfoCacheLruLimit)
		return;

	/* Over the limit: evict the entry at the tail of the LRU list. */
	victim = (SequencePersistentInfoCacheEntry)
		DoublyLinkedHead_Last(
			offsetof(SequencePersistentInfoCacheEntryData, lruLinks),
			&sequencePersistentInfoCacheLruListHead);
	Assert(victim != NULL);

	DoubleLinks_Remove(
		offsetof(SequencePersistentInfoCacheEntryData, lruLinks),
		&sequencePersistentInfoCacheLruListHead,
		victim);

	/* Report the eviction before the entry is removed from the table. */
	if (Debug_persistent_print)
		elog(Persistent_DebugPrintLevel(), 
			 "Removed cached persistent information for sequence %u/%u/%u -- serial number " INT64_FORMAT ", TID %s",
			 victim->key.relFileNode.spcNode,
			 victim->key.relFileNode.dbNode,
			 victim->key.relFileNode.relNode,
			 victim->persistentSerialNum,
			 ItemPointerToString(&victim->persistentTid));

	hash_search(sequencePersistentInfoCacheTable,
				(void *) &victim->key,
				HASH_REMOVE,
				NULL);

	sequencePersistentInfoCacheLruCount--;
}


/*
 * Sequence_FetchGpRelationNodeForXLog
 *
 * Make sure rel->rd_segfile0_relationnodeinfo holds the persistent TID and
 * serial number of the sequence, so they can be put into XLOG records.
 *
 * Returns immediately if the information is already marked present.
 * Otherwise the values are taken from the persistent-info cache, or, on a
 * cache miss, from PersistentFileSysObj_ScanForRelation(), after which they
 * are added to the cache.
 *
 * Raises ERROR if the scan does not find the relation, or if the resulting
 * TID is the zero TID while Persistent_BeforePersistenceWork() is false.
 */
static void
Sequence_FetchGpRelationNodeForXLog(Relation rel)
{
	if (rel->rd_segfile0_relationnodeinfo.isPresent)
		return;

	/*
	 * For better performance, we cache the persistent information
	 * for sequences with upper bound and use LRU...
	 */
	if (Sequence_CheckPersistentInfoCache(
								&rel->rd_node,
								&rel->rd_segfile0_relationnodeinfo.persistentTid,
								&rel->rd_segfile0_relationnodeinfo.persistentSerialNum))
	{
		if (Debug_persistent_print)
			elog(Persistent_DebugPrintLevel(), 
				 "Found cached persistent information for sequence %u/%u/%u -- serial number " INT64_FORMAT ", TID %s",
				 rel->rd_node.spcNode,
				 rel->rd_node.dbNode,
				 rel->rd_node.relNode,
				 rel->rd_segfile0_relationnodeinfo.persistentSerialNum,
				 ItemPointerToString(&rel->rd_segfile0_relationnodeinfo.persistentTid));
	} 
	else 
	{
		/* Cache miss: fetch the values with a scan for segment file 0. */
		if (!PersistentFileSysObj_ScanForRelation(
												&rel->rd_node,
												/* segmentFileNum */ 0,
												&rel->rd_segfile0_relationnodeinfo.persistentTid,
												&rel->rd_segfile0_relationnodeinfo.persistentSerialNum))
		{
			elog(ERROR, "Could not find persistent information for sequence %u/%u/%u",
			     rel->rd_node.spcNode,
			     rel->rd_node.dbNode,
			     rel->rd_node.relNode);
		}

		/* Remember the result so later calls can skip the scan. */
		Sequence_AddPersistentInfoCache(
								&rel->rd_node,
								&rel->rd_segfile0_relationnodeinfo.persistentTid,
								rel->rd_segfile0_relationnodeinfo.persistentSerialNum);

		if (Debug_persistent_print)
			elog(Persistent_DebugPrintLevel(), 
				 "Add cached persistent information for sequence %u/%u/%u -- serial number " INT64_FORMAT ", TID %s",
				 rel->rd_node.spcNode,
				 rel->rd_node.dbNode,
				 rel->rd_node.relNode,
				 rel->rd_segfile0_relationnodeinfo.persistentSerialNum,
				 ItemPointerToString(&rel->rd_segfile0_relationnodeinfo.persistentTid));
	}

	/* A zero TID is only acceptable before persistence work has begun. */
	if (!Persistent_BeforePersistenceWork() &&
		PersistentStore_IsZeroTid(&rel->rd_segfile0_relationnodeinfo.persistentTid))
	{	
		elog(ERROR, 
			 "Sequence_FetchGpRelationNodeForXLog has invalid TID (0,0) for relation %u/%u/%u '%s', serial number " INT64_FORMAT,
			 rel->rd_node.spcNode,
			 rel->rd_node.dbNode,
			 rel->rd_node.relNode,
			 NameStr(rel->rd_rel->relname),
			 rel->rd_segfile0_relationnodeinfo.persistentSerialNum);
	}

	rel->rd_segfile0_relationnodeinfo.isPresent = true;
}
369 370

/*
B
Bruce Momjian 已提交
371
 * DefineSequence
372
 *				Creates a new sequence relation
373 374
 */
void
375
DefineSequence(CreateSeqStmt *seq)
376
{
377 378
	MIRROREDLOCK_BUFMGR_DECLARE;

379
	FormData_pg_sequence new;
380
	List	   *owned_by;
381
	CreateStmt *stmt = makeNode(CreateStmt);
382
	Oid			seqoid;
383 384
	Relation	rel;
	Buffer		buf;
385
	Page		page;
386
	sequence_magic *sm;
387 388 389
	HeapTuple	tuple;
	TupleDesc	tupDesc;
	Datum		value[SEQ_COL_LASTCOL];
390
	bool		null[SEQ_COL_LASTCOL];
391
	int			i;
392
	NameData	name;
393
	OffsetNumber offnum;
394

395 396
	bool shouldDispatch =  Gp_role == GP_ROLE_DISPATCH && !IsBootstrapProcessingMode();

397
	/* Check and set all option values */
398
	init_params(seq->options, true, &new, &owned_by);
399 400

	/*
401
	 * Create relation (and fill value[] and null[] for the tuple)
402 403 404
	 */
	stmt->tableElts = NIL;
	for (i = SEQ_COL_FIRSTCOL; i <= SEQ_COL_LASTCOL; i++)
405
	{
406
		ColumnDef  *coldef = makeNode(ColumnDef);
407

408 409
		coldef->inhcount = 0;
		coldef->is_local = true;
410
		coldef->is_not_null = true;
411 412
		coldef->raw_default = NULL;
		coldef->cooked_default = NULL;
413 414
		coldef->constraints = NIL;

415
		null[i - 1] = false;
416 417 418

		switch (i)
		{
419
			case SEQ_COL_NAME:
420
				coldef->typeName = makeTypeNameFromOid(NAMEOID, -1);
421
				coldef->colname = "sequence_name";
422
				namestrcpy(&name, seq->sequence->relname);
423
				value[i - 1] = NameGetDatum(&name);
424 425
				break;
			case SEQ_COL_LASTVAL:
426
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
427
				coldef->colname = "last_value";
428
				value[i - 1] = Int64GetDatumFast(new.last_value);
429
				break;
430
			case SEQ_COL_STARTVAL:
431
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
432 433 434
				coldef->colname = "start_value";
				value[i - 1] = Int64GetDatumFast(new.start_value);
				break;
435
			case SEQ_COL_INCBY:
436
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
437
				coldef->colname = "increment_by";
438
				value[i - 1] = Int64GetDatumFast(new.increment_by);
439 440
				break;
			case SEQ_COL_MAXVALUE:
441
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
442
				coldef->colname = "max_value";
443
				value[i - 1] = Int64GetDatumFast(new.max_value);
444 445
				break;
			case SEQ_COL_MINVALUE:
446
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
447
				coldef->colname = "min_value";
448
				value[i - 1] = Int64GetDatumFast(new.min_value);
449 450
				break;
			case SEQ_COL_CACHE:
451
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
452
				coldef->colname = "cache_value";
453
				value[i - 1] = Int64GetDatumFast(new.cache_value);
454
				break;
V
Vadim B. Mikheev 已提交
455
			case SEQ_COL_LOG:
456
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
V
Vadim B. Mikheev 已提交
457
				coldef->colname = "log_cnt";
458
				value[i - 1] = Int64GetDatum((int64) 0);
V
Vadim B. Mikheev 已提交
459
				break;
460
			case SEQ_COL_CYCLE:
461
				coldef->typeName = makeTypeNameFromOid(BOOLOID, -1);
462
				coldef->colname = "is_cycled";
463
				value[i - 1] = BoolGetDatum(new.is_cycled);
464 465
				break;
			case SEQ_COL_CALLED:
466
				coldef->typeName = makeTypeNameFromOid(BOOLOID, -1);
467
				coldef->colname = "is_called";
468
				value[i - 1] = BoolGetDatum(false);
469
				break;
470 471 472 473
		}
		stmt->tableElts = lappend(stmt->tableElts, coldef);
	}

474 475
	stmt->relation = seq->sequence;
	stmt->inhRelations = NIL;
476
	stmt->constraints = NIL;
477 478
	stmt->inhOids = NIL;
	stmt->parentOidCount = 0;
B
Bruce Momjian 已提交
479
	stmt->options = list_make1(defWithOids(false));
480
	stmt->oncommit = ONCOMMIT_NOOP;
481
	stmt->tablespacename = NULL;
482 483
	stmt->relKind = RELKIND_SEQUENCE;
	stmt->ownerid = GetUserId();
484

485
	seqoid = DefineRelation(stmt, RELKIND_SEQUENCE, RELSTORAGE_HEAP, false);
486

487 488 489 490 491 492 493 494
	/*
	 * Open and lock the new sequence.  (This lock is redundant; an
	 * AccessExclusiveLock was acquired above by DefineRelation and
	 * won't be released until end of transaction.)
	 *
	 * CDB: Acquire lock on qDisp before dispatching to qExecs, so
	 * qDisp can detect and resolve any deadlocks.
	 */
495
	rel = heap_open(seqoid, AccessExclusiveLock);
496
	tupDesc = RelationGetDescr(rel);
497

498 499 500 501 502 503
	/* Now form sequence tuple */
	tuple = heap_form_tuple(tupDesc, value, null);

	/* Fetch gp_persistent_relation_node information that will be added to XLOG record. */
	Assert(rel != NULL);
	Sequence_FetchGpRelationNodeForXLog(rel);
504

505 506
	// -------- MirroredLock ----------
	MIRROREDLOCK_BUFMGR_LOCK;
507 508

	/* Initialize first page of relation with special magic number */
509
	buf = ReadBuffer(rel, P_NEW);
510 511
	Assert(BufferGetBlockNumber(buf) == 0);

512
	page = BufferGetPage(buf);
513

514
	PageInit(page, BufferGetPageSize(buf), sizeof(sequence_magic));
515 516 517
	sm = (sequence_magic *) PageGetSpecialPointer(page);
	sm->magic = SEQ_MAGIC;

518 519
	/* Now insert sequence tuple */
	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
520

521
	/*
522
	 * Since VACUUM does not process sequences, we have to force the tuple
523
	 * to have xmin = FrozenTransactionId now.	Otherwise it would become
B
Bruce Momjian 已提交
524
	 * invisible to SELECTs after 2G transactions.	It is okay to do this
525 526 527
	 * because if the current transaction aborts, no other xact will ever
	 * examine the sequence tuple anyway.
	 *
528
	 */
529

530 531 532 533 534 535
	HeapTupleHeaderSetXmin(tuple->t_data, FrozenTransactionId);
	HeapTupleHeaderSetXminFrozen(tuple->t_data);
	HeapTupleHeaderSetCmin(tuple->t_data, FirstCommandId);
	HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId);
	tuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
	ItemPointerSet(&tuple->t_data->t_ctid, 0, FirstOffsetNumber);
536

537
	START_CRIT_SECTION();
538

539 540
	MarkBufferDirty(buf);

541 542 543 544 545
	offnum = PageAddItem(page, (Item) tuple->t_data, tuple->t_len,
						 InvalidOffsetNumber, false, false);
	if (offnum != FirstOffsetNumber)
		elog(ERROR, "failed to add sequence tuple to page");

546 547
	/* XLOG stuff */
	if (!rel->rd_istemp)
548
	{
549 550 551
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];
552 553

		xlrec.node = rel->rd_node;
554
		RelationGetPTInfo(rel, &xlrec.persistentTid, &xlrec.persistentSerialNum);
555

556 557
		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
558
		rdata[0].buffer = InvalidBuffer;
559 560
		rdata[0].next = &(rdata[1]);

561
		rdata[1].data = (char *) tuple->t_data;
562
		rdata[1].len = tuple->t_len;
563
		rdata[1].buffer = InvalidBuffer;
564 565
		rdata[1].next = NULL;

566
		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);
567 568 569

		PageSetLSN(page, recptr);
	}
570

571
	END_CRIT_SECTION();
572

573 574
	UnlockReleaseBuffer(buf);

575 576 577
	MIRROREDLOCK_BUFMGR_UNLOCK;
	// -------- MirroredLock ----------

578 579 580 581
	/* process OWNED BY if given */
	if (owned_by)
		process_owned_by(rel, owned_by);

582
	heap_close(rel, NoLock);
583 584 585 586 587

	
	/* Dispatch to segments */
	if (shouldDispatch)
	{
588 589 590 591
		CdbDispatchUtilityStatement((Node *) seq,
									DF_CANCEL_ON_ERROR|
									DF_WITH_SNAPSHOT|
									DF_NEED_TWO_PHASE,
592
									GetAssignedOidsForDispatch(),
593
									NULL);
594
	}
595 596
}

B
Bruce Momjian 已提交
597 598 599
/*
 * AlterSequence
 *
600
 * Modify the definition of a sequence relation
B
Bruce Momjian 已提交
601 602
 */
void
603
AlterSequence(AlterSeqStmt *stmt)
B
Bruce Momjian 已提交
604
{
605
	Oid			relid;
606 607 608 609 610 611 612 613 614 615 616 617

	/* find sequence */
	relid = RangeVarGetRelid(stmt->sequence, false);

	/* allow ALTER to sequence owner only */
	/* if you change this, see also callers of AlterSequenceInternal! */
	if (!pg_class_ownercheck(relid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
					   stmt->sequence->relname);

	/* do the work */
	AlterSequenceInternal(relid, stmt->options);
618 619 620 621 622 623 624 625

	if (Gp_role == GP_ROLE_DISPATCH)
		CdbDispatchUtilityStatement((Node *) stmt,
									DF_CANCEL_ON_ERROR|
									DF_WITH_SNAPSHOT|
									DF_NEED_TWO_PHASE,
									NIL,
									NULL);
626 627 628 629 630 631 632 633 634 635
}

/*
 * AlterSequenceInternal
 *
 * Same as AlterSequence except that the sequence is specified by OID
 * and we assume the caller already checked permissions.
 *
 * Reads the current sequence tuple, applies the options to a copy through
 * init_params(), writes the result back over the on-page tuple inside a
 * critical section and WAL-logs it unless the sequence is temporary.
 * OWNED BY is processed if given.  On the dispatcher, for non-temporary
 * sequences, the change is recorded with MetaTrackUpdObject(); the subtype
 * is "<n> OPTIONS" for zero or several options, or the upper-cased option
 * name when exactly one option was given.
 */
void
AlterSequenceInternal(Oid relid, List *options)
{
	MIRROREDLOCK_BUFMGR_DECLARE;

	SeqTable	elm;
	Relation	seqrel;
	Buffer		buf;
	Form_pg_sequence seq;
	FormData_pg_sequence new;
	List	   *owned_by;
	HeapTupleData seqtuple;
	int64		save_increment;
	bool		bSeqIsTemp = false;
	int			numopts	   = 0;
	char	   *alter_subtype = "";		/* metadata tracking: kind of
										   redundant to say "role" */

	/* open and AccessShareLock sequence */
	init_sequence(relid, &elm, &seqrel);

	/* lock page' buffer and read tuple into new sequence structure */

	/* -------- MirroredLock ---------- */
	MIRROREDLOCK_BUFMGR_LOCK;

	/* hack to keep ALTER SEQUENCE OWNED BY from changing currval state */
	save_increment = elm->increment;

	seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
	elm->increment = seq->increment_by;

	/* Copy old values of options into workspace */
	memcpy(&new, seq, sizeof(FormData_pg_sequence));

	/* Check and set new values */
	init_params(options, false, &new, &owned_by);

	if (owned_by)
	{
		/* Restore previous state of elm (assume nothing else changes) */
		elm->increment = save_increment;
	}
	else
	{
		/* Clear local cache so that we don't think we have cached numbers */
		/* Note that we do not change the currval() state */
		elm->cached = elm->last;
	}

	/* Fetch gp_persistent_relation_node information that will be added to XLOG record. */
	Assert(seqrel != NULL);
	Sequence_FetchGpRelationNodeForXLog(seqrel);

	/* Now okay to update the on-disk tuple */
	START_CRIT_SECTION();

	memcpy(seq, &new, sizeof(FormData_pg_sequence));

	MarkBufferDirty(buf);

	/* XLOG stuff */

	bSeqIsTemp = seqrel->rd_istemp;

	if (!bSeqIsTemp)
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];
		Page		page = BufferGetPage(buf);

		xlrec.node = seqrel->rd_node;
		RelationGetPTInfo(seqrel, &xlrec.persistentTid, &xlrec.persistentSerialNum);

		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
		rdata[0].buffer = InvalidBuffer;
		rdata[0].next = &(rdata[1]);

		rdata[1].data = (char *) seqtuple.t_data;
		rdata[1].len = seqtuple.t_len;
		rdata[1].buffer = InvalidBuffer;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);

		PageSetLSN(page, recptr);
	}

	END_CRIT_SECTION();

	UnlockReleaseBuffer(buf);

	MIRROREDLOCK_BUFMGR_UNLOCK;
	/* -------- MirroredLock ---------- */

	/* process OWNED BY if given */
	if (owned_by)
		process_owned_by(seqrel, owned_by);

	relation_close(seqrel, NoLock);

	/* Work out the subtype string reported to metadata tracking. */
	numopts = list_length(options);

	if (numopts > 1)
	{
		char allopts[NAMEDATALEN];

		/* bounded write: never overrun the fixed-size buffer */
		snprintf(allopts, sizeof(allopts), "%d OPTIONS", numopts);

		alter_subtype = pstrdup(allopts);
	}
	else if (0 == numopts)
	{
		alter_subtype = "0 OPTIONS";
	}
	else if ((Gp_role == GP_ROLE_DISPATCH) && (!bSeqIsTemp))
	{
		/* exactly one option: report its name in upper case */
		ListCell		*option = list_head(options);
		DefElem			*defel	= (DefElem *) lfirst(option);
		char			*tempo	= NULL;

		alter_subtype = defel->defname;
		if (0 == strcmp(alter_subtype, "owned_by"))
			alter_subtype = "OWNED BY";

		tempo = str_toupper(alter_subtype, strlen(alter_subtype));

		alter_subtype = tempo;
	}

	if (Gp_role == GP_ROLE_DISPATCH && !bSeqIsTemp)
	{
		/* MPP-6929: metadata tracking */
		MetaTrackUpdObject(RelationRelationId,
						   relid,
						   GetUserId(),
						   "ALTER", alter_subtype);
	}
}

776

777 778 779 780 781
/*
 * Note: nextval with a text argument is no longer exported as a pg_proc
 * entry, but we keep it around to ease porting of C code that may have
 * called the function directly.
 */
782 783
Datum
nextval(PG_FUNCTION_ARGS)
784
{
785
	text	   *seqin = PG_GETARG_TEXT_P(0);
786
	RangeVar   *sequence;
787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805
	Oid			relid;

	sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin));
	relid = RangeVarGetRelid(sequence, false);

	PG_RETURN_INT64(nextval_internal(relid));
}

Datum
nextval_oid(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);

	PG_RETURN_INT64(nextval_internal(relid));
}

/*
 * nextval_internal
 *
 * Return the next value of the sequence identified by relid.
 *
 * If the session still has cached values for the sequence
 * (elm->last != elm->cached), the next one is handed out without touching
 * the sequence relation.  Otherwise the caller must hold UPDATE privilege;
 * the sequence is then advanced through cdb_sequence_nextval_proxy() when
 * running as an executor (GP_ROLE_EXECUTE) or cdb_sequence_nextval()
 * otherwise.
 *
 * Raises ERROR on insufficient privilege, or when the sequence is reported
 * to have overflowed.
 */
static int64
nextval_internal(Oid relid)
{
	SeqTable	elm;
	Relation	seqrel;
	bool is_overflow = false;

	/* open and AccessShareLock sequence */
	init_sequence(relid, &elm, &seqrel);

	if (elm->last != elm->cached)		/* some numbers were cached */
	{
		Assert(elm->last_valid);
		Assert(elm->increment != 0);
		elm->last += elm->increment;
		relation_close(seqrel, NoLock);
		last_used_seq = elm;
		return elm->last;
	}

	if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied for sequence %s",
						RelationGetRelationName(seqrel))));

	/* Update the sequence object. */
	if (Gp_role == GP_ROLE_EXECUTE)
		cdb_sequence_nextval_proxy(seqrel,
								   &elm->last,
								   &elm->cached,
								   &elm->increment,
								   &is_overflow);
	else
		cdb_sequence_nextval(elm,
							 seqrel,
							 &elm->last,
							 &elm->cached,
							 &elm->increment,
							 &is_overflow);
	last_used_seq = elm;

	if (is_overflow)
	{
		/*
		 * Copy the name while we still hold our reference on the relation:
		 * once relation_close() has run, seqrel must not be dereferenced
		 * any more.
		 */
		char	   *seqname = pstrdup(RelationGetRelationName(seqrel));

		relation_close(seqrel, NoLock);

		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("nextval: reached %s value of sequence \"%s\" (" INT64_FORMAT ")",
                        elm->increment>0 ? "maximum":"minimum",
                        seqname, elm->last)));
	}
	else
		elm->last_valid = true;

	relation_close(seqrel, NoLock);
	return elm->last;
}


863 864 865
/*
 * cdb_sequence_nextval
 *		Fetch the next batch of values directly from the sequence relation.
 *
 * elm			this backend's SeqTable entry (passed on to read_seq_tuple)
 * seqrel		the opened sequence relation
 * *plast		receives the first value fetched (the one to return)
 * *pcached		receives the last value fetched in this batch
 * *pincrement	receives the sequence's increment_by
 * *poverflow	set to true if the sequence hit its bound and is not cycled
 *
 * The sequence tuple is read and modified while its buffer is held with an
 * exclusive lock (taken in read_seq_tuple) and under the MirroredLock.
 * Overflow is not raised as an error here; it is only reported through
 * *poverflow, and the tuple is still rewritten in that case, so the caller
 * is responsible for raising the error.
 */
static void
cdb_sequence_nextval(SeqTable elm,
					 Relation seqrel,
					 int64 *plast,
					 int64 *pcached,
					 int64 *pincrement,
					 bool *poverflow)
{
	MIRROREDLOCK_BUFMGR_DECLARE;

	Buffer		buf;
	Page		page;
	HeapTupleData seqtuple;
	Form_pg_sequence seq;
	int64		incby,
				maxv,
				minv,
				cache,
				log,
				fetch,
				last;
	int64		result,
				next,
				rescnt = 0;
	bool		have_overflow = false;
	bool		logit = false;

	/* lock the page's buffer and read the tuple */

	// -------- MirroredLock ----------
	MIRROREDLOCK_BUFMGR_LOCK;

	seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
	page = BufferGetPage(buf);

	last = next = result = seq->last_value;
	incby = seq->increment_by;
	maxv = seq->max_value;
	minv = seq->min_value;
	fetch = cache = seq->cache_value;
	log = seq->log_cnt;

	if (!seq->is_called)
	{
		rescnt++;				/* return last_value if not is_called */
		fetch--;
	}

	/*
	 * Decide whether we should emit a WAL log record.  If so, force up the
	 * fetch count to grab SEQ_LOG_VALS more values than we actually need to
	 * cache.  (These will then be usable without logging.)
	 *
	 * If this is the first nextval after a checkpoint, we must force a new
	 * WAL record to be written anyway, else replay starting from the
	 * checkpoint would fail to advance the sequence past the logged values.
	 * In this case we may as well fetch extra values.
	 */
	if (log < fetch || !seq->is_called)
	{
		/* forced log to satisfy local demand for values */
		fetch = log = fetch + SEQ_LOG_VALS;
		logit = true;
	}
	else
	{
		XLogRecPtr	redoptr = GetRedoRecPtr();

		if (XLByteLE(PageGetLSN(page), redoptr))
		{
			/* last update of seq was before checkpoint */
			fetch = log = fetch + SEQ_LOG_VALS;
			logit = true;
		}
	}

	while (fetch)				/* try to fetch cache [+ log ] numbers */
	{
		/*
		 * Check MAXVALUE for ascending sequences and MINVALUE for descending
		 * sequences
		 */
		if (incby > 0)
		{
			/* ascending sequence */
			if ((maxv >= 0 && next > maxv - incby) ||
				(maxv < 0 && next + incby > maxv))
			{
				if (rescnt > 0)
					break;		/* stop fetching */
				if (!seq->is_cycled)
				{
					/* flag it; next is left unchanged */
					have_overflow = true;
				}
				else
				{
					next = minv;
				}
			}
			else
				next += incby;
		}
		else
		{
			/* descending sequence */
			if ((minv < 0 && next < minv - incby) ||
				(minv >= 0 && next + incby < minv))
			{
				if (rescnt > 0)
					break;		/* stop fetching */
				if (!seq->is_cycled)
				{
					/* flag it; next is left unchanged */
					have_overflow = true;
				}
				else
				{
					next = maxv;
				}
			}
			else
				next += incby;
		}
		fetch--;
		if (rescnt < cache)
		{
			log--;
			rescnt++;
			last = next;
			if (rescnt == 1)	/* if it's first result - */
				result = next;	/* it's what to return */
		}
	}

	log -= fetch;				/* adjust for any unfetched numbers */
	Assert(log >= 0);

	/* set results for caller */
	*poverflow = have_overflow; /* has the sequence overflown */
	*plast = result;			/* last returned number */
	*pcached = last;			/* last fetched number */
	*pincrement = incby;

	// Fetch gp_persistent_relation_node information that will be added to XLOG record.
	Assert(seqrel != NULL);
	Sequence_FetchGpRelationNodeForXLog(seqrel);

	/* ready to change the on-disk (or really, in-buffer) tuple */
	START_CRIT_SECTION();

	/*
	 * We must mark the buffer dirty before doing XLogInsert(); see notes in
	 * SyncOneBuffer().  However, we don't apply the desired changes just yet.
	 * This looks like a violation of the buffer update protocol, but it is
	 * in fact safe because we hold exclusive lock on the buffer.  Any other
	 * process, including a checkpoint, that tries to examine the buffer
	 * contents will block until we release the lock, and then will see the
	 * final state that we install below.
	 */
	MarkBufferDirty(buf);

	/* XLOG stuff */
	if (logit && !seqrel->rd_istemp)
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];

		/*
		 * We don't log the current state of the tuple, but rather the state
		 * as it would appear after "log" more fetches.  This lets us skip
		 * that many future WAL records, at the cost that we lose those
		 * sequence values if we crash.
		 */

		/* set values that will be saved in xlog */
		seq->last_value = next;
		seq->is_called = true;
		seq->log_cnt = 0;

		xlrec.node = seqrel->rd_node;
		RelationGetPTInfo(seqrel, &xlrec.persistentTid, &xlrec.persistentSerialNum);
		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
		rdata[0].buffer = InvalidBuffer;
		rdata[0].next = &(rdata[1]);

		rdata[1].data = (char *) seqtuple.t_data;
		rdata[1].len = seqtuple.t_len;
		rdata[1].buffer = InvalidBuffer;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);

		PageSetLSN(page, recptr);

		/*
		 * Need to update where we've inserted to in shmem so that the QD can
		 * flush it when necessary.  Only ever move lastXlogEntry forward.
		 */
		LWLockAcquire(SeqServerControlLock, LW_EXCLUSIVE);

		if (XLByteLT(seqServerCtl->lastXlogEntry, recptr))
		{
			seqServerCtl->lastXlogEntry.xlogid = recptr.xlogid;
			seqServerCtl->lastXlogEntry.xrecoff = recptr.xrecoff;
		}

		LWLockRelease(SeqServerControlLock);
	}

	/* Now update sequence tuple to the intended final state */
	seq->last_value = last;		/* last fetched number */
	seq->is_called = true;
	seq->log_cnt = log;			/* how much is logged */

	END_CRIT_SECTION();

	UnlockReleaseBuffer(buf);

	MIRROREDLOCK_BUFMGR_UNLOCK;
	// -------- MirroredLock ----------

}								/* cdb_sequence_nextval */
1085

1086

1087
Datum
1088
currval_oid(PG_FUNCTION_ARGS)
1089
{
1090 1091
	Oid			relid = PG_GETARG_OID(0);
	int64		result;
1092
	SeqTable	elm;
1093
	Relation	seqrel;
1094

1095 1096 1097 1098 1099 1100 1101 1102
	/* For now, strictly forbidden on MPP. */
	if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
				 errmsg("currval() not supported")));
	}

V
Vadim B. Mikheev 已提交
1103
	/* open and AccessShareLock sequence */
1104
	init_sequence(relid, &elm, &seqrel);
1105

1106 1107
	if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_SELECT) != ACLCHECK_OK &&
		pg_class_aclcheck(elm->relid, GetUserId(), ACL_USAGE) != ACLCHECK_OK)
1108 1109
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1110
				 errmsg("permission denied for sequence %s",
1111
						RelationGetRelationName(seqrel))));
1112

1113
	if (!elm->last_valid)
1114 1115
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1116
				 errmsg("currval of sequence \"%s\" is not yet defined in this session",
1117
						RelationGetRelationName(seqrel))));
1118 1119 1120

	result = elm->last;

1121 1122
	relation_close(seqrel, NoLock);

1123
	PG_RETURN_INT64(result);
1124 1125
}

1126 1127 1128 1129 1130 1131
Datum
lastval(PG_FUNCTION_ARGS)
{
	Relation	seqrel;
	int64		result;

1132 1133 1134 1135 1136 1137 1138 1139
	/* For now, strictly forbidden on MPP. */
	if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
				 errmsg("lastval() not supported")));
	}

1140 1141 1142 1143 1144 1145
	if (last_used_seq == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("lastval is not yet defined in this session")));

	/* Someone may have dropped the sequence since the last nextval() */
1146 1147 1148
	if (!SearchSysCacheExists(RELOID,
							  ObjectIdGetDatum(last_used_seq->relid),
							  0, 0, 0))
1149 1150 1151 1152
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("lastval is not yet defined in this session")));

1153
	seqrel = open_share_lock(last_used_seq);
1154 1155

	/* nextval() must have already been called for this sequence */
1156
	Assert(last_used_seq->last_valid);
1157

1158 1159
	if (pg_class_aclcheck(last_used_seq->relid, GetUserId(), ACL_SELECT) != ACLCHECK_OK &&
		pg_class_aclcheck(last_used_seq->relid, GetUserId(), ACL_USAGE) != ACLCHECK_OK)
1160 1161 1162 1163 1164 1165 1166
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied for sequence %s",
						RelationGetRelationName(seqrel))));

	result = last_used_seq->last;
	relation_close(seqrel, NoLock);
1167

1168 1169 1170
	PG_RETURN_INT64(result);
}

B
Bruce Momjian 已提交
1171
/*
1172 1173 1174 1175
 * Main internal procedure that handles 2 & 3 arg forms of SETVAL.
 *
 * Note that the 3 arg version (which sets the is_called flag) is
 * only for use in pg_dump, and setting the is_called flag may not
B
Bruce Momjian 已提交
1176
 * work if multiple users are attached to the database and referencing
1177 1178
 * the sequence (unlikely if pg_dump is restoring it).
 *
B
Bruce Momjian 已提交
1179
 * It is necessary to have the 3 arg version so that pg_dump can
1180 1181 1182 1183
 * restore the state of a sequence exactly during data-only restores -
 * it is the only way to clear the is_called flag in an existing
 * sequence.
 */
B
Bruce Momjian 已提交
1184
static void
1185
do_setval(Oid relid, int64 next, bool iscalled)
M
 
Marc G. Fournier 已提交
1186
{
1187 1188
	MIRROREDLOCK_BUFMGR_DECLARE;

M
 
Marc G. Fournier 已提交
1189
	SeqTable	elm;
1190
	Relation	seqrel;
1191
	Buffer		buf;
1192
	HeapTupleData seqtuple;
1193
	Form_pg_sequence seq;
M
 
Marc G. Fournier 已提交
1194

1195 1196 1197 1198 1199 1200 1201
	if (Gp_role == GP_ROLE_EXECUTE)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
				 errmsg("setval() not supported in this context")));
	}

1202
	/* open and AccessShareLock sequence */
1203
	init_sequence(relid, &elm, &seqrel);
1204 1205

	if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
1206 1207
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1208
				 errmsg("permission denied for sequence %s",
1209
						RelationGetRelationName(seqrel))));
M
 
Marc G. Fournier 已提交
1210

1211
	/* lock page' buffer and read tuple */
1212 1213 1214 1215
	
	// -------- MirroredLock ----------
	MIRROREDLOCK_BUFMGR_LOCK;
	
1216
	seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
1217
	elm->increment = seq->increment_by;
M
 
Marc G. Fournier 已提交
1218

1219
	if ((next < seq->min_value) || (next > seq->max_value))
1220
	{
B
Bruce Momjian 已提交
1221 1222 1223 1224
		char		bufv[100],
					bufm[100],
					bufx[100];

1225 1226 1227
		snprintf(bufv, sizeof(bufv), INT64_FORMAT, next);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, seq->min_value);
		snprintf(bufx, sizeof(bufx), INT64_FORMAT, seq->max_value);
1228 1229
		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1230
				 errmsg("setval: value %s is out of bounds for sequence \"%s\" (%s..%s)",
1231 1232
						bufv, RelationGetRelationName(seqrel),
						bufm, bufx)));
1233
	}
M
 
Marc G. Fournier 已提交
1234

1235 1236 1237 1238 1239 1240 1241 1242 1243
	/* Set the currval() state only if iscalled = true */
	if (iscalled)
	{
		elm->last = next;		/* last returned number */
		elm->last_valid = true;
	}

	/* In any case, forget any future cached numbers */
	elm->cached = elm->last;
M
 
Marc G. Fournier 已提交
1244

1245 1246 1247 1248
	// Fetch gp_persistent_relation_node information that will be added to XLOG record.
	Assert(seqrel != NULL);
	Sequence_FetchGpRelationNodeForXLog(seqrel);

1249
	/* ready to change the on-disk (or really, in-buffer) tuple */
1250
	START_CRIT_SECTION();
1251

1252 1253 1254 1255
	seq->last_value = next;		/* last fetched number */
	seq->is_called = iscalled;
	seq->log_cnt = 0;

1256 1257
	MarkBufferDirty(buf);

1258 1259
	/* XLOG stuff */
	if (!seqrel->rd_istemp)
V
Vadim B. Mikheev 已提交
1260 1261 1262
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
B
Bruce Momjian 已提交
1263
		XLogRecData rdata[2];
1264
		Page		page = BufferGetPage(buf);
V
Vadim B. Mikheev 已提交
1265

1266
		xlrec.node = seqrel->rd_node;
1267
		RelationGetPTInfo(seqrel, &xlrec.persistentTid, &xlrec.persistentSerialNum);
1268

B
Bruce Momjian 已提交
1269
		rdata[0].data = (char *) &xlrec;
1270
		rdata[0].len = sizeof(xl_seq_rec);
1271
		rdata[0].buffer = InvalidBuffer;
1272 1273
		rdata[0].next = &(rdata[1]);

1274 1275
		rdata[1].data = (char *) seqtuple.t_data;
		rdata[1].len = seqtuple.t_len;
1276
		rdata[1].buffer = InvalidBuffer;
1277 1278
		rdata[1].next = NULL;

1279
		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);
1280 1281

		PageSetLSN(page, recptr);
V
Vadim B. Mikheev 已提交
1282
	}
1283

1284
	END_CRIT_SECTION();
M
 
Marc G. Fournier 已提交
1285

1286
	UnlockReleaseBuffer(buf);
1287

1288 1289 1290
	MIRROREDLOCK_BUFMGR_UNLOCK;
	// -------- MirroredLock ----------

1291
	relation_close(seqrel, NoLock);
1292 1293
}

1294 1295 1296 1297
/*
 * Implement the 2 arg setval procedure.
 * See do_setval for discussion.
 */
1298
Datum
1299
setval_oid(PG_FUNCTION_ARGS)
1300
{
1301
	Oid			relid = PG_GETARG_OID(0);
1302
	int64		next = PG_GETARG_INT64(1);
1303

1304
	do_setval(relid, next, true);
1305

1306
	PG_RETURN_INT64(next);
1307 1308
}

1309 1310 1311 1312
/*
 * Implement the 3 arg setval procedure.
 * See do_setval for discussion.
 */
1313
Datum
1314
setval3_oid(PG_FUNCTION_ARGS)
1315
{
1316
	Oid			relid = PG_GETARG_OID(0);
1317
	int64		next = PG_GETARG_INT64(1);
1318 1319
	bool		iscalled = PG_GETARG_BOOL(2);

1320
	do_setval(relid, next, iscalled);
1321

1322
	PG_RETURN_INT64(next);
M
 
Marc G. Fournier 已提交
1323 1324
}

1325

1326
/*
1327 1328
 * Open the sequence and acquire AccessShareLock if needed
 *
1329
 * If we haven't touched the sequence already in this transaction,
B
Bruce Momjian 已提交
1330
 * we need to acquire AccessShareLock.	We arrange for the lock to
1331 1332 1333
 * be owned by the top transaction, so that we don't need to do it
 * more than once per xact.
 */
1334 1335
static Relation
open_share_lock(SeqTable seq)
1336
{
1337
	LocalTransactionId thislxid = MyProc->lxid;
1338

1339
	/* Get the lock if not already held in this xact */
1340
	if (seq->lxid != thislxid)
1341 1342 1343 1344 1345 1346 1347
	{
		ResourceOwner currentOwner;

		currentOwner = CurrentResourceOwner;
		PG_TRY();
		{
			CurrentResourceOwner = TopTransactionResourceOwner;
1348
			LockRelationOid(seq->relid, AccessShareLock);
1349 1350 1351 1352 1353 1354 1355 1356 1357 1358
		}
		PG_CATCH();
		{
			/* Ensure CurrentResourceOwner is restored on error */
			CurrentResourceOwner = currentOwner;
			PG_RE_THROW();
		}
		PG_END_TRY();
		CurrentResourceOwner = currentOwner;

1359
		/* Flag that we have a lock in the current xact */
1360
		seq->lxid = thislxid;
1361
	}
1362 1363 1364

	/* We now know we have AccessShareLock, and can safely open the rel */
	return relation_open(seq->relid, NoLock);
1365 1366
}

1367
/*
1368
 * Given a relation OID, open and lock the sequence.  p_elm and p_rel are
1369
 * output parameters.
1370 1371
 *
 * GPDB: If p_rel is NULL, the sequence relation is not opened or locked.
1372 1373
 */
static void
1374
init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel)
1375
{
B
Bruce Momjian 已提交
1376
	SeqTable	elm;
1377
	Relation	seqrel;
1378

1379 1380 1381 1382 1383 1384 1385
	/* Look to see if we already have a seqtable entry for relation */
	for (elm = seqtab; elm != NULL; elm = elm->next)
	{
		if (elm->relid == relid)
			break;
	}

1386
	/*
1387
	 * Allocate new seqtable entry if we didn't find one.
1388
	 *
B
Bruce Momjian 已提交
1389 1390 1391
	 * NOTE: seqtable entries remain in the list for the life of a backend. If
	 * the sequence itself is deleted then the entry becomes wasted memory,
	 * but it's small enough that this should not matter.
B
Bruce Momjian 已提交
1392
	 */
1393
	if (elm == NULL)
1394
	{
1395
		/*
B
Bruce Momjian 已提交
1396 1397
		 * Time to make a new seqtable entry.  These entries live as long as
		 * the backend does, so we use plain malloc for them.
1398 1399
		 */
		elm = (SeqTable) malloc(sizeof(SeqTableData));
T
Tom Lane 已提交
1400
		if (elm == NULL)
1401 1402 1403
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
1404
		elm->relid = relid;
1405
		elm->lxid = InvalidLocalTransactionId;
1406
		elm->last_valid = false;
1407 1408 1409
		elm->last = elm->cached = elm->increment = 0;
		elm->next = seqtab;
		seqtab = elm;
1410 1411
	}

1412 1413 1414
	/*
	 * Open the sequence relation.
	 */
1415 1416 1417
	if (p_rel)
	{
		seqrel = open_share_lock(elm);
1418

1419 1420 1421 1422 1423
		if (seqrel->rd_rel->relkind != RELKIND_SEQUENCE)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("\"%s\" is not a sequence",
							RelationGetRelationName(seqrel))));
1424

1425 1426
		*p_rel = seqrel;
	}
1427
	*p_elm = elm;
1428 1429 1430
}


1431 1432 1433 1434 1435 1436 1437 1438 1439
/*
 * Given an opened sequence relation, lock the page buffer and find the tuple
 *
 * *buf receives the reference to the pinned-and-ex-locked buffer
 * *seqtuple receives the reference to the sequence tuple proper
 *		(this arg should point to a local variable of type HeapTupleData)
 *
 * Function's return value points to the data payload of the tuple
 */
1440
static Form_pg_sequence
1441
read_seq_tuple(SeqTable elm, Relation rel, Buffer *buf, HeapTuple seqtuple)
1442
{
1443
	Page		page;
1444 1445 1446
	ItemId		lp;
	sequence_magic *sm;
	Form_pg_sequence seq;
1447

1448 1449
	MIRROREDLOCK_BUFMGR_MUST_ALREADY_BE_HELD;

1450 1451 1452
	*buf = ReadBuffer(rel, 0);
	LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);

1453
	page = BufferGetPage(*buf);
1454 1455 1456
	sm = (sequence_magic *) PageGetSpecialPointer(page);

	if (sm->magic != SEQ_MAGIC)
1457 1458
		elog(ERROR, "bad magic number in sequence \"%s\": %08X",
			 RelationGetRelationName(rel), sm->magic);
1459 1460

	lp = PageGetItemId(page, FirstOffsetNumber);
1461
	Assert(ItemIdIsNormal(lp));
1462 1463 1464 1465

	/* Note we currently only bother to set these two fields of *seqtuple */
	seqtuple->t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
	seqtuple->t_len = ItemIdGetLength(lp);
1466

1467 1468 1469 1470 1471 1472 1473 1474
	/*
	 * Previous releases of Postgres neglected to prevent SELECT FOR UPDATE
	 * on a sequence, which would leave a non-frozen XID in the sequence
	 * tuple's xmax, which eventually leads to clog access failures or worse.
	 * If we see this has happened, clean up after it.  We treat this like a
	 * hint bit update, ie, don't bother to WAL-log it, since we can certainly
	 * do this again if the update gets lost.
	 */
1475
	if (HeapTupleHeaderGetXmax(seqtuple->t_data) != InvalidTransactionId)
1476
	{
1477 1478 1479
		HeapTupleHeaderSetXmax(seqtuple->t_data, InvalidTransactionId);
		seqtuple->t_data->t_infomask &= ~HEAP_XMAX_COMMITTED;
		seqtuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
1480
		MarkBufferDirtyHint(*buf, rel);
1481
	}
1482

1483
	seq = (Form_pg_sequence) GETSTRUCT(seqtuple);
1484

1485
	/* this is a handy place to update our copy of the increment */
1486 1487 1488
	elm->increment = seq->increment_by;

	return seq;
1489 1490
}

1491 1492
/*
 * init_params: process the options list of CREATE or ALTER SEQUENCE,
1493 1494
 * and store the values into appropriate fields of *new.  Also set
 * *owned_by to any OWNED BY option, or to NIL if there is none.
1495 1496 1497 1498
 *
 * If isInit is true, fill any unspecified options with default values;
 * otherwise, do not change existing options that aren't explicitly overridden.
 */
1499
static void
1500 1501
init_params(List *options, bool isInit,
			Form_pg_sequence new, List **owned_by)
1502
{
1503 1504
	DefElem    *start_value = NULL;
	DefElem    *restart_value = NULL;
1505 1506 1507 1508
	DefElem    *increment_by = NULL;
	DefElem    *max_value = NULL;
	DefElem    *min_value = NULL;
	DefElem    *cache_value = NULL;
1509
	DefElem    *is_cycled = NULL;
1510
	ListCell   *option;
1511

1512 1513
	*owned_by = NIL;

B
Bruce Momjian 已提交
1514
	foreach(option, options)
1515
	{
1516
		DefElem    *defel = (DefElem *) lfirst(option);
1517

1518
		if (strcmp(defel->defname, "increment") == 0)
1519 1520
		{
			if (increment_by)
1521 1522 1523
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1524
			increment_by = defel;
1525
		}
1526 1527
		else if (strcmp(defel->defname, "start") == 0)
		{
1528
			if (start_value)
1529 1530 1531
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1532
			start_value = defel;
1533 1534
		}
		else if (strcmp(defel->defname, "restart") == 0)
1535
		{
1536
			if (restart_value)
1537 1538 1539
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1540
			restart_value = defel;
1541
		}
1542
		else if (strcmp(defel->defname, "maxvalue") == 0)
1543 1544
		{
			if (max_value)
1545 1546 1547
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1548
			max_value = defel;
1549
		}
1550
		else if (strcmp(defel->defname, "minvalue") == 0)
1551 1552
		{
			if (min_value)
1553 1554 1555
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1556
			min_value = defel;
1557
		}
1558
		else if (strcmp(defel->defname, "cache") == 0)
1559 1560
		{
			if (cache_value)
1561 1562 1563
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1564
			cache_value = defel;
1565
		}
1566
		else if (strcmp(defel->defname, "cycle") == 0)
1567
		{
1568
			if (is_cycled)
1569 1570 1571
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1572
			is_cycled = defel;
1573
		}
1574 1575 1576 1577 1578 1579 1580 1581
		else if (strcmp(defel->defname, "owned_by") == 0)
		{
			if (*owned_by)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			*owned_by = defGetQualifiedName(defel);
		}
1582
		else
1583
			elog(ERROR, "option \"%s\" not recognized",
1584 1585 1586
				 defel->defname);
	}

1587 1588 1589 1590 1591 1592 1593
	/*
	 * We must reset log_cnt when isInit or when changing any parameters
	 * that would affect future nextval allocations.
	 */
	if (isInit)
		new->log_cnt = 0;

B
Bruce Momjian 已提交
1594
	/* INCREMENT BY */
1595
	if (increment_by != NULL)
B
Bruce Momjian 已提交
1596 1597
	{
		new->increment_by = defGetInt64(increment_by);
1598 1599 1600
		if (new->increment_by == 0)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1601
					 errmsg("INCREMENT must not be zero")));
1602
		new->log_cnt = 0;
B
Bruce Momjian 已提交
1603
	}
1604 1605 1606 1607
	else if (isInit)
		new->increment_by = 1;

	/* CYCLE */
1608
	if (is_cycled != NULL)
1609 1610 1611
	{
		new->is_cycled = intVal(is_cycled->arg);
		Assert(new->is_cycled == false || new->is_cycled == true);
1612
		new->log_cnt = 0;
1613 1614 1615
	}
	else if (isInit)
		new->is_cycled = false;
1616

1617
	/* MAXVALUE (null arg means NO MAXVALUE) */
1618
	if (max_value != NULL && max_value->arg)
1619
	{
1620
		new->max_value = defGetInt64(max_value);
1621 1622
		new->log_cnt = 0;
	}
1623
	else if (isInit || max_value != NULL)
1624
	{
1625
		if (new->increment_by > 0)
B
Bruce Momjian 已提交
1626
			new->max_value = SEQ_MAXVALUE;		/* ascending seq */
1627
		else
B
Bruce Momjian 已提交
1628
			new->max_value = -1;	/* descending seq */
1629
		new->log_cnt = 0;
1630
	}
1631

1632
	/* MINVALUE (null arg means NO MINVALUE) */
1633
	if (min_value != NULL && min_value->arg)
1634
	{
1635
		new->min_value = defGetInt64(min_value);
1636 1637
		new->log_cnt = 0;
	}
1638
	else if (isInit || min_value != NULL)
1639
	{
1640
		if (new->increment_by > 0)
B
Bruce Momjian 已提交
1641
			new->min_value = 1; /* ascending seq */
1642
		else
B
Bruce Momjian 已提交
1643
			new->min_value = SEQ_MINVALUE;		/* descending seq */
1644
		new->log_cnt = 0;
1645
	}
1646

1647
	/* crosscheck min/max */
1648
	if (new->min_value >= new->max_value)
1649
	{
B
Bruce Momjian 已提交
1650 1651 1652
		char		bufm[100],
					bufx[100];

1653 1654
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->min_value);
		snprintf(bufx, sizeof(bufx), INT64_FORMAT, new->max_value);
1655 1656 1657 1658
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("MINVALUE (%s) must be less than MAXVALUE (%s)",
						bufm, bufx)));
1659
	}
1660

B
Bruce Momjian 已提交
1661
	/* START WITH */
1662 1663
	if (start_value != NULL)
		new->start_value = defGetInt64(start_value);
1664
	else if (isInit)
1665
	{
1666
		if (new->increment_by > 0)
1667
			new->start_value = new->min_value;	/* ascending seq */
1668
		else
1669
			new->start_value = new->max_value;	/* descending seq */
1670
	}
1671

1672 1673
	/* crosscheck START */
	if (new->start_value < new->min_value)
1674
	{
1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697
		char		bufs[100],
					bufm[100];

		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->start_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->min_value);
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("START value (%s) cannot be less than MINVALUE (%s)",
						bufs, bufm)));
	}
	if (new->start_value > new->max_value)
	{
		char		bufs[100],
					bufm[100];

		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->start_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->max_value);
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
			  errmsg("START value (%s) cannot be greater than MAXVALUE (%s)",
					 bufs, bufm)));
	}

1698 1699 1700 1701 1702 1703 1704
	/* RESTART [WITH] */
	if (restart_value != NULL)
	{
		if (restart_value->arg != NULL)
			new->last_value = defGetInt64(restart_value);
		else
			new->last_value = new->start_value;
1705 1706 1707
		new->is_called = false;
		new->log_cnt = 1;
	}
1708
	else if (isInit)
1709
	{
1710
		new->last_value = new->start_value;
1711 1712
		new->is_called = false;
		new->log_cnt = 1;
1713
	}
1714

1715
	/* crosscheck RESTART (or current value, if changing MIN/MAX) */
1716
	if (new->last_value < new->min_value)
1717
	{
B
Bruce Momjian 已提交
1718 1719 1720
		char		bufs[100],
					bufm[100];

1721 1722
		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->last_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->min_value);
1723 1724
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1725 1726
			   errmsg("RESTART value (%s) cannot be less than MINVALUE (%s)",
					  bufs, bufm)));
1727
	}
1728
	if (new->last_value > new->max_value)
1729
	{
B
Bruce Momjian 已提交
1730 1731 1732
		char		bufs[100],
					bufm[100];

1733 1734
		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->last_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->max_value);
1735 1736
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1737 1738
			errmsg("RESTART value (%s) cannot be greater than MAXVALUE (%s)",
				   bufs, bufm)));
1739
	}
1740

B
Bruce Momjian 已提交
1741
	/* CACHE */
1742
	if (cache_value != NULL)
1743
	{
1744 1745 1746 1747
		new->cache_value = defGetInt64(cache_value);
		if (new->cache_value <= 0)
		{
			char		buf[100];
B
Bruce Momjian 已提交
1748

1749 1750 1751 1752 1753 1754
			snprintf(buf, sizeof(buf), INT64_FORMAT, new->cache_value);
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("CACHE (%s) must be greater than zero",
							buf)));
		}
1755
		new->log_cnt = 0;
1756
	}
1757 1758
	else if (isInit)
		new->cache_value = 1;
1759 1760
}

1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784
/*
 * process_owned_by
 *		Handle the OWNED BY option of CREATE/ALTER SEQUENCE.
 *
 * seqrel is the sequence relation; owned_by is the list of name nodes given
 * in the option.  A one-element list must spell "none" and simply removes
 * any existing ownership link.  A longer list is split into a (possibly
 * qualified) relation name, formed by all elements but the last, and a
 * column name, the last element.
 *
 * Ownership permissions on the sequence are already checked, but when a new
 * owned-by dependency is being established we must also enforce that the
 * referenced table has the same owner and namespace as the sequence.
 */
static void
process_owned_by(Relation seqrel, List *owned_by)
{
	int			name_count = list_length(owned_by);
	Relation	tablerel = NULL;
	AttrNumber	attnum = 0;

	Assert(name_count > 0);

	if (name_count != 1)
	{
		List	   *relname;
		char	   *attrname;
		RangeVar   *rel;

		/* Split the list: leading names are the relation, last is the column */
		relname = list_truncate(list_copy(owned_by), name_count - 1);
		attrname = strVal(lfirst(list_tail(owned_by)));

		/* Open and lock the relation so it cannot go away meanwhile */
		rel = makeRangeVarFromNameList(relname);
		tablerel = relation_openrv(rel, AccessShareLock);

		/* Only a regular table may own a sequence */
		if (tablerel->rd_rel->relkind != RELKIND_RELATION)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("referenced relation \"%s\" is not a table",
							RelationGetRelationName(tablerel))));

		/* Sequence and table must share owner ... */
		if (seqrel->rd_rel->relowner != tablerel->rd_rel->relowner)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("sequence must have same owner as table it is linked to")));

		/* ... and schema */
		if (RelationGetNamespace(seqrel) != RelationGetNamespace(tablerel))
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("sequence must be in same schema as table it is linked to")));

		/* Resolve the column name to an attribute number via the syscache */
		attnum = get_attnum(RelationGetRelid(tablerel), attrname);
		if (attnum == InvalidAttrNumber)
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_COLUMN),
					 errmsg("column \"%s\" of relation \"%s\" does not exist",
							attrname, RelationGetRelationName(tablerel))));
	}
	else if (strcmp(strVal(linitial(owned_by)), "none") != 0)
	{
		/* A single name is acceptable only as OWNED BY NONE */
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("invalid OWNED BY option"),
				 errhint("Specify OWNED BY table.column or OWNED BY NONE.")));
	}

	/*
	 * Ready to update pg_depend.  Any existing AUTO dependencies of the
	 * sequence are removed first; a new one is added only when a table was
	 * named.
	 */
	markSequenceUnowned(RelationGetRelid(seqrel));

	if (tablerel != NULL)
	{
		ObjectAddress refobject;
		ObjectAddress depobject;

		/* The sequence (depobject) depends on the table column (refobject) */
		depobject.classId = RelationRelationId;
		depobject.objectId = RelationGetRelid(seqrel);
		depobject.objectSubId = 0;

		refobject.classId = RelationRelationId;
		refobject.objectId = RelationGetRelid(tablerel);
		refobject.objectSubId = attnum;

		recordDependencyOn(&depobject, &refobject, DEPENDENCY_AUTO);

		/* Done with the table, but hold its lock until commit */
		relation_close(tablerel, NoLock);
	}
}

V
Vadim B. Mikheev 已提交
1854

B
Bruce Momjian 已提交
1855
void
1856
seq_redo(XLogRecPtr beginLoc, XLogRecPtr lsn, XLogRecord *record)
V
Vadim B. Mikheev 已提交
1857
{
1858 1859
	MIRROREDLOCK_BUFMGR_DECLARE;

B
Bruce Momjian 已提交
1860 1861 1862 1863 1864 1865
	uint8		info = record->xl_info & ~XLR_INFO_MASK;
	Buffer		buffer;
	Page		page;
	char	   *item;
	Size		itemsz;
	xl_seq_rec *xlrec = (xl_seq_rec *) XLogRecGetData(record);
1866
	sequence_magic *sm;
V
Vadim B. Mikheev 已提交
1867

1868 1869 1870
	/* Backup blocks are not used in seq records */
	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));

1871
	if (info != XLOG_SEQ_LOG)
1872
		elog(PANIC, "seq_redo: unknown op code %u", info);
1873 1874 1875 1876
	
	// -------- MirroredLock ----------
	MIRROREDLOCK_BUFMGR_LOCK;
	
1877
	buffer = XLogReadBuffer(xlrec->node, 0, true);
1878
	Assert(BufferIsValid(buffer));
V
Vadim B. Mikheev 已提交
1879 1880
	page = (Page) BufferGetPage(buffer);

1881 1882
	/* Always reinit the page and reinstall the magic number */
	/* See comments in DefineSequence */
1883 1884 1885
	PageInit((Page) page, BufferGetPageSize(buffer), sizeof(sequence_magic));
	sm = (sequence_magic *) PageGetSpecialPointer(page);
	sm->magic = SEQ_MAGIC;
V
Vadim B. Mikheev 已提交
1886

B
Bruce Momjian 已提交
1887
	item = (char *) xlrec + sizeof(xl_seq_rec);
1888
	itemsz = record->xl_len - sizeof(xl_seq_rec);
1889

B
Bruce Momjian 已提交
1890
	if (PageAddItem(page, (Item) item, itemsz,
1891
					FirstOffsetNumber, false, false) == InvalidOffsetNumber)
1892
		elog(PANIC, "seq_redo: failed to add item to page");
V
Vadim B. Mikheev 已提交
1893 1894

	PageSetLSN(page, lsn);
1895 1896
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);
1897 1898 1899 1900
	
	MIRROREDLOCK_BUFMGR_UNLOCK;
	// -------- MirroredLock ----------
	
V
Vadim B. Mikheev 已提交
1901 1902
}

B
Bruce Momjian 已提交
1903
void
1904
seq_desc(StringInfo buf, XLogRecPtr beginLoc, XLogRecord *record)
V
Vadim B. Mikheev 已提交
1905
{
1906 1907
	uint8		info = record->xl_info & ~XLR_INFO_MASK;
	char		*rec = XLogRecGetData(record);
B
Bruce Momjian 已提交
1908
	xl_seq_rec *xlrec = (xl_seq_rec *) rec;
V
Vadim B. Mikheev 已提交
1909 1910

	if (info == XLOG_SEQ_LOG)
1911
		appendStringInfo(buf, "log: ");
V
Vadim B. Mikheev 已提交
1912 1913
	else
	{
1914
		appendStringInfo(buf, "UNKNOWN");
V
Vadim B. Mikheev 已提交
1915 1916 1917
		return;
	}

1918
	appendStringInfo(buf, "rel %u/%u/%u",
B
Bruce Momjian 已提交
1919
			   xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode);
V
Vadim B. Mikheev 已提交
1920
}
1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002 2003 2004


/*
 * cdb_sequence_relation_init
 *		Initialize a pseudo relcache entry with just enough info to call
 *		bufmgr.
 *
 * seqrel points at caller-provided storage, which is zeroed and then filled
 * in.  tablespaceid, dbid and relid become the physical address (rd_node);
 * relid is also used as the relation OID (rd_id).  istemp is copied into
 * rd_istemp.
 *
 * A zeroed pg_class tuple is palloc'd for rd_rel so that the relation has a
 * printable name of the form "pg_class.oid=<relid>".  The caller releases it
 * with cdb_sequence_relation_term().
 */
static void
cdb_sequence_relation_init(Relation seqrel,
                           Oid      tablespaceid,
                           Oid      dbid,
                           Oid      relid,
                           bool     istemp)
{
	/* See RelationBuildDesc in relcache.c */
	memset(seqrel, 0, sizeof(*seqrel));

	seqrel->rd_smgr = NULL;
	/* NOTE(review): 99 looks like an arbitrary nonzero refcount -- confirm */
	seqrel->rd_refcnt = 99;

	seqrel->rd_id = relid;
	seqrel->rd_istemp = istemp;

	/* Must use shared buffer pool so seqserver & QDs can see the data. */
	seqrel->rd_isLocalBuf = false;

	seqrel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);

	/*
	 * Oid is an unsigned type, so it must be printed with %u; %d would show
	 * OIDs above INT_MAX as negative numbers.  Bound the write to the size
	 * of a name field.
	 */
	snprintf(seqrel->rd_rel->relname.data, NAMEDATALEN,
			 "pg_class.oid=%u", relid);

	/* as in RelationInitPhysicalAddr... */
	seqrel->rd_node.spcNode = tablespaceid;
	seqrel->rd_node.dbNode = dbid;
	seqrel->rd_node.relNode = relid;
}                               /* cdb_sequence_relation_init */

/*
 * cdb_sequence_relation_term
 *		Tear down a pseudo relcache entry.
 *
 * Closes the entry's smgr reference and frees the pg_class tuple hanging
 * off rd_rel, if there is one.  The RelationData storage itself belongs to
 * the caller and is not freed.
 */
static void
cdb_sequence_relation_term(Relation seqrel)
{
	/* Close the file. */
	RelationCloseSmgr(seqrel);

	/* Release the pg_class tuple, when one was attached */
	if (seqrel->rd_rel != NULL)
		pfree(seqrel->rd_rel);
}                               /* cdb_sequence_relation_term */



/*
 * cdb_sequence_nextval_proxy
 *		CDB: forward a nextval request from qExec to the sequence server.
 *
 * The request is sent over the descriptor returned by GetSeqServerFD() and
 * is tagged with the current gp_session_id.  The four output pointers are
 * handed through to sendSequenceRequest() unchanged.
 */
void
cdb_sequence_nextval_proxy(Relation	seqrel,
                           int64   *plast,
                           int64   *pcached,
                           int64   *pincrement,
                           bool    *poverflow)
{
	sendSequenceRequest(GetSeqServerFD(), seqrel, gp_session_id,
						plast, pcached, pincrement, poverflow);
}                               /* cdb_sequence_nextval_proxy */


/*
 * cdb_sequence_nextval_server
 *		CDB: nextval entry point called by the sequence server.
 *
 * tablespaceid, dbid and relid identify the sequence; istemp is recorded in
 * the pseudo relcache entry built for it.  *plast, *pcached and *pincrement
 * are zeroed up front; *poverflow is not touched by this function itself.
 * All four pointers are then passed to cdb_sequence_nextval().
 */
void
cdb_sequence_nextval_server(Oid    tablespaceid,
                            Oid    dbid,
                            Oid    relid,
                            bool   istemp,
                            int64 *plast,
                            int64 *pcached,
                            int64 *pincrement,
                            bool  *poverflow)
{
	RelationData pseudo_rel;
	Relation	seqrel = &pseudo_rel;
	SeqTable	elm;

	*plast = 0;
	*pcached = 0;
	*pincrement = 0;

	/*
	 * In Postgres, init_sequence locates the SeqTable entry for the
	 * sequence.  The sequence server does not need that lookup as such; it
	 * only needs `elm` initialized, because cdb_sequence_nextval() calls
	 * read_seq_tuple(), which requires it.
	 *
	 * In GPDB a sequence server generates the unique values for all
	 * sequences.  It does not have to lock the sequence relation, because
	 * only a single sequence server instance handles all the requests from
	 * the segments.  To prevent collisions between values generated on the
	 * master (e.g. `select nextval(seq)`) and on the segments (e.g. an
	 * insert into a table with a serial column), a BUFFER_LOCK_EXCLUSIVE
	 * lock is held on the shared buffer of the sequence relation.
	 */
	init_sequence(relid, &elm, NULL);

	/* Build a pseudo relcache entry with just enough info to call bufmgr. */
	cdb_sequence_relation_init(seqrel, tablespaceid, dbid, relid, istemp);

	/* CDB TODO: Catch errors. */

	/* Update the sequence object. */
	cdb_sequence_nextval(elm, seqrel, plast, pcached, pincrement, poverflow);

	/* Cleanup. */
	cdb_sequence_relation_term(seqrel);
}                               /* cdb_sequence_nextval_server */
2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051

/*
 * seq_mask
 *		Mask a sequence page before consistency checks are run on it.
 *
 * Two masking helpers are applied to the page in turn: one for the LSN and
 * checksum, one for unused space.  blkno is accepted but not used.
 */
void
seq_mask(char *page, BlockNumber blkno)
{
	/* Hide the page LSN and checksum */
	mask_page_lsn_and_checksum(page);
	/* Hide the page's unused space */
	mask_unused_space(page);
}