/*-------------------------------------------------------------------------
 *
 * sequence.c
 *	  PostgreSQL sequences support code.
 *
 * Portions Copyright (c) 2005-2008, Greenplum inc.
 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/commands/sequence.c,v 1.141 2006/10/06 17:13:58 petere Exp $
 *
 *-------------------------------------------------------------------------
 */
16
#include "postgres.h"
17

18
#include "access/heapam.h"
19 20
#include "access/transam.h"
#include "access/xact.h"
21
#include "catalog/catquery.h"
22
#include "catalog/dependency.h"
23
#include "catalog/heap.h"
24
#include "catalog/namespace.h"
25
#include "catalog/pg_type.h"
26
#include "commands/defrem.h"
27
#include "commands/sequence.h"
28
#include "commands/tablecmds.h"
B
Bruce Momjian 已提交
29
#include "miscadmin.h"
30
#include "storage/smgr.h"               /* RelationCloseSmgr -> smgrclose */
31
#include "nodes/makefuncs.h"
32
#include "utils/acl.h"
B
Bruce Momjian 已提交
33
#include "utils/builtins.h"
34
#include "utils/formatting.h"
35
#include "utils/lsyscache.h"
36
#include "utils/resowner.h"
37
#include "utils/syscache.h"
38

39 40 41 42 43 44 45 46 47 48 49
#include "cdb/cdbdisp.h"
#include "cdb/cdbsrlz.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbcat.h"
#include "cdb/cdbmotion.h"
#include "cdb/ml_ipc.h"

#include "cdb/cdbpersistentfilesysobj.h"

#include "postmaster/seqserver.h"

50

V
Vadim B. Mikheev 已提交
51
/*
 * We don't want to log each fetching of a value from a sequence,
 * so we pre-log a few fetches in advance. In the event of
 * crash we can lose as much as we pre-logged.
 */
#define SEQ_LOG_VALS	32
57

58 59 60 61 62
/*
 * The "special area" of a sequence's buffer page looks like this.
 */
#define SEQ_MAGIC	  0x1717

63 64
typedef struct sequence_magic
{
65
	uint32		magic;
66
} sequence_magic;
67

68 69 70 71 72 73
/*
 * We store a SeqTable item for every sequence we have touched in the current
 * session.  This is needed to hold onto nextval/currval state.  (We can't
 * rely on the relcache, since it's only, well, a cache, and may decide to
 * discard entries.)
 *
B
Bruce Momjian 已提交
74
 * XXX We use linear search to find pre-existing SeqTable entries.	This is
75 76 77
 * good when only a small number of sequences are touched in a session, but
 * would suck with many different sequences.  Perhaps use a hashtable someday.
 */
78 79
typedef struct SeqTableData
{
80 81 82 83 84 85 86
	struct SeqTableData *next;	/* link to next SeqTable object */
	Oid			relid;			/* pg_class OID of this sequence */
	TransactionId xid;			/* xact in which we last did a seq op */
	int64		last;			/* value last returned by nextval */
	int64		cached;			/* last value already cached for nextval */
	/* if last != cached, we have not used up all the cached values */
	int64		increment;		/* copy of sequence's increment field */
87
} SeqTableData;
88 89 90

typedef SeqTableData *SeqTable;

91
static SeqTable seqtab = NULL;	/* Head of list of SeqTable items */
92

93 94 95 96 97
/*
 * last_used_seq is updated by nextval() to point to the last used
 * sequence.
 */
static SeqTableData *last_used_seq = NULL;
98

99
static int64 nextval_internal(Oid relid);
100
static Relation open_share_lock(SeqTable seq);
101
static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel);
102
static Form_pg_sequence read_info(Relation rel, Buffer *buf);
103
static void init_params(List *options, bool isInit,
B
Bruce Momjian 已提交
104
			Form_pg_sequence new, List **owned_by);
105
static void do_setval(Oid relid, int64 next, bool iscalled);
106 107
static void process_owned_by(Relation seqrel, List *owned_by);

108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360
/*
 * Forward declarations for the two routines nextval_internal() uses to
 * advance a sequence: the proxy variant is called when
 * Gp_role == GP_ROLE_EXECUTE, the direct variant otherwise.  Both receive
 * pointers to the caller's last/cached/increment state and an overflow flag.
 */
static void
cdb_sequence_nextval(Relation   seqrel,
                     int64     *plast,
                     int64     *pcached,
                     int64     *pincrement,
                     bool      *poverflow);
static void
cdb_sequence_nextval_proxy(Relation seqrel,
                           int64   *plast,
                           int64   *pcached,
                           int64   *pincrement,
                           bool    *poverflow);

/*
 * Hash key for the sequence persistent-info cache: the sequence's
 * RelFileNode.  Lookups zero the whole key with MemSet before filling it in.
 */
typedef struct SequencePersistentInfoCacheEntryKey
{
	RelFileNode				relFileNode;
} SequencePersistentInfoCacheEntryKey;

/*
 * One entry of the sequence persistent-info cache.  It remembers the
 * persistent TID and serial number found for a sequence so that
 * Sequence_FetchGpRelationNodeForXLog() can skip the
 * PersistentFileSysObj_ScanForRelation() call on later lookups.
 */
typedef struct SequencePersistentInfoCacheEntryData
{
	/* hash key handed to hash_search() */
	SequencePersistentInfoCacheEntryKey	key;

	/* cached persistent TID for the sequence */
	ItemPointerData		persistentTid;

	/* cached persistent serial number for the sequence */
	int64				persistentSerialNum;

	/* links this entry into sequencePersistentInfoCacheLruListHead */
	DoubleLinks			lruLinks;

} SequencePersistentInfoCacheEntryData;
typedef SequencePersistentInfoCacheEntryData *SequencePersistentInfoCacheEntry;

/*
 * Hash table of cached persistent info, keyed by RelFileNode.  Stays NULL
 * until the first check/add call creates it.
 */
static HTAB *sequencePersistentInfoCacheTable = NULL;

/*
 * LRU list over the cache entries: entries are (re)inserted at the front
 * when added or hit, and the eviction victim is taken from the tail.
 */
static DoublyLinkedHead	sequencePersistentInfoCacheLruListHead;

/* Number of entries currently linked into the LRU list. */
static int sequencePersistentInfoCacheLruCount = 0;

/* Once the count exceeds this limit, an add evicts the tail entry. */
static int sequencePersistentInfoCacheLruLimit = 100;

/*
 * Sequence_PersistentInfoCacheTableInit
 *		Create the persistent-info hash table and reset the LRU list head.
 *
 * Called by the check/add routines the first time they find
 * sequencePersistentInfoCacheTable still NULL.
 */
static void
Sequence_PersistentInfoCacheTableInit(void)
{
	HASHCTL		ctl;

	/* Start from an all-zero control block, then fill in what we use. */
	MemSet(&ctl, 0, sizeof(ctl));
	ctl.hash = tag_hash;
	ctl.entrysize = sizeof(SequencePersistentInfoCacheEntryData);
	ctl.keysize = sizeof(SequencePersistentInfoCacheEntryKey);

	sequencePersistentInfoCacheTable =
		hash_create("Sequence Persistent Info",
					10,
					&ctl,
					HASH_ELEM | HASH_FUNCTION);

	DoublyLinkedHead_Init(&sequencePersistentInfoCacheLruListHead);
}

/*
 * Sequence_CheckPersistentInfoCache
 *		Look up the cached persistent TID / serial number for a sequence.
 *
 * relFileNode			identifies the sequence (the hash key).
 * persistentTid		out: cached TID, written only on a hit.
 * persistentSerialNum	out: cached serial number, written only on a hit.
 *
 * Returns true on a cache hit and false on a miss.  A hit also moves the
 * entry to the front of the LRU list.
 */
static bool
Sequence_CheckPersistentInfoCache(RelFileNode *relFileNode,
								  ItemPointer persistentTid,
								  int64 *persistentSerialNum)
{
	SequencePersistentInfoCacheEntryKey	lookupKey;
	SequencePersistentInfoCacheEntry	hit;
	bool		isCached;

	/* Create the table on first use. */
	if (sequencePersistentInfoCacheTable == NULL)
		Sequence_PersistentInfoCacheTableInit();

	/* Zero the whole key before filling it in. */
	MemSet(&lookupKey, 0, sizeof(SequencePersistentInfoCacheEntryKey));
	lookupKey.relFileNode = *relFileNode;

	hit = (SequencePersistentInfoCacheEntry)
		hash_search(sequencePersistentInfoCacheTable,
					(void *) &lookupKey,
					HASH_FIND,
					&isCached);

	if (isCached)
	{
		/* Hand the cached values back to the caller. */
		*persistentTid = hit->persistentTid;
		*persistentSerialNum = hit->persistentSerialNum;

		/* LRU: unlink the entry and put it back at the front. */
		DoubleLinks_Remove(
			offsetof(SequencePersistentInfoCacheEntryData, lruLinks),
			&sequencePersistentInfoCacheLruListHead,
			hit);
		DoublyLinkedHead_AddFirst(
			offsetof(SequencePersistentInfoCacheEntryData, lruLinks),
			&sequencePersistentInfoCacheLruListHead,
			hit);

		return true;
	}

	return false;
}

/*
 * Sequence_AddPersistentInfoCache
 *		Insert persistent TID / serial number for a sequence into the cache.
 *
 * relFileNode			identifies the sequence (the hash key).
 * persistentTid		TID to remember (copied).
 * persistentSerialNum	serial number to remember.
 *
 * The entry must not already be cached (asserted).  The new entry goes to
 * the front of the LRU list; if that pushes the count over
 * sequencePersistentInfoCacheLruLimit, the tail entry is evicted.
 */
static void
Sequence_AddPersistentInfoCache(RelFileNode *relFileNode,
								ItemPointer persistentTid,
								int64 persistentSerialNum)
{
	SequencePersistentInfoCacheEntryKey	newKey;
	SequencePersistentInfoCacheEntry	fresh;
	SequencePersistentInfoCacheEntry	victim;
	bool		alreadyThere;

	/* Create the table on first use. */
	if (sequencePersistentInfoCacheTable == NULL)
		Sequence_PersistentInfoCacheTableInit();

	/* Zero the whole key before filling it in. */
	MemSet(&newKey, 0, sizeof(SequencePersistentInfoCacheEntryKey));
	newKey.relFileNode = *relFileNode;

	fresh = (SequencePersistentInfoCacheEntry)
		hash_search(sequencePersistentInfoCacheTable,
					(void *) &newKey,
					HASH_ENTER,
					&alreadyThere);
	Assert (!alreadyThere);

	fresh->persistentTid = *persistentTid;
	fresh->persistentSerialNum = persistentSerialNum;

	/* LRU: link the new entry at the front of the list. */
	DoubleLinks_Init(&fresh->lruLinks);
	DoublyLinkedHead_AddFirst(
		offsetof(SequencePersistentInfoCacheEntryData, lruLinks),
		&sequencePersistentInfoCacheLruListHead,
		fresh);

	sequencePersistentInfoCacheLruCount++;

	/* Still within the limit?  Then nothing needs to be evicted. */
	if (sequencePersistentInfoCacheLruCount <= sequencePersistentInfoCacheLruLimit)
		return;

	/* Over the limit: drop the entry at the tail of the LRU list. */
	victim = (SequencePersistentInfoCacheEntry)
		DoublyLinkedHead_Last(
			offsetof(SequencePersistentInfoCacheEntryData, lruLinks),
			&sequencePersistentInfoCacheLruListHead);
	Assert(victim != NULL);

	DoubleLinks_Remove(
		offsetof(SequencePersistentInfoCacheEntryData, lruLinks),
		&sequencePersistentInfoCacheLruListHead,
		victim);

	/* Log before HASH_REMOVE, while the entry's contents are still valid. */
	if (Debug_persistent_print)
		elog(Persistent_DebugPrintLevel(), 
			 "Removed cached persistent information for sequence %u/%u/%u -- serial number " INT64_FORMAT ", TID %s",
			 victim->key.relFileNode.spcNode,
			 victim->key.relFileNode.dbNode,
			 victim->key.relFileNode.relNode,
			 victim->persistentSerialNum,
			 ItemPointerToString(&victim->persistentTid));

	hash_search(sequencePersistentInfoCacheTable,
				(void *) &victim->key,
				HASH_REMOVE,
				NULL);

	sequencePersistentInfoCacheLruCount--;
}


/*
 * Sequence_FetchGpRelationNodeForXLog
 *		Make sure rel->rd_segfile0_relationnodeinfo carries the persistent
 *		TID and serial number that callers copy into sequence XLOG records.
 *
 * Does nothing if the information is already marked present.  Otherwise the
 * values come from the persistent-info cache when possible, or from
 * PersistentFileSysObj_ScanForRelation() (segment file 0), in which case
 * they are also added to the cache.  Raises ERROR if the scan finds nothing,
 * or if the invalid-TID debug check trips on a zero TID.
 */
static void
Sequence_FetchGpRelationNodeForXLog(Relation rel)
{
	if (rel->rd_segfile0_relationnodeinfo.isPresent)
		return;

	/*
	 * For better performance, we cache the persistent information
	 * for sequences with upper bound and use LRU...
	 */
	if (Sequence_CheckPersistentInfoCache(
								&rel->rd_node,
								&rel->rd_segfile0_relationnodeinfo.persistentTid,
								&rel->rd_segfile0_relationnodeinfo.persistentSerialNum))
	{
		if (Debug_persistent_print)
			elog(Persistent_DebugPrintLevel(), 
				 "Found cached persistent information for sequence %u/%u/%u -- serial number " INT64_FORMAT ", TID %s",
				 rel->rd_node.spcNode,
				 rel->rd_node.dbNode,
				 rel->rd_node.relNode,
				 rel->rd_segfile0_relationnodeinfo.persistentSerialNum,
				 ItemPointerToString(&rel->rd_segfile0_relationnodeinfo.persistentTid));
	} 
	else 
	{
		/* Cache miss: scan for the relation's segment-file-0 entry. */
		if (!PersistentFileSysObj_ScanForRelation(
												&rel->rd_node,
												/* segmentFileNum */ 0,
												&rel->rd_segfile0_relationnodeinfo.persistentTid,
												&rel->rd_segfile0_relationnodeinfo.persistentSerialNum))
		{
			elog(ERROR, "Could not find persistent information for sequence %u/%u/%u",
			     rel->rd_node.spcNode,
			     rel->rd_node.dbNode,
			     rel->rd_node.relNode);
		}

		/* Remember the result so the next lookup can skip the scan. */
		Sequence_AddPersistentInfoCache(
								&rel->rd_node,
								&rel->rd_segfile0_relationnodeinfo.persistentTid,
								rel->rd_segfile0_relationnodeinfo.persistentSerialNum);

		if (Debug_persistent_print)
			elog(Persistent_DebugPrintLevel(), 
				 "Add cached persistent information for sequence %u/%u/%u -- serial number " INT64_FORMAT ", TID %s",
				 rel->rd_node.spcNode,
				 rel->rd_node.dbNode,
				 rel->rd_node.relNode,
				 rel->rd_segfile0_relationnodeinfo.persistentSerialNum,
				 ItemPointerToString(&rel->rd_segfile0_relationnodeinfo.persistentTid));
	}

	/* Optional sanity check: a zero TID here is treated as an error. */
	if (Debug_check_for_invalid_persistent_tid &&
		!Persistent_BeforePersistenceWork() &&
		PersistentStore_IsZeroTid(&rel->rd_segfile0_relationnodeinfo.persistentTid))
	{	
		elog(ERROR, 
			 "Sequence_FetchGpRelationNodeForXLog has invalid TID (0,0) for relation %u/%u/%u '%s', serial number " INT64_FORMAT,
			 rel->rd_node.spcNode,
			 rel->rd_node.dbNode,
			 rel->rd_node.relNode,
			 NameStr(rel->rd_rel->relname),
			 rel->rd_segfile0_relationnodeinfo.persistentSerialNum);
	}

	rel->rd_segfile0_relationnodeinfo.isPresent = true;
}
361 362

/*
B
Bruce Momjian 已提交
363
 * DefineSequence
364
 *				Creates a new sequence relation
365 366
 */
void
367
DefineSequence(CreateSeqStmt *seq)
368
{
369 370
	MIRROREDLOCK_BUFMGR_DECLARE;

371
	FormData_pg_sequence new;
372
	List	   *owned_by;
373
	CreateStmt *stmt = makeNode(CreateStmt);
374
	Oid			seqoid;
375 376 377
	Relation	rel;
	Buffer		buf;
	PageHeader	page;
378
	sequence_magic *sm;
379 380 381
	HeapTuple	tuple;
	TupleDesc	tupDesc;
	Datum		value[SEQ_COL_LASTCOL];
382
	bool		null[SEQ_COL_LASTCOL];
383
	int			i;
384
	NameData	name;
385

386 387
	bool shouldDispatch =  Gp_role == GP_ROLE_DISPATCH && !IsBootstrapProcessingMode();

388
	/* Check and set all option values */
389
	init_params(seq->options, true, &new, &owned_by);
390 391

	/*
392
	 * Create relation (and fill *null & *value)
393
	 */
394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415
	stmt->oidInfo.relOid = 0;
	stmt->oidInfo.comptypeOid = 0;
	stmt->oidInfo.toastOid = 0;
	stmt->oidInfo.toastIndexOid = 0;
	stmt->oidInfo.aosegOid = 0;
	stmt->oidInfo.aosegIndexOid = 0;
	stmt->oidInfo.aoblkdirOid = 0;
	stmt->oidInfo.aoblkdirIndexOid = 0;
	stmt->oidInfo.aovisimapOid = 0;
	stmt->oidInfo.aovisimapIndexOid = 0;

	if (shouldDispatch)
	{

			/* stmt->relOid = newOid(); */
	}
	else if (Gp_role == GP_ROLE_EXECUTE)
	{

			stmt->oidInfo.relOid = seq->relOid;

	}
416 417
	stmt->tableElts = NIL;
	for (i = SEQ_COL_FIRSTCOL; i <= SEQ_COL_LASTCOL; i++)
418
	{
419
		ColumnDef  *coldef = makeNode(ColumnDef);
420

421 422
		coldef->inhcount = 0;
		coldef->is_local = true;
423
		coldef->is_not_null = true;
424 425
		coldef->raw_default = NULL;
		coldef->cooked_default = NULL;
426 427
		coldef->constraints = NIL;

428
		null[i - 1] = false;
429 430 431

		switch (i)
		{
432
			case SEQ_COL_NAME:
433
				coldef->typname = makeTypeNameFromOid(NAMEOID, -1);
434
				coldef->colname = "sequence_name";
435
				namestrcpy(&name, seq->sequence->relname);
436
				value[i - 1] = NameGetDatum(&name);
437 438
				break;
			case SEQ_COL_LASTVAL:
439
				coldef->typname = makeTypeNameFromOid(INT8OID, -1);
440
				coldef->colname = "last_value";
441
				value[i - 1] = Int64GetDatumFast(new.last_value);
442 443
				break;
			case SEQ_COL_INCBY:
444
				coldef->typname = makeTypeNameFromOid(INT8OID, -1);
445
				coldef->colname = "increment_by";
446
				value[i - 1] = Int64GetDatumFast(new.increment_by);
447 448
				break;
			case SEQ_COL_MAXVALUE:
449
				coldef->typname = makeTypeNameFromOid(INT8OID, -1);
450
				coldef->colname = "max_value";
451
				value[i - 1] = Int64GetDatumFast(new.max_value);
452 453
				break;
			case SEQ_COL_MINVALUE:
454
				coldef->typname = makeTypeNameFromOid(INT8OID, -1);
455
				coldef->colname = "min_value";
456
				value[i - 1] = Int64GetDatumFast(new.min_value);
457 458
				break;
			case SEQ_COL_CACHE:
459
				coldef->typname = makeTypeNameFromOid(INT8OID, -1);
460
				coldef->colname = "cache_value";
461
				value[i - 1] = Int64GetDatumFast(new.cache_value);
462
				break;
V
Vadim B. Mikheev 已提交
463
			case SEQ_COL_LOG:
464
				coldef->typname = makeTypeNameFromOid(INT8OID, -1);
V
Vadim B. Mikheev 已提交
465
				coldef->colname = "log_cnt";
466
				value[i - 1] = Int64GetDatum((int64) 1);
V
Vadim B. Mikheev 已提交
467
				break;
468
			case SEQ_COL_CYCLE:
469
				coldef->typname = makeTypeNameFromOid(BOOLOID, -1);
470
				coldef->colname = "is_cycled";
471
				value[i - 1] = BoolGetDatum(new.is_cycled);
472 473
				break;
			case SEQ_COL_CALLED:
474
				coldef->typname = makeTypeNameFromOid(BOOLOID, -1);
475
				coldef->colname = "is_called";
476
				value[i - 1] = BoolGetDatum(false);
477
				break;
478 479 480 481
		}
		stmt->tableElts = lappend(stmt->tableElts, coldef);
	}

482 483
	stmt->relation = seq->sequence;
	stmt->inhRelations = NIL;
484
	stmt->constraints = NIL;
485 486
	stmt->inhOids = NIL;
	stmt->parentOidCount = 0;
B
Bruce Momjian 已提交
487
	stmt->options = list_make1(defWithOids(false));
488
	stmt->oncommit = ONCOMMIT_NOOP;
489
	stmt->tablespacename = NULL;
490 491 492
	stmt->relKind = RELKIND_SEQUENCE;
	stmt->oidInfo.comptypeOid = seq->comptypeOid;
	stmt->ownerid = GetUserId();
493

494
	seqoid = DefineRelation(stmt, RELKIND_SEQUENCE, RELSTORAGE_HEAP);
495

496 497 498 499 500 501 502 503
	/*
	 * Open and lock the new sequence.  (This lock is redundant; an
	 * AccessExclusiveLock was acquired above by DefineRelation and
	 * won't be released until end of transaction.)
	 *
	 * CDB: Acquire lock on qDisp before dispatching to qExecs, so
	 * qDisp can detect and resolve any deadlocks.
	 */
504
	rel = heap_open(seqoid, AccessExclusiveLock);
505
	tupDesc = RelationGetDescr(rel);
506

507 508
	stmt->oidInfo.relOid = seq->relOid = seqoid;

509 510
	/* Initialize first page of relation with special magic number */

511 512 513
	// -------- MirroredLock ----------
	MIRROREDLOCK_BUFMGR_LOCK;
	
514
	buf = ReadBuffer(rel, P_NEW);
515 516
	Assert(BufferGetBlockNumber(buf) == 0);

517 518
	page = (PageHeader) BufferGetPage(buf);

519 520
	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

521 522 523 524
	PageInit((Page) page, BufferGetPageSize(buf), sizeof(sequence_magic));
	sm = (sequence_magic *) PageGetSpecialPointer(page);
	sm->magic = SEQ_MAGIC;

525 526 527 528 529
	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
	
	MIRROREDLOCK_BUFMGR_UNLOCK;
	// -------- MirroredLock ----------
	
530 531 532
	/* hack: ensure heap_insert will insert on the just-created page */
	rel->rd_targblock = 0;

533
	/* Now form & insert sequence tuple */
534
	tuple = heap_form_tuple(tupDesc, value, null);
535
	simple_heap_insert(rel, tuple);
536

537 538
	Assert(ItemPointerGetOffsetNumber(&(tuple->t_self)) == FirstOffsetNumber);

539 540 541 542
	// Fetch gp_persistent_relation_node information that will be added to XLOG record.
	Assert(rel != NULL);
	Sequence_FetchGpRelationNodeForXLog(rel);

543
	/*
544 545
	 * Two special hacks here:
	 *
546 547
	 * 1. Since VACUUM does not process sequences, we have to force the tuple
	 * to have xmin = FrozenTransactionId now.	Otherwise it would become
B
Bruce Momjian 已提交
548
	 * invisible to SELECTs after 2G transactions.	It is okay to do this
549 550 551
	 * because if the current transaction aborts, no other xact will ever
	 * examine the sequence tuple anyway.
	 *
B
Bruce Momjian 已提交
552 553 554 555 556
	 * 2. Even though heap_insert emitted a WAL log record, we have to emit an
	 * XLOG_SEQ_LOG record too, since (a) the heap_insert record will not have
	 * the right xmin, and (b) REDO of the heap_insert record would re-init
	 * page and sequence magic number would be lost.  This means two log
	 * records instead of one :-(
557
	 */
558 559 560 561

	// -------- MirroredLock ----------
	MIRROREDLOCK_BUFMGR_LOCK;

562
	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
563

564
	START_CRIT_SECTION();
565 566 567

	{
		/*
B
Bruce Momjian 已提交
568
		 * Note that the "tuple" structure is still just a local tuple record
569
		 * created by heap_form_tuple; its t_data pointer doesn't point at the
B
Bruce Momjian 已提交
570 571 572
		 * disk buffer.  To scribble on the disk buffer we need to fetch the
		 * item pointer.  But do the same to the local tuple, since that will
		 * be the source for the WAL log record, below.
573 574 575 576 577 578 579
		 */
		ItemId		itemId;
		Item		item;

		itemId = PageGetItemId((Page) page, FirstOffsetNumber);
		item = PageGetItem((Page) page, itemId);

580
		HeapTupleHeaderSetXmin((HeapTupleHeader) item, FrozenTransactionId);
581 582
		((HeapTupleHeader) item)->t_infomask |= HEAP_XMIN_COMMITTED;

583
		HeapTupleHeaderSetXmin(tuple->t_data, FrozenTransactionId);
584 585 586
		tuple->t_data->t_infomask |= HEAP_XMIN_COMMITTED;
	}

587 588
	MarkBufferDirty(buf);

589 590
	/* XLOG stuff */
	if (!rel->rd_istemp)
591
	{
592 593 594 595
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];
		Form_pg_sequence newseq = (Form_pg_sequence) GETSTRUCT(tuple);
596 597

		/* We do not log first nextval call, so "advance" sequence here */
598
		/* Note we are scribbling on local tuple, not the disk buffer */
599
		newseq->is_called = true;
600 601 602
		newseq->log_cnt = 0;

		xlrec.node = rel->rd_node;
603 604 605
		xlrec.persistentTid = rel->rd_segfile0_relationnodeinfo.persistentTid;
		xlrec.persistentSerialNum = rel->rd_segfile0_relationnodeinfo.persistentSerialNum;

606 607
		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
608
		rdata[0].buffer = InvalidBuffer;
609 610
		rdata[0].next = &(rdata[1]);

611
		rdata[1].data = (char *) tuple->t_data;
612
		rdata[1].len = tuple->t_len;
613
		rdata[1].buffer = InvalidBuffer;
614 615 616 617 618
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG | XLOG_NO_TRAN, rdata);

		PageSetLSN(page, recptr);
619
		PageSetTLI(page, ThisTimeLineID);
620
	}
621

622
	END_CRIT_SECTION();
623

624 625
	UnlockReleaseBuffer(buf);

626 627 628
	MIRROREDLOCK_BUFMGR_UNLOCK;
	// -------- MirroredLock ----------

629 630 631 632
	/* process OWNED BY if given */
	if (owned_by)
		process_owned_by(rel, owned_by);

633
	heap_close(rel, NoLock);
634 635 636 637 638 639 640 641

	
	/* Dispatch to segments */
	if (shouldDispatch)
	{
		seq->comptypeOid = stmt->oidInfo.comptypeOid;
		CdbDispatchUtilityStatement((Node *)seq, "DefineSequence");
	}
642 643
}

B
Bruce Momjian 已提交
644 645 646
/*
 * AlterSequence
 *
647
 * Modify the definition of a sequence relation
B
Bruce Momjian 已提交
648 649
 */
void
650
AlterSequence(AlterSeqStmt *stmt)
B
Bruce Momjian 已提交
651
{
652 653
	MIRROREDLOCK_BUFMGR_DECLARE;

654
	Oid			relid;
B
Bruce Momjian 已提交
655 656 657 658 659 660
	SeqTable	elm;
	Relation	seqrel;
	Buffer		buf;
	Page		page;
	Form_pg_sequence seq;
	FormData_pg_sequence new;
661
	List	   *owned_by;
662 663 664 665 666
	int64		save_increment;
	bool		bSeqIsTemp	   = false;
	int			numopts	   = 0;
	char	   *alter_subtype = "";		/* metadata tracking: kind of
										   redundant to say "role" */
B
Bruce Momjian 已提交
667 668

	/* open and AccessShareLock sequence */
669 670
	relid = RangeVarGetRelid(stmt->sequence, false);
	init_sequence(relid, &elm, &seqrel);
B
Bruce Momjian 已提交
671

672
	/* allow ALTER to sequence owner only */
B
Bruce Momjian 已提交
673
	if (!pg_class_ownercheck(elm->relid, GetUserId()))
674 675
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
					   stmt->sequence->relname);
B
Bruce Momjian 已提交
676

677 678 679
	/* hack to keep ALTER SEQUENCE OWNED BY from changing currval state */
	save_increment = elm->increment;

B
Bruce Momjian 已提交
680
	/* lock page' buffer and read tuple into new sequence structure */
681 682 683 684 685 686
	
	// -------- MirroredLock ----------
	MIRROREDLOCK_BUFMGR_LOCK;
	
	seq = read_info(seqrel, &buf);
	elm->increment = seq->increment_by;
B
Bruce Momjian 已提交
687 688
	page = BufferGetPage(buf);

689 690
	/* Copy old values of options into workspace */
	memcpy(&new, seq, sizeof(FormData_pg_sequence));
B
Bruce Momjian 已提交
691

692
	/* Check and set new values */
693
	init_params(stmt->options, false, &new, &owned_by);
B
Bruce Momjian 已提交
694

695
	/* Now okay to update the on-disk tuple */
696
	memcpy(seq, &new, sizeof(FormData_pg_sequence));
B
Bruce Momjian 已提交
697

698 699 700 701 702 703 704 705 706 707
	if (owned_by)
	{
		/* Restore previous state of elm (assume nothing else changes) */
		elm->increment = save_increment;
	}
	else
	{
		/* Clear local cache so that we don't think we have cached numbers */
		elm->last = new.last_value; /* last returned number */
		elm->cached = new.last_value;	/* last cached number (forget cached
B
Bruce Momjian 已提交
708
										 * values) */
709 710 711 712 713
	}

	// Fetch gp_persistent_relation_node information that will be added to XLOG record.
	Assert(seqrel != NULL);
	Sequence_FetchGpRelationNodeForXLog(seqrel);
714

B
Bruce Momjian 已提交
715 716
	START_CRIT_SECTION();

717 718
	MarkBufferDirty(buf);

B
Bruce Momjian 已提交
719
	/* XLOG stuff */
720 721 722 723

	bSeqIsTemp = seqrel->rd_istemp;

	if (!bSeqIsTemp)
B
Bruce Momjian 已提交
724 725 726 727 728 729
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];

		xlrec.node = seqrel->rd_node;
730 731 732
		xlrec.persistentTid = seqrel->rd_segfile0_relationnodeinfo.persistentTid;
		xlrec.persistentSerialNum = seqrel->rd_segfile0_relationnodeinfo.persistentSerialNum;

B
Bruce Momjian 已提交
733 734
		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
735
		rdata[0].buffer = InvalidBuffer;
B
Bruce Momjian 已提交
736 737 738 739 740
		rdata[0].next = &(rdata[1]);

		rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
		rdata[1].len = ((PageHeader) page)->pd_special -
			((PageHeader) page)->pd_upper;
741
		rdata[1].buffer = InvalidBuffer;
B
Bruce Momjian 已提交
742 743 744 745 746
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG | XLOG_NO_TRAN, rdata);

		PageSetLSN(page, recptr);
747
		PageSetTLI(page, ThisTimeLineID);
B
Bruce Momjian 已提交
748 749 750 751
	}

	END_CRIT_SECTION();

752
	UnlockReleaseBuffer(buf);
B
Bruce Momjian 已提交
753

754 755 756
	MIRROREDLOCK_BUFMGR_UNLOCK;
	// -------- MirroredLock ----------

757 758 759 760
	/* process OWNED BY if given */
	if (owned_by)
		process_owned_by(seqrel, owned_by);

B
Bruce Momjian 已提交
761
	relation_close(seqrel, NoLock);
762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807

	numopts = list_length(stmt->options);

	if (numopts > 1)
	{
		char allopts[NAMEDATALEN];

		sprintf(allopts, "%d OPTIONS", numopts);

		alter_subtype = pstrdup(allopts);
	}
	else if (0 == numopts)
	{
		alter_subtype = "0 OPTIONS";
	}
	else if ((Gp_role == GP_ROLE_DISPATCH) && (!bSeqIsTemp))
	{
		ListCell		*option = list_head(stmt->options);
		DefElem			*defel	= (DefElem *) lfirst(option);
		char			*tempo	= NULL;

		alter_subtype = defel->defname;
		if (0 == strcmp(alter_subtype, "owned_by"))
			alter_subtype = "OWNED BY";

		tempo = str_toupper(alter_subtype, strlen(alter_subtype));

		alter_subtype = tempo;

	}

	if (Gp_role == GP_ROLE_DISPATCH)
	{
		CdbDispatchUtilityStatement((Node *)stmt, "AlterSequence");

		if (!bSeqIsTemp)
		{
			/* MPP-6929: metadata tracking */
			MetaTrackUpdObject(RelationRelationId,
							   relid,
							   GetUserId(),
							   "ALTER", alter_subtype
					);
		}

	}
B
Bruce Momjian 已提交
808 809
}

810

811 812 813 814 815
/*
 * Note: nextval with a text argument is no longer exported as a pg_proc
 * entry, but we keep it around to ease porting of C code that may have
 * called the function directly.
 */
816 817
Datum
nextval(PG_FUNCTION_ARGS)
818
{
819
	text	   *seqin = PG_GETARG_TEXT_P(0);
820
	RangeVar   *sequence;
821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839
	Oid			relid;

	sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin));
	relid = RangeVarGetRelid(sequence, false);

	PG_RETURN_INT64(nextval_internal(relid));
}

Datum
nextval_oid(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);

	PG_RETURN_INT64(nextval_internal(relid));
}

/*
 * nextval_internal
 *
 * Return the next value of the sequence whose relation OID is relid.
 *
 * If this backend's SeqTable entry still holds cached values
 * (elm->last != elm->cached), one of those is handed out without touching
 * the sequence relation.  Otherwise the current user must hold UPDATE
 * privilege on the sequence, and a new batch is fetched: through
 * cdb_sequence_nextval_proxy() when Gp_role is GP_ROLE_EXECUTE, or directly
 * with cdb_sequence_nextval() in every other role.
 *
 * Errors:
 *	- ERRCODE_INSUFFICIENT_PRIVILEGE if the UPDATE privilege check fails.
 *	- ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE if the fetch reported that the
 *	  sequence overflowed.
 */
static int64
nextval_internal(Oid relid)
{
	SeqTable	elm;
	Relation	seqrel;
	bool		is_overflow = false;

	/* open and AccessShareLock sequence */
	init_sequence(relid, &elm, &seqrel);

	if (elm->last != elm->cached)		/* some numbers were cached */
	{
		last_used_seq = elm;
		elm->last += elm->increment;
		relation_close(seqrel, NoLock);
		return elm->last;
	}

	if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied for sequence %s",
						RelationGetRelationName(seqrel))));

	/* Update the sequence object. */
	if (Gp_role == GP_ROLE_EXECUTE)
		cdb_sequence_nextval_proxy(seqrel,
								   &elm->last,
								   &elm->cached,
								   &elm->increment,
								   &is_overflow);
	else
		cdb_sequence_nextval(seqrel,
							 &elm->last,
							 &elm->cached,
							 &elm->increment,
							 &is_overflow);

	/*
	 * The fetch routines only report overflow through the output flag; they
	 * do not raise an error themselves.  Without this check the caller would
	 * be handed the same value again once a non-cycling sequence is
	 * exhausted.  The direction of the increment tells us which bound was
	 * reached.
	 */
	if (is_overflow)
	{
		if (elm->increment > 0)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("nextval: reached maximum value of sequence \"%s\"",
							RelationGetRelationName(seqrel))));
		else
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("nextval: reached minimum value of sequence \"%s\"",
							RelationGetRelationName(seqrel))));
	}

	last_used_seq = elm;

	relation_close(seqrel, NoLock);
	return elm->last;
}


/*
 * cdb_sequence_nextval
 *
 * Fetch the next batch of values directly from the sequence's page.
 *
 * The sequence tuple is read while holding the MirroredLock (read_info
 * locks the buffer), advanced by up to cache_value values, WAL-logged when
 * required, and the new state is stored back into the page.
 *
 * Outputs:
 *	*plast		- the value to hand back for this nextval
 *	*pcached	- the last value fetched in this batch
 *	*pincrement - the sequence's increment_by
 *	*poverflow	- true if a non-cycling sequence hit its bound before any
 *				  value could be fetched
 */
void
cdb_sequence_nextval(Relation   seqrel,
                     int64     *plast,
                     int64     *pcached,
                     int64     *pincrement,
                     bool      *poverflow)
{
	MIRROREDLOCK_BUFMGR_DECLARE;

	Buffer		buf;
	Page		page;
	Form_pg_sequence seq;
	int64		incby;
	int64		maxv;
	int64		minv;
	int64		cache;
	int64		logcnt;
	int64		fetch;
	int64		last;
	int64		result;
	int64		next;
	int64		rescnt = 0;
	bool		have_overflow = false;
	bool		logit = false;

	/* -------- MirroredLock ---------- */
	MIRROREDLOCK_BUFMGR_LOCK;

	/* lock the page's buffer and read the tuple */
	seq = read_info(seqrel, &buf);
	page = BufferGetPage(buf);

	result = seq->last_value;
	next = result;
	last = result;
	incby = seq->increment_by;
	maxv = seq->max_value;
	minv = seq->min_value;
	cache = seq->cache_value;
	fetch = cache;
	logcnt = seq->log_cnt;

	if (!seq->is_called)
	{
		/* last_value itself has not been handed out yet, so it counts */
		rescnt++;
		fetch--;
		logcnt--;
	}

	/*
	 * Decide whether we should emit a WAL log record.  If so, force up the
	 * fetch count to grab SEQ_LOG_VALS more values than we actually need to
	 * cache.  (These will then be usable without logging.)
	 *
	 * If this is the first nextval after a checkpoint, we must force a new
	 * WAL record to be written anyway, else replay starting from the
	 * checkpoint would fail to advance the sequence past the logged values.
	 * In this case we may as well fetch extra values.
	 */
	if (logcnt < fetch)
	{
		/* forced log to satisfy local demand for values */
		logit = true;
	}
	else
	{
		XLogRecPtr	redoptr = GetRedoRecPtr();

		/* last update of seq was before checkpoint? */
		if (XLByteLE(PageGetLSN(page), redoptr))
			logit = true;
	}

	if (logit)
	{
		fetch += SEQ_LOG_VALS;
		logcnt = fetch;
	}

	while (fetch != 0)			/* try to fetch cache [+ log ] numbers */
	{
		bool		at_limit;

		/*
		 * Check MAXVALUE for ascending sequences and MINVALUE for descending
		 * sequences
		 */
		if (incby > 0)
			at_limit = (maxv >= 0 && next > maxv - incby) ||
				(maxv < 0 && next + incby > maxv);
		else
			at_limit = (minv < 0 && next < minv - incby) ||
				(minv >= 0 && next + incby < minv);

		if (!at_limit)
			next += incby;
		else
		{
			if (rescnt > 0)
				break;			/* stop fetching */

			if (seq->is_cycled)
				next = (incby > 0) ? minv : maxv;
			else
				have_overflow = true;
		}

		fetch--;
		if (rescnt < cache)
		{
			logcnt--;
			rescnt++;
			last = next;
			if (rescnt == 1)	/* if it's first result - */
				result = next;	/* it's what to return */
		}
	}

	logcnt -= fetch;			/* adjust for any unfetched numbers */
	Assert(logcnt >= 0);

	/* set results for caller */
	*poverflow = have_overflow; /* has the sequence overflown */
	*plast = result;			/* last returned number */
	*pcached = last;			/* last fetched number */
	*pincrement = incby;

	/*
	 * Fetch gp_persistent_relation_node information that will be added to
	 * the XLOG record.
	 */
	Assert(seqrel != NULL);
	Sequence_FetchGpRelationNodeForXLog(seqrel);

	START_CRIT_SECTION();

	MarkBufferDirty(buf);

	/* XLOG stuff */
	if (logit && !seqrel->rd_istemp)
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];
		PageHeader	phdr = (PageHeader) page;

		xlrec.node = seqrel->rd_node;
		xlrec.persistentTid = seqrel->rd_segfile0_relationnodeinfo.persistentTid;
		xlrec.persistentSerialNum = seqrel->rd_segfile0_relationnodeinfo.persistentSerialNum;

		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
		rdata[0].buffer = InvalidBuffer;
		rdata[0].next = &(rdata[1]);

		/* set values that will be saved in xlog */
		seq->last_value = next;
		seq->is_called = true;
		seq->log_cnt = 0;

		rdata[1].data = (char *) page + phdr->pd_upper;
		rdata[1].len = phdr->pd_special - phdr->pd_upper;
		rdata[1].buffer = InvalidBuffer;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG | XLOG_NO_TRAN, rdata);

		PageSetLSN(page, recptr);
		PageSetTLI(page, ThisTimeLineID);

		/*
		 * Record in shared memory how far we have inserted, so that the QD
		 * can flush it when necessary.  Only ever move the pointer forward.
		 */
		LWLockAcquire(SeqServerControlLock, LW_EXCLUSIVE);

		if (XLByteLT(seqServerCtl->lastXlogEntry, recptr))
		{
			seqServerCtl->lastXlogEntry.xlogid = recptr.xlogid;
			seqServerCtl->lastXlogEntry.xrecoff = recptr.xrecoff;
		}

		LWLockRelease(SeqServerControlLock);
	}

	/* update on-disk data */
	seq->last_value = last;		/* last fetched number */
	seq->is_called = true;
	seq->log_cnt = logcnt;		/* how much is logged */

	END_CRIT_SECTION();

	UnlockReleaseBuffer(buf);

	MIRROREDLOCK_BUFMGR_UNLOCK;
	/* -------- MirroredLock ---------- */

}                               /* cdb_sequence_nextval */
1089

1090

1091
Datum
1092
currval_oid(PG_FUNCTION_ARGS)
1093
{
1094 1095
	Oid			relid = PG_GETARG_OID(0);
	int64		result;
1096
	SeqTable	elm;
1097
	Relation	seqrel;
1098

1099 1100 1101 1102 1103 1104 1105 1106
	/* For now, strictly forbidden on MPP. */
	if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
				 errmsg("currval() not supported")));
	}

V
Vadim B. Mikheev 已提交
1107
	/* open and AccessShareLock sequence */
1108
	init_sequence(relid, &elm, &seqrel);
1109

1110 1111
	if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_SELECT) != ACLCHECK_OK &&
		pg_class_aclcheck(elm->relid, GetUserId(), ACL_USAGE) != ACLCHECK_OK)
1112 1113
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1114
				 errmsg("permission denied for sequence %s",
1115
						RelationGetRelationName(seqrel))));
1116

1117
	if (elm->increment == 0)	/* nextval/read_info were not called */
1118 1119
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1120
				 errmsg("currval of sequence \"%s\" is not yet defined in this session",
1121
						RelationGetRelationName(seqrel))));
1122 1123 1124

	result = elm->last;

1125 1126
	relation_close(seqrel, NoLock);

1127
	PG_RETURN_INT64(result);
1128 1129
}

1130 1131 1132 1133 1134 1135
Datum
lastval(PG_FUNCTION_ARGS)
{
	Relation	seqrel;
	int64		result;

1136 1137 1138 1139 1140 1141 1142 1143
	/* For now, strictly forbidden on MPP. */
	if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
				 errmsg("lastval() not supported")));
	}

1144 1145 1146 1147 1148 1149
	if (last_used_seq == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("lastval is not yet defined in this session")));

	/* Someone may have dropped the sequence since the last nextval() */
1150 1151 1152 1153 1154 1155
	if (0 == caql_getcount(
				NULL,
				cql("SELECT COUNT(*) FROM pg_class "
					" WHERE oid = :1 ",
					ObjectIdGetDatum(last_used_seq->relid))))
	{
1156 1157 1158
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("lastval is not yet defined in this session")));
1159
	}
1160

1161
	seqrel = open_share_lock(last_used_seq);
1162 1163 1164 1165

	/* nextval() must have already been called for this sequence */
	Assert(last_used_seq->increment != 0);

1166 1167
	if (pg_class_aclcheck(last_used_seq->relid, GetUserId(), ACL_SELECT) != ACLCHECK_OK &&
		pg_class_aclcheck(last_used_seq->relid, GetUserId(), ACL_USAGE) != ACLCHECK_OK)
1168 1169 1170 1171 1172 1173 1174
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied for sequence %s",
						RelationGetRelationName(seqrel))));

	result = last_used_seq->last;
	relation_close(seqrel, NoLock);
1175

1176 1177 1178
	PG_RETURN_INT64(result);
}

B
Bruce Momjian 已提交
1179
/*
1180 1181 1182 1183
 * Main internal procedure that handles 2 & 3 arg forms of SETVAL.
 *
 * Note that the 3 arg version (which sets the is_called flag) is
 * only for use in pg_dump, and setting the is_called flag may not
B
Bruce Momjian 已提交
1184
 * work if multiple users are attached to the database and referencing
1185 1186
 * the sequence (unlikely if pg_dump is restoring it).
 *
B
Bruce Momjian 已提交
1187
 * It is necessary to have the 3 arg version so that pg_dump can
1188 1189 1190 1191
 * restore the state of a sequence exactly during data-only restores -
 * it is the only way to clear the is_called flag in an existing
 * sequence.
 */
B
Bruce Momjian 已提交
1192
static void
1193
do_setval(Oid relid, int64 next, bool iscalled)
M
 
Marc G. Fournier 已提交
1194
{
1195 1196
	MIRROREDLOCK_BUFMGR_DECLARE;

M
 
Marc G. Fournier 已提交
1197
	SeqTable	elm;
1198
	Relation	seqrel;
1199
	Buffer		buf;
1200
	Form_pg_sequence seq;
M
 
Marc G. Fournier 已提交
1201

1202 1203 1204 1205 1206 1207 1208
	if (Gp_role == GP_ROLE_EXECUTE)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
				 errmsg("setval() not supported in this context")));
	}

1209
	/* open and AccessShareLock sequence */
1210
	init_sequence(relid, &elm, &seqrel);
1211 1212

	if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
1213 1214
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1215
				 errmsg("permission denied for sequence %s",
1216
						RelationGetRelationName(seqrel))));
M
 
Marc G. Fournier 已提交
1217

1218
	/* lock page' buffer and read tuple */
1219 1220 1221 1222 1223 1224
	
	// -------- MirroredLock ----------
	MIRROREDLOCK_BUFMGR_LOCK;
	
	seq = read_info(seqrel, &buf);
	elm->increment = seq->increment_by;
M
 
Marc G. Fournier 已提交
1225

1226
	if ((next < seq->min_value) || (next > seq->max_value))
1227
	{
B
Bruce Momjian 已提交
1228 1229 1230 1231
		char		bufv[100],
					bufm[100],
					bufx[100];

1232 1233 1234
		snprintf(bufv, sizeof(bufv), INT64_FORMAT, next);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, seq->min_value);
		snprintf(bufx, sizeof(bufx), INT64_FORMAT, seq->max_value);
1235 1236
		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1237
				 errmsg("setval: value %s is out of bounds for sequence \"%s\" (%s..%s)",
1238 1239
						bufv, RelationGetRelationName(seqrel),
						bufm, bufx)));
1240
	}
M
 
Marc G. Fournier 已提交
1241 1242 1243

	/* save info in local cache */
	elm->last = next;			/* last returned number */
B
Bruce Momjian 已提交
1244
	elm->cached = next;			/* last cached number (forget cached values) */
M
 
Marc G. Fournier 已提交
1245

1246 1247 1248 1249
	// Fetch gp_persistent_relation_node information that will be added to XLOG record.
	Assert(seqrel != NULL);
	Sequence_FetchGpRelationNodeForXLog(seqrel);

1250
	START_CRIT_SECTION();
1251

1252 1253
	MarkBufferDirty(buf);

1254 1255
	/* XLOG stuff */
	if (!seqrel->rd_istemp)
V
Vadim B. Mikheev 已提交
1256 1257 1258
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
B
Bruce Momjian 已提交
1259
		XLogRecData rdata[2];
1260
		Page		page = BufferGetPage(buf);
V
Vadim B. Mikheev 已提交
1261

1262
		xlrec.node = seqrel->rd_node;
1263 1264 1265
		xlrec.persistentTid = seqrel->rd_segfile0_relationnodeinfo.persistentTid;
		xlrec.persistentSerialNum = seqrel->rd_segfile0_relationnodeinfo.persistentSerialNum;

B
Bruce Momjian 已提交
1266
		rdata[0].data = (char *) &xlrec;
1267
		rdata[0].len = sizeof(xl_seq_rec);
1268
		rdata[0].buffer = InvalidBuffer;
1269 1270
		rdata[0].next = &(rdata[1]);

1271
		/* set values that will be saved in xlog */
1272
		seq->last_value = next;
1273
		seq->is_called = true;
1274
		seq->log_cnt = 0;
1275

B
Bruce Momjian 已提交
1276 1277 1278
		rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
		rdata[1].len = ((PageHeader) page)->pd_special -
			((PageHeader) page)->pd_upper;
1279
		rdata[1].buffer = InvalidBuffer;
1280 1281
		rdata[1].next = NULL;

B
Bruce Momjian 已提交
1282
		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG | XLOG_NO_TRAN, rdata);
1283 1284

		PageSetLSN(page, recptr);
1285
		PageSetTLI(page, ThisTimeLineID);
V
Vadim B. Mikheev 已提交
1286
	}
1287

1288 1289
	/* save info in sequence relation */
	seq->last_value = next;		/* last fetched number */
1290
	seq->is_called = iscalled;
1291
	seq->log_cnt = (iscalled) ? 0 : 1;
1292

1293
	END_CRIT_SECTION();
M
 
Marc G. Fournier 已提交
1294

1295
	UnlockReleaseBuffer(buf);
1296

1297 1298 1299
	MIRROREDLOCK_BUFMGR_UNLOCK;
	// -------- MirroredLock ----------

1300
	relation_close(seqrel, NoLock);
1301 1302
}

1303 1304 1305 1306
/*
 * Implement the 2 arg setval procedure.
 * See do_setval for discussion.
 */
1307
Datum
1308
setval_oid(PG_FUNCTION_ARGS)
1309
{
1310
	Oid			relid = PG_GETARG_OID(0);
1311
	int64		next = PG_GETARG_INT64(1);
1312

1313
	do_setval(relid, next, true);
1314

1315
	PG_RETURN_INT64(next);
1316 1317
}

1318 1319 1320 1321
/*
 * Implement the 3 arg setval procedure.
 * See do_setval for discussion.
 */
1322
Datum
1323
setval3_oid(PG_FUNCTION_ARGS)
1324
{
1325
	Oid			relid = PG_GETARG_OID(0);
1326
	int64		next = PG_GETARG_INT64(1);
1327 1328
	bool		iscalled = PG_GETARG_BOOL(2);

1329
	do_setval(relid, next, iscalled);
1330

1331
	PG_RETURN_INT64(next);
M
 
Marc G. Fournier 已提交
1332 1333
}

1334

1335
/*
1336 1337
 * Open the sequence and acquire AccessShareLock if needed
 *
1338
 * If we haven't touched the sequence already in this transaction,
B
Bruce Momjian 已提交
1339
 * we need to acquire AccessShareLock.	We arrange for the lock to
1340 1341 1342
 * be owned by the top transaction, so that we don't need to do it
 * more than once per xact.
 */
1343 1344
static Relation
open_share_lock(SeqTable seq)
1345 1346 1347
{
	TransactionId thisxid = GetTopTransactionId();

1348
	/* Get the lock if not already held in this xact */
1349 1350 1351 1352 1353 1354 1355 1356
	if (seq->xid != thisxid)
	{
		ResourceOwner currentOwner;

		currentOwner = CurrentResourceOwner;
		PG_TRY();
		{
			CurrentResourceOwner = TopTransactionResourceOwner;
1357
			LockRelationOid(seq->relid, AccessShareLock);
1358 1359 1360 1361 1362 1363 1364 1365 1366 1367
		}
		PG_CATCH();
		{
			/* Ensure CurrentResourceOwner is restored on error */
			CurrentResourceOwner = currentOwner;
			PG_RE_THROW();
		}
		PG_END_TRY();
		CurrentResourceOwner = currentOwner;

1368
		/* Flag that we have a lock in the current xact */
1369 1370
		seq->xid = thisxid;
	}
1371 1372 1373

	/* We now know we have AccessShareLock, and can safely open the rel */
	return relation_open(seq->relid, NoLock);
1374 1375
}

1376
/*
1377
 * Given a relation OID, open and lock the sequence.  p_elm and p_rel are
1378 1379 1380
 * output parameters.
 */
static void
1381
init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel)
1382
{
B
Bruce Momjian 已提交
1383
	SeqTable	elm;
1384
	Relation	seqrel;
1385

1386 1387 1388 1389 1390 1391 1392
	/* Look to see if we already have a seqtable entry for relation */
	for (elm = seqtab; elm != NULL; elm = elm->next)
	{
		if (elm->relid == relid)
			break;
	}

1393
	/*
1394
	 * Allocate new seqtable entry if we didn't find one.
1395
	 *
B
Bruce Momjian 已提交
1396 1397 1398
	 * NOTE: seqtable entries remain in the list for the life of a backend. If
	 * the sequence itself is deleted then the entry becomes wasted memory,
	 * but it's small enough that this should not matter.
B
Bruce Momjian 已提交
1399
	 */
1400
	if (elm == NULL)
1401
	{
1402
		/*
B
Bruce Momjian 已提交
1403 1404
		 * Time to make a new seqtable entry.  These entries live as long as
		 * the backend does, so we use plain malloc for them.
1405 1406
		 */
		elm = (SeqTable) malloc(sizeof(SeqTableData));
T
Tom Lane 已提交
1407
		if (elm == NULL)
1408 1409 1410
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
1411
		elm->relid = relid;
1412
		elm->xid = InvalidTransactionId;
1413 1414 1415 1416
		/* increment is set to 0 until we do read_info (see currval) */
		elm->last = elm->cached = elm->increment = 0;
		elm->next = seqtab;
		seqtab = elm;
1417 1418
	}

1419 1420 1421 1422 1423 1424 1425 1426 1427 1428
	/*
	 * Open the sequence relation.
	 */
	seqrel = open_share_lock(elm);

	if (seqrel->rd_rel->relkind != RELKIND_SEQUENCE)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a sequence",
						RelationGetRelationName(seqrel))));
1429 1430 1431

	*p_elm = elm;
	*p_rel = seqrel;
1432 1433 1434
}


1435 1436
/* Given an opened relation, lock the page buffer and find the tuple */
static Form_pg_sequence
1437
read_info(Relation rel, Buffer *buf)
1438
{
1439 1440 1441 1442 1443
	PageHeader	page;
	ItemId		lp;
	HeapTupleData tuple;
	sequence_magic *sm;
	Form_pg_sequence seq;
1444

1445 1446
	MIRROREDLOCK_BUFMGR_MUST_ALREADY_BE_HELD;

1447 1448 1449 1450 1451 1452 1453
	*buf = ReadBuffer(rel, 0);
	LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);

	page = (PageHeader) BufferGetPage(*buf);
	sm = (sequence_magic *) PageGetSpecialPointer(page);

	if (sm->magic != SEQ_MAGIC)
1454 1455
		elog(ERROR, "bad magic number in sequence \"%s\": %08X",
			 RelationGetRelationName(rel), sm->magic);
1456 1457 1458 1459 1460 1461 1462 1463

	lp = PageGetItemId(page, FirstOffsetNumber);
	Assert(ItemIdIsUsed(lp));
	tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);

	seq = (Form_pg_sequence) GETSTRUCT(&tuple);

	return seq;
1464 1465
}

1466 1467
/*
 * init_params: process the options list of CREATE or ALTER SEQUENCE,
1468 1469
 * and store the values into appropriate fields of *new.  Also set
 * *owned_by to any OWNED BY option, or to NIL if there is none.
1470 1471 1472 1473
 *
 * If isInit is true, fill any unspecified options with default values;
 * otherwise, do not change existing options that aren't explicitly overridden.
 */
1474
static void
1475 1476
init_params(List *options, bool isInit,
			Form_pg_sequence new, List **owned_by)
1477
{
1478 1479 1480 1481 1482
	DefElem    *last_value = NULL;
	DefElem    *increment_by = NULL;
	DefElem    *max_value = NULL;
	DefElem    *min_value = NULL;
	DefElem    *cache_value = NULL;
1483
	DefElem    *is_cycled = NULL;
1484
	ListCell   *option;
1485

1486 1487
	*owned_by = NIL;

B
Bruce Momjian 已提交
1488
	foreach(option, options)
1489
	{
1490
		DefElem    *defel = (DefElem *) lfirst(option);
1491

1492
		if (strcmp(defel->defname, "increment") == 0)
1493 1494
		{
			if (increment_by)
1495 1496 1497
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1498
			increment_by = defel;
1499
		}
B
Bruce Momjian 已提交
1500

B
Bruce Momjian 已提交
1501
		/*
B
Bruce Momjian 已提交
1502
		 * start is for a new sequence restart is for alter
B
Bruce Momjian 已提交
1503
		 */
1504 1505
		else if (strcmp(defel->defname, "start") == 0 ||
				 strcmp(defel->defname, "restart") == 0)
1506 1507
		{
			if (last_value)
1508 1509 1510
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1511
			last_value = defel;
1512
		}
1513
		else if (strcmp(defel->defname, "maxvalue") == 0)
1514 1515
		{
			if (max_value)
1516 1517 1518
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1519
			max_value = defel;
1520
		}
1521
		else if (strcmp(defel->defname, "minvalue") == 0)
1522 1523
		{
			if (min_value)
1524 1525 1526
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1527
			min_value = defel;
1528
		}
1529
		else if (strcmp(defel->defname, "cache") == 0)
1530 1531
		{
			if (cache_value)
1532 1533 1534
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1535
			cache_value = defel;
1536
		}
1537
		else if (strcmp(defel->defname, "cycle") == 0)
1538
		{
1539
			if (is_cycled)
1540 1541 1542
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1543
			is_cycled = defel;
1544
		}
1545 1546 1547 1548 1549 1550 1551 1552
		else if (strcmp(defel->defname, "owned_by") == 0)
		{
			if (*owned_by)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			*owned_by = defGetQualifiedName(defel);
		}
1553
		else
1554
			elog(ERROR, "option \"%s\" not recognized",
1555 1556 1557
				 defel->defname);
	}

B
Bruce Momjian 已提交
1558
	/* INCREMENT BY */
1559
	if (increment_by != NULL)
B
Bruce Momjian 已提交
1560 1561
	{
		new->increment_by = defGetInt64(increment_by);
1562 1563 1564
		if (new->increment_by == 0)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1565
					 errmsg("INCREMENT must not be zero")));
B
Bruce Momjian 已提交
1566
	}
1567 1568 1569 1570
	else if (isInit)
		new->increment_by = 1;

	/* CYCLE */
1571
	if (is_cycled != NULL)
1572 1573 1574 1575 1576 1577
	{
		new->is_cycled = intVal(is_cycled->arg);
		Assert(new->is_cycled == false || new->is_cycled == true);
	}
	else if (isInit)
		new->is_cycled = false;
1578

1579
	/* MAXVALUE (null arg means NO MAXVALUE) */
1580
	if (max_value != NULL && max_value->arg)
1581
		new->max_value = defGetInt64(max_value);
1582
	else if (isInit || max_value != NULL)
1583
	{
1584
		if (new->increment_by > 0)
B
Bruce Momjian 已提交
1585
			new->max_value = SEQ_MAXVALUE;		/* ascending seq */
1586
		else
B
Bruce Momjian 已提交
1587
			new->max_value = -1;	/* descending seq */
1588
	}
1589

1590
	/* MINVALUE (null arg means NO MINVALUE) */
1591
	if (min_value != NULL && min_value->arg)
1592
		new->min_value = defGetInt64(min_value);
1593
	else if (isInit || min_value != NULL)
1594
	{
1595
		if (new->increment_by > 0)
B
Bruce Momjian 已提交
1596
			new->min_value = 1; /* ascending seq */
1597
		else
B
Bruce Momjian 已提交
1598
			new->min_value = SEQ_MINVALUE;		/* descending seq */
1599
	}
1600

1601
	/* crosscheck min/max */
1602
	if (new->min_value >= new->max_value)
1603
	{
B
Bruce Momjian 已提交
1604 1605 1606
		char		bufm[100],
					bufx[100];

1607 1608
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->min_value);
		snprintf(bufx, sizeof(bufx), INT64_FORMAT, new->max_value);
1609 1610 1611 1612
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("MINVALUE (%s) must be less than MAXVALUE (%s)",
						bufm, bufx)));
1613
	}
1614

B
Bruce Momjian 已提交
1615
	/* START WITH */
1616
	if (last_value != NULL)
1617
	{
1618
		new->last_value = defGetInt64(last_value);
1619 1620 1621
		new->is_called = false;
		new->log_cnt = 1;
	}
1622
	else if (isInit)
1623
	{
1624 1625 1626 1627
		if (new->increment_by > 0)
			new->last_value = new->min_value;	/* ascending seq */
		else
			new->last_value = new->max_value;	/* descending seq */
1628 1629
		new->is_called = false;
		new->log_cnt = 1;
1630
	}
1631

1632
	/* crosscheck */
1633
	if (new->last_value < new->min_value)
1634
	{
B
Bruce Momjian 已提交
1635 1636 1637
		char		bufs[100],
					bufm[100];

1638 1639
		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->last_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->min_value);
1640 1641
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
B
Bruce Momjian 已提交
1642 1643
				 errmsg("START value (%s) can't be less than MINVALUE (%s)",
						bufs, bufm)));
1644
	}
1645
	if (new->last_value > new->max_value)
1646
	{
B
Bruce Momjian 已提交
1647 1648 1649
		char		bufs[100],
					bufm[100];

1650 1651
		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->last_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->max_value);
1652 1653
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
B
Bruce Momjian 已提交
1654 1655
			   errmsg("START value (%s) can't be greater than MAXVALUE (%s)",
					  bufs, bufm)));
1656
	}
1657

B
Bruce Momjian 已提交
1658
	/* CACHE */
1659
	if (cache_value != NULL)
1660
	{
1661 1662 1663 1664
		new->cache_value = defGetInt64(cache_value);
		if (new->cache_value <= 0)
		{
			char		buf[100];
B
Bruce Momjian 已提交
1665

1666 1667 1668 1669 1670 1671
			snprintf(buf, sizeof(buf), INT64_FORMAT, new->cache_value);
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("CACHE (%s) must be greater than zero",
							buf)));
		}
1672
	}
1673 1674
	else if (isInit)
		new->cache_value = 1;
1675 1676
}

1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700
/*
 * process_owned_by
 *
 * Process an OWNED BY option for CREATE/ALTER SEQUENCE.
 *
 * Ownership permissions on the sequence are already checked, but if we are
 * establishing a new owned-by dependency, we must enforce that the
 * referenced table has the same owner and namespace as the sequence.
 *
 * A one-element name list must be the word "none"; anything longer is taken
 * as a (possibly qualified) table name followed by a column name.
 */
static void
process_owned_by(Relation seqrel, List *owned_by)
{
	int			nnames = list_length(owned_by);
	Relation	tablerel = NULL;
	AttrNumber	attnum = 0;

	Assert(nnames > 0);

	if (nnames == 1)
	{
		/* Must be OWNED BY NONE */
		if (strcmp(strVal(linitial(owned_by)), "none") != 0)
			ereport(ERROR,
					(errcode(ERRCODE_SYNTAX_ERROR),
					 errmsg("invalid OWNED BY option"),
					 errhint("Specify OWNED BY table.column or OWNED BY NONE.")));
	}
	else
	{
		List	   *relname;
		char	   *attrname;
		RangeVar   *rel;

		/* Separate relname and attr name */
		relname = list_truncate(list_copy(owned_by), nnames - 1);
		attrname = strVal(lfirst(list_tail(owned_by)));

		/* Open and lock rel to ensure it won't go away meanwhile */
		rel = makeRangeVarFromNameList(relname);
		tablerel = relation_openrv(rel, AccessShareLock);

		/* Must be a regular table */
		if (tablerel->rd_rel->relkind != RELKIND_RELATION)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("referenced relation \"%s\" is not a table",
							RelationGetRelationName(tablerel))));

		/* We insist on same owner and schema */
		if (seqrel->rd_rel->relowner != tablerel->rd_rel->relowner)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("sequence must have same owner as table it is linked to")));
		if (RelationGetNamespace(seqrel) != RelationGetNamespace(tablerel))
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("sequence must be in same schema as table it is linked to")));

		/* Now, fetch the attribute number from the system cache */
		attnum = get_attnum(RelationGetRelid(tablerel), attrname);
		if (attnum == InvalidAttrNumber)
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_COLUMN),
					 errmsg("column \"%s\" of relation \"%s\" does not exist",
							attrname, RelationGetRelationName(tablerel))));
	}

	/*
	 * OK, we are ready to update pg_depend.  First remove any existing AUTO
	 * dependencies for the sequence, then optionally add a new one.
	 */
	markSequenceUnowned(RelationGetRelid(seqrel));

	if (tablerel != NULL)
	{
		ObjectAddress refobject;
		ObjectAddress depobject;

		/* the sequence (depobject) depends on the table column (refobject) */
		refobject.classId = RelationRelationId;
		refobject.objectId = RelationGetRelid(tablerel);
		refobject.objectSubId = attnum;
		depobject.classId = RelationRelationId;
		depobject.objectId = RelationGetRelid(seqrel);
		depobject.objectSubId = 0;
		recordDependencyOn(&depobject, &refobject, DEPENDENCY_AUTO);

		/* Done, but hold lock until commit */
		relation_close(tablerel, NoLock);
	}
}

V
Vadim B. Mikheev 已提交
1770

B
Bruce Momjian 已提交
1771
void
1772
seq_redo(XLogRecPtr beginLoc, XLogRecPtr lsn, XLogRecord *record)
V
Vadim B. Mikheev 已提交
1773
{
1774 1775
	MIRROREDLOCK_BUFMGR_DECLARE;

B
Bruce Momjian 已提交
1776 1777 1778 1779 1780 1781 1782
	uint8		info = record->xl_info & ~XLR_INFO_MASK;
	Relation	reln;
	Buffer		buffer;
	Page		page;
	char	   *item;
	Size		itemsz;
	xl_seq_rec *xlrec = (xl_seq_rec *) XLogRecGetData(record);
1783
	sequence_magic *sm;
V
Vadim B. Mikheev 已提交
1784

1785
	if (info != XLOG_SEQ_LOG)
1786
		elog(PANIC, "seq_redo: unknown op code %u", info);
V
Vadim B. Mikheev 已提交
1787

1788
	reln = XLogOpenRelation(xlrec->node);
1789 1790 1791 1792
	
	// -------- MirroredLock ----------
	MIRROREDLOCK_BUFMGR_LOCK;
	
1793 1794
	buffer = XLogReadBuffer(reln, 0, true);
	Assert(BufferIsValid(buffer));
V
Vadim B. Mikheev 已提交
1795 1796
	page = (Page) BufferGetPage(buffer);

1797 1798
	/* Always reinit the page and reinstall the magic number */
	/* See comments in DefineSequence */
1799 1800 1801
	PageInit((Page) page, BufferGetPageSize(buffer), sizeof(sequence_magic));
	sm = (sequence_magic *) PageGetSpecialPointer(page);
	sm->magic = SEQ_MAGIC;
V
Vadim B. Mikheev 已提交
1802

B
Bruce Momjian 已提交
1803
	item = (char *) xlrec + sizeof(xl_seq_rec);
1804 1805
	itemsz = record->xl_len - sizeof(xl_seq_rec);
	itemsz = MAXALIGN(itemsz);
B
Bruce Momjian 已提交
1806
	if (PageAddItem(page, (Item) item, itemsz,
1807
					FirstOffsetNumber, LP_USED) == InvalidOffsetNumber)
1808
		elog(PANIC, "seq_redo: failed to add item to page");
V
Vadim B. Mikheev 已提交
1809 1810

	PageSetLSN(page, lsn);
1811
	PageSetTLI(page, ThisTimeLineID);
1812 1813
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);
1814 1815 1816 1817
	
	MIRROREDLOCK_BUFMGR_UNLOCK;
	// -------- MirroredLock ----------
	
V
Vadim B. Mikheev 已提交
1818 1819
}

B
Bruce Momjian 已提交
1820
void
1821
seq_desc(StringInfo buf, XLogRecPtr beginLoc, XLogRecord *record)
V
Vadim B. Mikheev 已提交
1822
{
1823 1824
	uint8		info = record->xl_info & ~XLR_INFO_MASK;
	char		*rec = XLogRecGetData(record);
B
Bruce Momjian 已提交
1825
	xl_seq_rec *xlrec = (xl_seq_rec *) rec;
V
Vadim B. Mikheev 已提交
1826 1827

	if (info == XLOG_SEQ_LOG)
1828
		appendStringInfo(buf, "log: ");
V
Vadim B. Mikheev 已提交
1829 1830
	else
	{
1831
		appendStringInfo(buf, "UNKNOWN");
V
Vadim B. Mikheev 已提交
1832 1833 1834
		return;
	}

1835
	appendStringInfo(buf, "rel %u/%u/%u",
B
Bruce Momjian 已提交
1836
			   xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode);
V
Vadim B. Mikheev 已提交
1837
}
1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939


/*
 * cdb_sequence_relation_init
 *
 * Initialize a pseudo relcache entry with just enough info to call bufmgr.
 *
 * *seqrel is zeroed and then filled with the given identity: relation OID,
 * temp flag, and physical address (tablespace / database / relfilenode,
 * where relid is used as the relfilenode).  A zeroed pg_class tuple image is
 * palloc'd for rd_rel and given a synthetic relation name of the form
 * "pg_class.oid=<relid>"; the caller releases it with
 * cdb_sequence_relation_term().
 */
static void
cdb_sequence_relation_init(Relation seqrel,
                           Oid      tablespaceid,
                           Oid      dbid,
                           Oid      relid,
                           bool     istemp)
{
    /* See RelationBuildDesc in relcache.c */
    memset(seqrel, 0, sizeof(*seqrel));

    seqrel->rd_smgr = NULL;
    seqrel->rd_refcnt = 99;

    seqrel->rd_id = relid;
    seqrel->rd_istemp = istemp;

    /* Must use shared buffer pool so seqserver & QDs can see the data. */
    seqrel->rd_isLocalBuf = false;

    seqrel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);

    /*
     * Oids are unsigned, so format with %u (as seq_desc does); %d would
     * print large OIDs as negative numbers.  Bound the write to the size of
     * the name field rather than trusting the formatted length.
     */
    snprintf(seqrel->rd_rel->relname.data,
             sizeof(seqrel->rd_rel->relname.data),
             "pg_class.oid=%u", relid);

    /* as in RelationInitPhysicalAddr... */
    seqrel->rd_node.spcNode = tablespaceid;
    seqrel->rd_node.dbNode = dbid;
    seqrel->rd_node.relNode = relid;
}                               /* cdb_sequence_relation_init */

/*
 * cdb_sequence_relation_term
 *    Tear down a pseudo relcache entry: close its smgr file reference and
 *    free the rd_rel allocation, if there is one.
 *
 * seqrel - the pseudo entry to clean up; the RelationData itself is not
 *          freed here
 */
static void
cdb_sequence_relation_term(Relation seqrel)
{
    /* Drop the storage-manager reference first. */
    RelationCloseSmgr(seqrel);

    /* Nothing more to do when no pg_class image was attached. */
    if (seqrel->rd_rel == NULL)
        return;

    pfree(seqrel->rd_rel);
}                               /* cdb_sequence_relation_term */



/*
 * cdb_sequence_nextval_proxy
 *    CDB: forward a nextval request from qExec to the sequence server.
 *
 * The request is issued through sendSequenceRequest(), using the value
 * returned by GetSeqServerFD() and the current gp_session_id.
 *
 * seqrel     - sequence relation the request is for
 * plast, pcached, pincrement, poverflow
 *            - passed through unchanged to sendSequenceRequest();
 *              presumably filled in from the server's reply -- confirm
 *              against sendSequenceRequest()
 */
void
cdb_sequence_nextval_proxy(Relation seqrel,
                           int64   *plast,
                           int64   *pcached,
                           int64   *pincrement,
                           bool    *poverflow)
{
    sendSequenceRequest(GetSeqServerFD(), seqrel, gp_session_id,
                        plast, pcached, pincrement, poverflow);
}                               /* cdb_sequence_nextval_proxy */


/*
 * cdb_sequence_nextval_server
 *    CDB: nextval entry point called by sequence server.
 *
 * Builds a temporary pseudo relcache entry on the stack from the given
 * identifiers, runs cdb_sequence_nextval() against it, and then tears the
 * pseudo entry down again.
 *
 * tablespaceid, dbid, relid, istemp
 *            - handed to cdb_sequence_relation_init() to describe the
 *              sequence
 * plast, pcached, pincrement, poverflow
 *            - outputs; all are given a defined value (0 / false) before
 *              cdb_sequence_nextval() is called with them
 *
 * NOTE(review): if cdb_sequence_nextval() does not return normally (for
 * example by raising an error), the cleanup call at the end is skipped.
 * The "Catch errors" TODO below is still open.
 */
void
cdb_sequence_nextval_server(Oid    tablespaceid,
                            Oid    dbid,
                            Oid    relid,
                            bool   istemp,
                            int64 *plast,
                            int64 *pcached,
                            int64 *pincrement,
                            bool  *poverflow)
{
    RelationData    fakerel;
    Relation        seqrel = &fakerel;

    /* Give every output a defined value before doing any work. */
    *plast = 0;
    *pcached = 0;
    *pincrement = 0;
    *poverflow = false;

    /* Build a pseudo relcache entry with just enough info to call bufmgr. */
    cdb_sequence_relation_init(seqrel, tablespaceid, dbid, relid, istemp);

    /* CDB TODO: Catch errors. */

    /* Update the sequence object. */
    cdb_sequence_nextval(seqrel, plast, pcached, pincrement, poverflow);

    /* Cleanup. */
    cdb_sequence_relation_term(seqrel);
}                               /* cdb_sequence_nextval_server */