/*-------------------------------------------------------------------------
 *
 * sequence.c
 *	  PostgreSQL sequences support code.
 *
 * Portions Copyright (c) 2005-2008, Greenplum inc.
 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/commands/sequence.c,v 1.149 2008/01/01 19:45:49 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
16
#include "postgres.h"
17

18
#include "access/heapam.h"
19 20
#include "access/transam.h"
#include "access/xact.h"
21
#include "catalog/dependency.h"
22
#include "catalog/heap.h"
23
#include "catalog/namespace.h"
24
#include "catalog/pg_type.h"
25
#include "commands/defrem.h"
26
#include "commands/sequence.h"
27
#include "commands/tablecmds.h"
B
Bruce Momjian 已提交
28
#include "miscadmin.h"
29
#include "storage/smgr.h"               /* RelationCloseSmgr -> smgrclose */
30
#include "nodes/makefuncs.h"
31
#include "storage/proc.h"
32
#include "utils/acl.h"
B
Bruce Momjian 已提交
33
#include "utils/builtins.h"
34
#include "utils/formatting.h"
35
#include "utils/lsyscache.h"
36
#include "utils/resowner.h"
37
#include "utils/syscache.h"
38

39
#include "cdb/cdbdisp_query.h"
H
Heikki Linnakangas 已提交
40
#include "cdb/cdbdoublylinked.h"
41 42 43 44 45 46 47 48 49
#include "cdb/cdbsrlz.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbmotion.h"
#include "cdb/ml_ipc.h"

#include "cdb/cdbpersistentfilesysobj.h"

#include "postmaster/seqserver.h"

50

V
Vadim B. Mikheev 已提交
51
/*
 * We don't want to log each fetching of a value from a sequence,
 * so we pre-log a few fetches in advance. In the event of
 * crash we can lose (skip over) as many values as we pre-logged.
 */
B
Bruce Momjian 已提交
56
#define SEQ_LOG_VALS	32
57

58 59 60 61 62
/*
 * The "special area" of a sequence's buffer page looks like this.
 */
#define SEQ_MAGIC	  0x1717

63 64
typedef struct sequence_magic
{
65
	uint32		magic;
66
} sequence_magic;
67

68 69 70 71 72 73
/*
 * We store a SeqTable item for every sequence we have touched in the current
 * session.  This is needed to hold onto nextval/currval state.  (We can't
 * rely on the relcache, since it's only, well, a cache, and may decide to
 * discard entries.)
 *
B
Bruce Momjian 已提交
74
 * XXX We use linear search to find pre-existing SeqTable entries.	This is
75 76 77
 * good when only a small number of sequences are touched in a session, but
 * would suck with many different sequences.  Perhaps use a hashtable someday.
 */
78 79
typedef struct SeqTableData
{
80 81
	struct SeqTableData *next;	/* link to next SeqTable object */
	Oid			relid;			/* pg_class OID of this sequence */
82
	LocalTransactionId lxid;	/* xact in which we last did a seq op */
83
	bool		last_valid;		/* do we have a valid "last" value? */
84 85 86 87
	int64		last;			/* value last returned by nextval */
	int64		cached;			/* last value already cached for nextval */
	/* if last != cached, we have not used up all the cached values */
	int64		increment;		/* copy of sequence's increment field */
88
	/* note that increment is zero until we first do read_seq_tuple() */
89
} SeqTableData;
90 91 92

typedef SeqTableData *SeqTable;

93
static SeqTable seqtab = NULL;	/* Head of list of SeqTable items */
94

95 96 97 98 99
/*
 * last_used_seq is updated by nextval() to point to the last used
 * sequence.
 */
static SeqTableData *last_used_seq = NULL;
100

101
static int64 nextval_internal(Oid relid);
102
static Relation open_share_lock(SeqTable seq);
103
static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel);
104 105
static Form_pg_sequence read_seq_tuple(SeqTable elm, Relation rel,
			   Buffer *buf, HeapTuple seqtuple);
106
static void init_params(List *options, bool isInit,
B
Bruce Momjian 已提交
107
			Form_pg_sequence new, List **owned_by);
108
static void do_setval(Oid relid, int64 next, bool iscalled);
109 110
static void process_owned_by(Relation seqrel, List *owned_by);

111
static void
112 113
cdb_sequence_nextval(SeqTable elm,
					 Relation   seqrel,
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349
                     int64     *plast,
                     int64     *pcached,
                     int64     *pincrement,
                     bool      *seq_overflow);
static void
cdb_sequence_nextval_proxy(Relation seqrel,
                           int64   *plast,
                           int64   *pcached,
                           int64   *pincrement,
                           bool    *poverflow);

/*
 * Lookup key of the per-backend sequence persistent-info cache: the
 * sequence's RelFileNode.  The hash table is created with
 * keysize = sizeof(this struct), and callers zero the whole key before
 * filling it in.
 */
typedef struct SequencePersistentInfoCacheEntryKey
{
	RelFileNode				relFileNode;
} SequencePersistentInfoCacheEntryKey;

/*
 * One entry of the sequence persistent-info cache.  It remembers the
 * persistent TID and serial number found for a sequence's relfilenode, and
 * carries the links that thread the entry onto the cache's LRU list.
 */
typedef struct SequencePersistentInfoCacheEntryData
{
	/* Hash key; declared first in the entry. */
	SequencePersistentInfoCacheEntryKey	key;

	/* Cached persistent TID for the sequence. */
	ItemPointerData		persistentTid;

	/* Cached persistent serial number for the sequence. */
	int64				persistentSerialNum;

	/* Links for sequencePersistentInfoCacheLruListHead. */
	DoubleLinks			lruLinks;

} SequencePersistentInfoCacheEntryData;
typedef SequencePersistentInfoCacheEntryData *SequencePersistentInfoCacheEntry;

/*
 * Hash table keyed by RelFileNode; NULL until first use, when
 * Sequence_PersistentInfoCacheTableInit() creates it.
 */
static HTAB *sequencePersistentInfoCacheTable = NULL;

/* LRU list of cache entries; entries are added and re-added at the front. */
static DoublyLinkedHead	sequencePersistentInfoCacheLruListHead;

/* Number of entries currently on the LRU list. */
static int sequencePersistentInfoCacheLruCount = 0;

/*
 * Once the count exceeds this limit, Sequence_AddPersistentInfoCache()
 * evicts the entry at the tail of the LRU list.
 */
static int sequencePersistentInfoCacheLruLimit = 100;

/*
 * Sequence_PersistentInfoCacheTableInit
 *		Create the sequence persistent-info hash table and reset the
 *		head of its LRU list.
 *
 * Called lazily by the lookup and insert routines the first time the cache
 * is touched (i.e. while sequencePersistentInfoCacheTable is still NULL).
 */
static void
Sequence_PersistentInfoCacheTableInit(void)
{
	HASHCTL		ctl;

	/* Start from an all-zero control block, then describe key and entry. */
	MemSet(&ctl, 0, sizeof(ctl));
	ctl.keysize = sizeof(SequencePersistentInfoCacheEntryKey);
	ctl.entrysize = sizeof(SequencePersistentInfoCacheEntryData);
	ctl.hash = tag_hash;

	sequencePersistentInfoCacheTable =
		hash_create("Sequence Persistent Info",
					10,
					&ctl,
					HASH_ELEM | HASH_FUNCTION);

	DoublyLinkedHead_Init(&sequencePersistentInfoCacheLruListHead);
}

/*
 * Sequence_CheckPersistentInfoCache
 *		Probe the cache for the given relfilenode.
 *
 * On a hit, copies the cached TID and serial number to *persistentTid and
 * *persistentSerialNum, moves the entry to the front of the LRU list and
 * returns true.  On a miss, returns false and leaves the outputs untouched.
 */
static bool
Sequence_CheckPersistentInfoCache(RelFileNode *relFileNode,
								  ItemPointer persistentTid,
								  int64 *persistentSerialNum)
{
	SequencePersistentInfoCacheEntryKey lookupKey;
	SequencePersistentInfoCacheEntry entry;
	bool		hit;

	/* The table is built on first use. */
	if (sequencePersistentInfoCacheTable == NULL)
		Sequence_PersistentInfoCacheTableInit();

	/* Zero the whole key first, then fill in the relfilenode. */
	MemSet(&lookupKey, 0, sizeof(SequencePersistentInfoCacheEntryKey));
	lookupKey.relFileNode = *relFileNode;

	entry = (SequencePersistentInfoCacheEntry)
		hash_search(sequencePersistentInfoCacheTable,
					(void *) &lookupKey,
					HASH_FIND,
					&hit);

	if (hit)
	{
		*persistentTid = entry->persistentTid;
		*persistentSerialNum = entry->persistentSerialNum;

		/* LRU maintenance: unlink the entry and put it back at the head. */
		DoubleLinks_Remove(offsetof(SequencePersistentInfoCacheEntryData, lruLinks),
						   &sequencePersistentInfoCacheLruListHead,
						   entry);
		DoublyLinkedHead_AddFirst(offsetof(SequencePersistentInfoCacheEntryData, lruLinks),
								  &sequencePersistentInfoCacheLruListHead,
								  entry);
		return true;
	}

	return false;
}

/*
 * Sequence_AddPersistentInfoCache
 *		Remember the persistent TID and serial number for a relfilenode.
 *
 * The new entry goes to the front of the LRU list.  If that pushes the
 * entry count past sequencePersistentInfoCacheLruLimit, the entry at the
 * tail of the list is unlinked and removed from the hash table.
 *
 * Callers are expected to have probed the cache first, so the key should
 * not already be present; that is asserted.  In builds without assertions
 * a duplicate key is tolerated: the existing entry is refreshed and moved
 * to the front, rather than being linked and counted a second time.
 */
static void Sequence_AddPersistentInfoCache(
	RelFileNode 		*relFileNode,

	ItemPointer			persistentTid,

	int64				persistentSerialNum)
{
	SequencePersistentInfoCacheEntryKey	key;

	SequencePersistentInfoCacheEntry persistentInfoCacheEntry;

	bool found;

	if (sequencePersistentInfoCacheTable == NULL)
		Sequence_PersistentInfoCacheTableInit();

	/* Zero the whole key first, then fill in the relfilenode. */
	MemSet(&key, 0, sizeof(SequencePersistentInfoCacheEntryKey));
	key.relFileNode = *relFileNode;

	persistentInfoCacheEntry =
		(SequencePersistentInfoCacheEntry)
						hash_search(
								sequencePersistentInfoCacheTable,
								(void *) &key,
								HASH_ENTER,
								&found);
	Assert(!found);

	persistentInfoCacheEntry->persistentTid = *persistentTid;
	persistentInfoCacheEntry->persistentSerialNum = persistentSerialNum;

	if (found)
	{
		/*
		 * Only reachable when assertions are compiled out.  The entry is
		 * already linked and counted, so unlink it instead of
		 * re-initializing its links (which would leave its old neighbours
		 * pointing at it) and leave the count alone.
		 */
		DoubleLinks_Remove(
			offsetof(SequencePersistentInfoCacheEntryData, lruLinks),
			&sequencePersistentInfoCacheLruListHead,
			persistentInfoCacheEntry);
	}
	else
	{
		DoubleLinks_Init(&persistentInfoCacheEntry->lruLinks);
		sequencePersistentInfoCacheLruCount++;
	}

	/*
	 * LRU.
	 */
	DoublyLinkedHead_AddFirst(
		offsetof(SequencePersistentInfoCacheEntryData, lruLinks),
		&sequencePersistentInfoCacheLruListHead,
		persistentInfoCacheEntry);

	if (sequencePersistentInfoCacheLruCount > sequencePersistentInfoCacheLruLimit)
	{
		SequencePersistentInfoCacheEntry lastPersistentInfoCacheEntry;

		/* Over the limit: evict the entry at the tail of the LRU list. */
		lastPersistentInfoCacheEntry =
			(SequencePersistentInfoCacheEntry)
							DoublyLinkedHead_Last(
								offsetof(SequencePersistentInfoCacheEntryData, lruLinks),
								&sequencePersistentInfoCacheLruListHead);
		Assert(lastPersistentInfoCacheEntry != NULL);

		DoubleLinks_Remove(
			offsetof(SequencePersistentInfoCacheEntryData, lruLinks),
			&sequencePersistentInfoCacheLruListHead,
			lastPersistentInfoCacheEntry);

		/* Report before HASH_REMOVE, while the entry's contents are still ours. */
		if (Debug_persistent_print)
			elog(Persistent_DebugPrintLevel(),
				 "Removed cached persistent information for sequence %u/%u/%u -- serial number " INT64_FORMAT ", TID %s",
				 lastPersistentInfoCacheEntry->key.relFileNode.spcNode,
				 lastPersistentInfoCacheEntry->key.relFileNode.dbNode,
				 lastPersistentInfoCacheEntry->key.relFileNode.relNode,
				 lastPersistentInfoCacheEntry->persistentSerialNum,
				 ItemPointerToString(&lastPersistentInfoCacheEntry->persistentTid));

		hash_search(
				sequencePersistentInfoCacheTable,
				(void *) &lastPersistentInfoCacheEntry->key,
				HASH_REMOVE,
				NULL);

		sequencePersistentInfoCacheLruCount--;
	}
}


/*
 * Sequence_FetchGpRelationNodeForXLog
 *		Make sure rel->rd_segfile0_relationnodeinfo holds the persistent
 *		TID and serial number of the sequence, for use in XLOG records.
 *
 * Does nothing if the information is already marked present.  Otherwise it
 * is taken from the backend-local LRU cache, or, on a cache miss, found with
 * PersistentFileSysObj_ScanForRelation() and then added to the cache.
 *
 * Raises ERROR if the scan finds nothing, or if the resulting TID is zero
 * once persistence work has started.
 */
static void
Sequence_FetchGpRelationNodeForXLog(Relation rel)
{
	if (rel->rd_segfile0_relationnodeinfo.isPresent)
		return;

	/*
	 * For better performance, we cache the persistent information
	 * for sequences with upper bound and use LRU...
	 */
	if (Sequence_CheckPersistentInfoCache(
								&rel->rd_node,
								&rel->rd_segfile0_relationnodeinfo.persistentTid,
								&rel->rd_segfile0_relationnodeinfo.persistentSerialNum))
	{
		if (Debug_persistent_print)
			elog(Persistent_DebugPrintLevel(),
				 "Found cached persistent information for sequence %u/%u/%u -- serial number " INT64_FORMAT ", TID %s",
				 rel->rd_node.spcNode,
				 rel->rd_node.dbNode,
				 rel->rd_node.relNode,
				 rel->rd_segfile0_relationnodeinfo.persistentSerialNum,
				 ItemPointerToString(&rel->rd_segfile0_relationnodeinfo.persistentTid));
	}
	else
	{
		/* Cache miss: scan for the relation's segment file 0. */
		if (!PersistentFileSysObj_ScanForRelation(
												&rel->rd_node,
												/* segmentFileNum */ 0,
												&rel->rd_segfile0_relationnodeinfo.persistentTid,
												&rel->rd_segfile0_relationnodeinfo.persistentSerialNum))
		{
			elog(ERROR, "Could not find persistent information for sequence %u/%u/%u",
			     rel->rd_node.spcNode,
			     rel->rd_node.dbNode,
			     rel->rd_node.relNode);
		}

		Sequence_AddPersistentInfoCache(
								&rel->rd_node,
								&rel->rd_segfile0_relationnodeinfo.persistentTid,
								rel->rd_segfile0_relationnodeinfo.persistentSerialNum);

		if (Debug_persistent_print)
			elog(Persistent_DebugPrintLevel(),
				 "Add cached persistent information for sequence %u/%u/%u -- serial number " INT64_FORMAT ", TID %s",
				 rel->rd_node.spcNode,
				 rel->rd_node.dbNode,
				 rel->rd_node.relNode,
				 rel->rd_segfile0_relationnodeinfo.persistentSerialNum,
				 ItemPointerToString(&rel->rd_segfile0_relationnodeinfo.persistentTid));
	}

	/* A zero TID is only acceptable before persistence work has begun. */
	if (!Persistent_BeforePersistenceWork() &&
		PersistentStore_IsZeroTid(&rel->rd_segfile0_relationnodeinfo.persistentTid))
	{
		elog(ERROR,
			 "Sequence_FetchGpRelationNodeForXLog has invalid TID (0,0) for relation %u/%u/%u '%s', serial number " INT64_FORMAT,
			 rel->rd_node.spcNode,
			 rel->rd_node.dbNode,
			 rel->rd_node.relNode,
			 NameStr(rel->rd_rel->relname),
			 rel->rd_segfile0_relationnodeinfo.persistentSerialNum);
	}

	rel->rd_segfile0_relationnodeinfo.isPresent = true;
}
364 365

/*
B
Bruce Momjian 已提交
366
 * DefineSequence
367
 *				Creates a new sequence relation
368 369
 */
void
370
DefineSequence(CreateSeqStmt *seq)
371
{
372 373
	MIRROREDLOCK_BUFMGR_DECLARE;

374
	FormData_pg_sequence new;
375
	List	   *owned_by;
376
	CreateStmt *stmt = makeNode(CreateStmt);
377
	Oid			seqoid;
378 379 380
	Relation	rel;
	Buffer		buf;
	PageHeader	page;
381
	sequence_magic *sm;
382 383 384
	HeapTuple	tuple;
	TupleDesc	tupDesc;
	Datum		value[SEQ_COL_LASTCOL];
385
	bool		null[SEQ_COL_LASTCOL];
386
	int			i;
387
	NameData	name;
388

389 390
	bool shouldDispatch =  Gp_role == GP_ROLE_DISPATCH && !IsBootstrapProcessingMode();

391
	/* Check and set all option values */
392
	init_params(seq->options, true, &new, &owned_by);
393 394

	/*
395
	 * Create relation (and fill *null & *value)
396
	 */
397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417
	stmt->oidInfo.relOid = 0;
	stmt->oidInfo.comptypeOid = 0;
	stmt->oidInfo.toastOid = 0;
	stmt->oidInfo.toastIndexOid = 0;
	stmt->oidInfo.aosegOid = 0;
	stmt->oidInfo.aoblkdirOid = 0;
	stmt->oidInfo.aoblkdirIndexOid = 0;
	stmt->oidInfo.aovisimapOid = 0;
	stmt->oidInfo.aovisimapIndexOid = 0;

	if (shouldDispatch)
	{

			/* stmt->relOid = newOid(); */
	}
	else if (Gp_role == GP_ROLE_EXECUTE)
	{

			stmt->oidInfo.relOid = seq->relOid;

	}
418 419
	stmt->tableElts = NIL;
	for (i = SEQ_COL_FIRSTCOL; i <= SEQ_COL_LASTCOL; i++)
420
	{
421
		ColumnDef  *coldef = makeNode(ColumnDef);
422

423 424
		coldef->inhcount = 0;
		coldef->is_local = true;
425
		coldef->is_not_null = true;
426 427
		coldef->raw_default = NULL;
		coldef->cooked_default = NULL;
428 429
		coldef->constraints = NIL;

430
		null[i - 1] = false;
431 432 433

		switch (i)
		{
434
			case SEQ_COL_NAME:
435
				coldef->typname = makeTypeNameFromOid(NAMEOID, -1);
436
				coldef->colname = "sequence_name";
437
				namestrcpy(&name, seq->sequence->relname);
438
				value[i - 1] = NameGetDatum(&name);
439 440
				break;
			case SEQ_COL_LASTVAL:
441
				coldef->typname = makeTypeNameFromOid(INT8OID, -1);
442
				coldef->colname = "last_value";
443
				value[i - 1] = Int64GetDatumFast(new.last_value);
444 445
				break;
			case SEQ_COL_INCBY:
446
				coldef->typname = makeTypeNameFromOid(INT8OID, -1);
447
				coldef->colname = "increment_by";
448
				value[i - 1] = Int64GetDatumFast(new.increment_by);
449 450
				break;
			case SEQ_COL_MAXVALUE:
451
				coldef->typname = makeTypeNameFromOid(INT8OID, -1);
452
				coldef->colname = "max_value";
453
				value[i - 1] = Int64GetDatumFast(new.max_value);
454 455
				break;
			case SEQ_COL_MINVALUE:
456
				coldef->typname = makeTypeNameFromOid(INT8OID, -1);
457
				coldef->colname = "min_value";
458
				value[i - 1] = Int64GetDatumFast(new.min_value);
459 460
				break;
			case SEQ_COL_CACHE:
461
				coldef->typname = makeTypeNameFromOid(INT8OID, -1);
462
				coldef->colname = "cache_value";
463
				value[i - 1] = Int64GetDatumFast(new.cache_value);
464
				break;
V
Vadim B. Mikheev 已提交
465
			case SEQ_COL_LOG:
466
				coldef->typname = makeTypeNameFromOid(INT8OID, -1);
V
Vadim B. Mikheev 已提交
467
				coldef->colname = "log_cnt";
468
				value[i - 1] = Int64GetDatum((int64) 0);
V
Vadim B. Mikheev 已提交
469
				break;
470
			case SEQ_COL_CYCLE:
471
				coldef->typname = makeTypeNameFromOid(BOOLOID, -1);
472
				coldef->colname = "is_cycled";
473
				value[i - 1] = BoolGetDatum(new.is_cycled);
474 475
				break;
			case SEQ_COL_CALLED:
476
				coldef->typname = makeTypeNameFromOid(BOOLOID, -1);
477
				coldef->colname = "is_called";
478
				value[i - 1] = BoolGetDatum(false);
479
				break;
480 481 482 483
		}
		stmt->tableElts = lappend(stmt->tableElts, coldef);
	}

484 485
	stmt->relation = seq->sequence;
	stmt->inhRelations = NIL;
486
	stmt->constraints = NIL;
487 488
	stmt->inhOids = NIL;
	stmt->parentOidCount = 0;
B
Bruce Momjian 已提交
489
	stmt->options = list_make1(defWithOids(false));
490
	stmt->oncommit = ONCOMMIT_NOOP;
491
	stmt->tablespacename = NULL;
492 493 494
	stmt->relKind = RELKIND_SEQUENCE;
	stmt->oidInfo.comptypeOid = seq->comptypeOid;
	stmt->ownerid = GetUserId();
495

496
	seqoid = DefineRelation(stmt, RELKIND_SEQUENCE, RELSTORAGE_HEAP);
497

498 499 500 501 502 503 504 505
	/*
	 * Open and lock the new sequence.  (This lock is redundant; an
	 * AccessExclusiveLock was acquired above by DefineRelation and
	 * won't be released until end of transaction.)
	 *
	 * CDB: Acquire lock on qDisp before dispatching to qExecs, so
	 * qDisp can detect and resolve any deadlocks.
	 */
506
	rel = heap_open(seqoid, AccessExclusiveLock);
507
	tupDesc = RelationGetDescr(rel);
508

509 510
	stmt->oidInfo.relOid = seq->relOid = seqoid;

511 512
	/* Initialize first page of relation with special magic number */

513 514 515
	// -------- MirroredLock ----------
	MIRROREDLOCK_BUFMGR_LOCK;
	
516
	buf = ReadBuffer(rel, P_NEW);
517 518
	Assert(BufferGetBlockNumber(buf) == 0);

519 520
	page = (PageHeader) BufferGetPage(buf);

521 522
	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

523 524 525 526
	PageInit((Page) page, BufferGetPageSize(buf), sizeof(sequence_magic));
	sm = (sequence_magic *) PageGetSpecialPointer(page);
	sm->magic = SEQ_MAGIC;

527 528 529 530 531
	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
	
	MIRROREDLOCK_BUFMGR_UNLOCK;
	// -------- MirroredLock ----------
	
532 533 534
	/* hack: ensure heap_insert will insert on the just-created page */
	rel->rd_targblock = 0;

535
	/* Now form & insert sequence tuple */
536
	tuple = heap_form_tuple(tupDesc, value, null);
537
	simple_heap_insert(rel, tuple);
538

539 540
	Assert(ItemPointerGetOffsetNumber(&(tuple->t_self)) == FirstOffsetNumber);

541 542 543 544
	// Fetch gp_persistent_relation_node information that will be added to XLOG record.
	Assert(rel != NULL);
	Sequence_FetchGpRelationNodeForXLog(rel);

545
	/*
546 547
	 * Two special hacks here:
	 *
548 549
	 * 1. Since VACUUM does not process sequences, we have to force the tuple
	 * to have xmin = FrozenTransactionId now.	Otherwise it would become
B
Bruce Momjian 已提交
550
	 * invisible to SELECTs after 2G transactions.	It is okay to do this
551 552 553
	 * because if the current transaction aborts, no other xact will ever
	 * examine the sequence tuple anyway.
	 *
B
Bruce Momjian 已提交
554 555 556 557 558
	 * 2. Even though heap_insert emitted a WAL log record, we have to emit an
	 * XLOG_SEQ_LOG record too, since (a) the heap_insert record will not have
	 * the right xmin, and (b) REDO of the heap_insert record would re-init
	 * page and sequence magic number would be lost.  This means two log
	 * records instead of one :-(
559
	 */
560 561 562 563

	// -------- MirroredLock ----------
	MIRROREDLOCK_BUFMGR_LOCK;

564
	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
565

566
	START_CRIT_SECTION();
567 568 569

	{
		/*
B
Bruce Momjian 已提交
570
		 * Note that the "tuple" structure is still just a local tuple record
571
		 * created by heap_form_tuple; its t_data pointer doesn't point at the
B
Bruce Momjian 已提交
572 573 574
		 * disk buffer.  To scribble on the disk buffer we need to fetch the
		 * item pointer.  But do the same to the local tuple, since that will
		 * be the source for the WAL log record, below.
575 576 577 578 579 580 581
		 */
		ItemId		itemId;
		Item		item;

		itemId = PageGetItemId((Page) page, FirstOffsetNumber);
		item = PageGetItem((Page) page, itemId);

582
		HeapTupleHeaderSetXmin((HeapTupleHeader) item, FrozenTransactionId);
583 584
		((HeapTupleHeader) item)->t_infomask |= HEAP_XMIN_COMMITTED;

585
		HeapTupleHeaderSetXmin(tuple->t_data, FrozenTransactionId);
586 587 588
		tuple->t_data->t_infomask |= HEAP_XMIN_COMMITTED;
	}

589 590
	MarkBufferDirty(buf);

591 592
	/* XLOG stuff */
	if (!rel->rd_istemp)
593
	{
594 595 596
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];
597 598

		xlrec.node = rel->rd_node;
599
		RelationGetPTInfo(rel, &xlrec.persistentTid, &xlrec.persistentSerialNum);
600

601 602
		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
603
		rdata[0].buffer = InvalidBuffer;
604 605
		rdata[0].next = &(rdata[1]);

606
		rdata[1].data = (char *) tuple->t_data;
607
		rdata[1].len = tuple->t_len;
608
		rdata[1].buffer = InvalidBuffer;
609 610
		rdata[1].next = NULL;

611
		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);
612 613

		PageSetLSN(page, recptr);
614
		PageSetTLI(page, ThisTimeLineID);
615
	}
616

617
	END_CRIT_SECTION();
618

619 620
	UnlockReleaseBuffer(buf);

621 622 623
	MIRROREDLOCK_BUFMGR_UNLOCK;
	// -------- MirroredLock ----------

624 625 626 627
	/* process OWNED BY if given */
	if (owned_by)
		process_owned_by(rel, owned_by);

628
	heap_close(rel, NoLock);
629 630 631 632 633 634

	
	/* Dispatch to segments */
	if (shouldDispatch)
	{
		seq->comptypeOid = stmt->oidInfo.comptypeOid;
635 636 637 638 639
		CdbDispatchUtilityStatement((Node *) seq,
									DF_CANCEL_ON_ERROR|
									DF_WITH_SNAPSHOT|
									DF_NEED_TWO_PHASE,
									NULL);
640
	}
641 642
}

B
Bruce Momjian 已提交
643 644 645
/*
 * AlterSequence
 *
646
 * Modify the definition of a sequence relation
B
Bruce Momjian 已提交
647 648
 */
void
649
AlterSequence(AlterSeqStmt *stmt)
B
Bruce Momjian 已提交
650
{
651 652
	MIRROREDLOCK_BUFMGR_DECLARE;

653
	Oid			relid;
B
Bruce Momjian 已提交
654 655 656
	SeqTable	elm;
	Relation	seqrel;
	Buffer		buf;
657
	HeapTupleData seqtuple;
B
Bruce Momjian 已提交
658 659
	Form_pg_sequence seq;
	FormData_pg_sequence new;
660
	List	   *owned_by;
661 662 663 664 665
	int64		save_increment;
	bool		bSeqIsTemp	   = false;
	int			numopts	   = 0;
	char	   *alter_subtype = "";		/* metadata tracking: kind of
										   redundant to say "role" */
B
Bruce Momjian 已提交
666 667

	/* open and AccessShareLock sequence */
668 669
	relid = RangeVarGetRelid(stmt->sequence, false);
	init_sequence(relid, &elm, &seqrel);
B
Bruce Momjian 已提交
670

671
	/* allow ALTER to sequence owner only */
B
Bruce Momjian 已提交
672
	if (!pg_class_ownercheck(elm->relid, GetUserId()))
673 674
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
					   stmt->sequence->relname);
B
Bruce Momjian 已提交
675

676 677 678
	/* hack to keep ALTER SEQUENCE OWNED BY from changing currval state */
	save_increment = elm->increment;

B
Bruce Momjian 已提交
679
	/* lock page' buffer and read tuple into new sequence structure */
680 681 682 683
	
	// -------- MirroredLock ----------
	MIRROREDLOCK_BUFMGR_LOCK;
	
684
	seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
685
	elm->increment = seq->increment_by;
B
Bruce Momjian 已提交
686

687 688
	/* Copy old values of options into workspace */
	memcpy(&new, seq, sizeof(FormData_pg_sequence));
B
Bruce Momjian 已提交
689

690
	/* Check and set new values */
691
	init_params(stmt->options, false, &new, &owned_by);
B
Bruce Momjian 已提交
692

693 694 695 696 697 698 699 700
	if (owned_by)
	{
		/* Restore previous state of elm (assume nothing else changes) */
		elm->increment = save_increment;
	}
	else
	{
		/* Clear local cache so that we don't think we have cached numbers */
701 702
		/* Note that we do not change the currval() state */
		elm->cached = elm->last;
703 704 705 706 707
	}

	// Fetch gp_persistent_relation_node information that will be added to XLOG record.
	Assert(seqrel != NULL);
	Sequence_FetchGpRelationNodeForXLog(seqrel);
708

709
	/* Now okay to update the on-disk tuple */
B
Bruce Momjian 已提交
710 711
	START_CRIT_SECTION();

712 713
	memcpy(seq, &new, sizeof(FormData_pg_sequence));

714 715
	MarkBufferDirty(buf);

B
Bruce Momjian 已提交
716
	/* XLOG stuff */
717 718 719 720

	bSeqIsTemp = seqrel->rd_istemp;

	if (!bSeqIsTemp)
B
Bruce Momjian 已提交
721 722 723 724
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];
725
		Page		page = BufferGetPage(buf);
B
Bruce Momjian 已提交
726 727

		xlrec.node = seqrel->rd_node;
728
		RelationGetPTInfo(seqrel, &xlrec.persistentTid, &xlrec.persistentSerialNum);
729

B
Bruce Momjian 已提交
730 731
		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
732
		rdata[0].buffer = InvalidBuffer;
B
Bruce Momjian 已提交
733 734
		rdata[0].next = &(rdata[1]);

735 736
		rdata[1].data = (char *) seqtuple.t_data;
		rdata[1].len = seqtuple.t_len;
737
		rdata[1].buffer = InvalidBuffer;
B
Bruce Momjian 已提交
738 739
		rdata[1].next = NULL;

740
		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);
B
Bruce Momjian 已提交
741 742

		PageSetLSN(page, recptr);
743
		PageSetTLI(page, ThisTimeLineID);
B
Bruce Momjian 已提交
744 745 746 747
	}

	END_CRIT_SECTION();

748
	UnlockReleaseBuffer(buf);
B
Bruce Momjian 已提交
749

750 751 752
	MIRROREDLOCK_BUFMGR_UNLOCK;
	// -------- MirroredLock ----------

753 754 755 756
	/* process OWNED BY if given */
	if (owned_by)
		process_owned_by(seqrel, owned_by);

B
Bruce Momjian 已提交
757
	relation_close(seqrel, NoLock);
758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790

	numopts = list_length(stmt->options);

	if (numopts > 1)
	{
		char allopts[NAMEDATALEN];

		sprintf(allopts, "%d OPTIONS", numopts);

		alter_subtype = pstrdup(allopts);
	}
	else if (0 == numopts)
	{
		alter_subtype = "0 OPTIONS";
	}
	else if ((Gp_role == GP_ROLE_DISPATCH) && (!bSeqIsTemp))
	{
		ListCell		*option = list_head(stmt->options);
		DefElem			*defel	= (DefElem *) lfirst(option);
		char			*tempo	= NULL;

		alter_subtype = defel->defname;
		if (0 == strcmp(alter_subtype, "owned_by"))
			alter_subtype = "OWNED BY";

		tempo = str_toupper(alter_subtype, strlen(alter_subtype));

		alter_subtype = tempo;

	}

	if (Gp_role == GP_ROLE_DISPATCH)
	{
791 792 793 794 795
		CdbDispatchUtilityStatement((Node *) stmt,
									DF_CANCEL_ON_ERROR|
									DF_WITH_SNAPSHOT|
									DF_NEED_TWO_PHASE,
									NULL);
796 797 798 799 800 801 802 803 804 805 806 807

		if (!bSeqIsTemp)
		{
			/* MPP-6929: metadata tracking */
			MetaTrackUpdObject(RelationRelationId,
							   relid,
							   GetUserId(),
							   "ALTER", alter_subtype
					);
		}

	}
B
Bruce Momjian 已提交
808 809
}

810

811 812 813 814 815
/*
 * Note: nextval with a text argument is no longer exported as a pg_proc
 * entry, but we keep it around to ease porting of C code that may have
 * called the function directly.
 */
816 817
Datum
nextval(PG_FUNCTION_ARGS)
818
{
819
	text	   *seqin = PG_GETARG_TEXT_P(0);
820
	RangeVar   *sequence;
821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839
	Oid			relid;

	sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin));
	relid = RangeVarGetRelid(sequence, false);

	PG_RETURN_INT64(nextval_internal(relid));
}

Datum
nextval_oid(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);

	PG_RETURN_INT64(nextval_internal(relid));
}

/*
 * nextval_internal
 *		Return the next value of the sequence identified by relid.
 *
 * If the session still has unused cached values (elm->last != elm->cached),
 * the next one is handed out without touching the sequence relation.
 * Otherwise the caller must hold UPDATE privilege on the sequence, and the
 * sequence is advanced through cdb_sequence_nextval_proxy() in execute role
 * or cdb_sequence_nextval() in any other role.
 *
 * Raises ERROR on insufficient privilege, or when the advance reports an
 * overflow of the sequence's bound.
 */
static int64
nextval_internal(Oid relid)
{
	SeqTable	elm;
	Relation	seqrel;
	bool		is_overflow = false;

	/* open and AccessShareLock sequence */
	init_sequence(relid, &elm, &seqrel);

	if (elm->last != elm->cached)		/* some numbers were cached */
	{
		Assert(elm->last_valid);
		Assert(elm->increment != 0);
		elm->last += elm->increment;
		relation_close(seqrel, NoLock);
		last_used_seq = elm;
		return elm->last;
	}

	if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied for sequence %s",
						RelationGetRelationName(seqrel))));

	/* Update the sequence object. */
	if (Gp_role == GP_ROLE_EXECUTE)
		cdb_sequence_nextval_proxy(seqrel,
								   &elm->last,
								   &elm->cached,
								   &elm->increment,
								   &is_overflow);
	else
		cdb_sequence_nextval(elm,
							 seqrel,
							 &elm->last,
							 &elm->cached,
							 &elm->increment,
							 &is_overflow);
	last_used_seq = elm;

	if (is_overflow)
	{
		/*
		 * Copy the name before closing: once our reference is dropped the
		 * relcache entry must not be read any more, yet the error message
		 * below still needs the sequence name.
		 */
		char	   *seqname = pstrdup(RelationGetRelationName(seqrel));

		relation_close(seqrel, NoLock);

		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("nextval: reached %s value of sequence \"%s\" (" INT64_FORMAT ")",
						elm->increment>0 ? "maximum":"minimum",
						seqname, elm->last)));
	}
	else
		elm->last_valid = true;

	relation_close(seqrel, NoLock);
	return elm->last;
}


897 898 899
/*
 * cdb_sequence_nextval
 *		Read the sequence tuple of seqrel, advance it, and report the batch
 *		of values that was fetched.
 *
 * elm			backend-local entry, passed through to read_seq_tuple()
 * seqrel		the opened sequence relation
 * plast		out: first value fetched (the value to return to the user)
 * pcached		out: last value fetched for the local cache
 * pincrement	out: the sequence's increment_by
 * poverflow	out: true when the MAXVALUE/MINVALUE bound was reached on a
 *				non-cycling sequence before any value had been fetched
 *
 * The tuple is modified in the shared buffer under an exclusive buffer lock
 * (taken by read_seq_tuple) inside a critical section.  A WAL record is
 * written only when "logit" is set and the relation is not temporary.
 */
static void
cdb_sequence_nextval(SeqTable elm,
					 Relation   seqrel,
					 int64     *plast,
					 int64     *pcached,
					 int64     *pincrement,
					 bool      *poverflow)
{
	MIRROREDLOCK_BUFMGR_DECLARE;

	Buffer		buf;
	Page		page;
	HeapTupleData seqtuple;
	Form_pg_sequence seq;
	int64		incby,
				maxv,
				minv,
				cache,
				log,
				fetch,
				last;
	int64		result,
				next,
				rescnt = 0;
	bool		have_overflow = false;
	bool		logit = false;

	/* lock the page's buffer and read the tuple */

	// -------- MirroredLock ----------
	MIRROREDLOCK_BUFMGR_LOCK;

	seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
	page = BufferGetPage(buf);

	last = next = result = seq->last_value;
	incby = seq->increment_by;
	maxv = seq->max_value;
	minv = seq->min_value;
	fetch = cache = seq->cache_value;
	log = seq->log_cnt;

	if (!seq->is_called)
	{
		rescnt++;				/* return last_value if not is_called */
		fetch--;
	}

	/*
	 * Decide whether we should emit a WAL log record.	If so, force up the
	 * fetch count to grab SEQ_LOG_VALS more values than we actually need to
	 * cache.  (These will then be usable without logging.)
	 *
	 * If this is the first nextval after a checkpoint, we must force a new
	 * WAL record to be written anyway, else replay starting from the
	 * checkpoint would fail to advance the sequence past the logged values.
	 * In this case we may as well fetch extra values.
	 */
	if (log < fetch || !seq->is_called)
	{
		/* forced log to satisfy local demand for values */
		fetch = log = fetch + SEQ_LOG_VALS;
		logit = true;
	}
	else
	{
		XLogRecPtr	redoptr = GetRedoRecPtr();

		if (XLByteLE(PageGetLSN(page), redoptr))
		{
			/* last update of seq was before checkpoint */
			fetch = log = fetch + SEQ_LOG_VALS;
			logit = true;
		}
	}

	while (fetch)				/* try to fetch cache [+ log ] numbers */
	{
		/*
		 * Check MAXVALUE for ascending sequences and MINVALUE for descending
		 * sequences
		 */
		if (incby > 0)
		{
			/* ascending sequence */
			if ((maxv >= 0 && next > maxv - incby) ||
				(maxv < 0 && next + incby > maxv))
			{
				if (rescnt > 0)
					break;		/* stop fetching */
				if (!seq->is_cycled)
				{
					/*
					 * Only flag the overflow; the caller raises the error.
					 * The loop is not left here, "next" stays unchanged.
					 */
					have_overflow = true;
				}
				else
				{
					next = minv;
				}
			}
			else
				next += incby;
		}
		else
		{
			/* descending sequence */
			if ((minv < 0 && next < minv - incby) ||
				(minv >= 0 && next + incby < minv))
			{
				if (rescnt > 0)
					break;		/* stop fetching */
				if (!seq->is_cycled)
				{
					/* as above: flag only, the caller reports it */
					have_overflow = true;
				}
				else
				{
					next = maxv;
				}
			}
			else
				next += incby;
		}
		fetch--;
		if (rescnt < cache)
		{
			log--;
			rescnt++;
			last = next;
			if (rescnt == 1)	/* if it's first result - */
				result = next;	/* it's what to return */
		}
	}

	log -= fetch;				/* adjust for any unfetched numbers */
	Assert(log >= 0);

	/* set results for caller */
	*poverflow = have_overflow; /* has the sequence overflown */
	*plast = result;			/* last returned number */
	*pcached = last;			/* last fetched number */
	*pincrement = incby;

	// Fetch gp_persistent_relation_node information that will be added to XLOG record.
	Assert(seqrel != NULL);
	Sequence_FetchGpRelationNodeForXLog(seqrel);

	/* ready to change the on-disk (or really, in-buffer) tuple */
	START_CRIT_SECTION();

	/*
	 * We must mark the buffer dirty before doing XLogInsert(); see notes in
	 * SyncOneBuffer().  However, we don't apply the desired changes just yet.
	 * This looks like a violation of the buffer update protocol, but it is
	 * in fact safe because we hold exclusive lock on the buffer.  Any other
	 * process, including a checkpoint, that tries to examine the buffer
	 * contents will block until we release the lock, and then will see the
	 * final state that we install below.
	 */
	MarkBufferDirty(buf);

	/* XLOG stuff */
	if (logit && !seqrel->rd_istemp)
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];

		/*
		 * We don't log the current state of the tuple, but rather the state
		 * as it would appear after "log" more fetches.  This lets us skip
		 * that many future WAL records, at the cost that we lose those
		 * sequence values if we crash.
		 */

		/* set values that will be saved in xlog */
		seq->last_value = next;
		seq->is_called = true;
		seq->log_cnt = 0;

		xlrec.node = seqrel->rd_node;
		RelationGetPTInfo(seqrel, &xlrec.persistentTid, &xlrec.persistentSerialNum);
		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
		rdata[0].buffer = InvalidBuffer;
		rdata[0].next = &(rdata[1]);

		rdata[1].data = (char *) seqtuple.t_data;
		rdata[1].len = seqtuple.t_len;
		rdata[1].buffer = InvalidBuffer;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);

		PageSetLSN(page, recptr);
		PageSetTLI(page, ThisTimeLineID);

		/*
		 * Need to update where we've inserted to in shmem so that the QD can
		 * flush it when necessary.  Only ever move lastXlogEntry forward.
		 */
		LWLockAcquire(SeqServerControlLock, LW_EXCLUSIVE);

		if (XLByteLT(seqServerCtl->lastXlogEntry, recptr))
		{
			seqServerCtl->lastXlogEntry.xlogid = recptr.xlogid;
			seqServerCtl->lastXlogEntry.xrecoff = recptr.xrecoff;
		}

		LWLockRelease(SeqServerControlLock);
	}

	/* Now update sequence tuple to the intended final state */
	seq->last_value = last;		/* last fetched number */
	seq->is_called = true;
	seq->log_cnt = log;			/* how much is logged */

	END_CRIT_SECTION();

	UnlockReleaseBuffer(buf);

	MIRROREDLOCK_BUFMGR_UNLOCK;
	// -------- MirroredLock ----------

}                               /* cdb_sequence_nextval */


Datum
1123
currval_oid(PG_FUNCTION_ARGS)
1124
{
1125 1126
	Oid			relid = PG_GETARG_OID(0);
	int64		result;
1127
	SeqTable	elm;
1128
	Relation	seqrel;
1129

1130 1131 1132 1133 1134 1135 1136 1137
	/* For now, strictly forbidden on MPP. */
	if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
				 errmsg("currval() not supported")));
	}

V
Vadim B. Mikheev 已提交
1138
	/* open and AccessShareLock sequence */
1139
	init_sequence(relid, &elm, &seqrel);
1140

1141 1142
	if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_SELECT) != ACLCHECK_OK &&
		pg_class_aclcheck(elm->relid, GetUserId(), ACL_USAGE) != ACLCHECK_OK)
1143 1144
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1145
				 errmsg("permission denied for sequence %s",
1146
						RelationGetRelationName(seqrel))));
1147

1148
	if (!elm->last_valid)
1149 1150
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1151
				 errmsg("currval of sequence \"%s\" is not yet defined in this session",
1152
						RelationGetRelationName(seqrel))));
1153 1154 1155

	result = elm->last;

1156 1157
	relation_close(seqrel, NoLock);

1158
	PG_RETURN_INT64(result);
1159 1160
}

Datum
lastval(PG_FUNCTION_ARGS)
{
	Relation	seqrel;
	int64		result;

1167 1168 1169 1170 1171 1172 1173 1174
	/* For now, strictly forbidden on MPP. */
	if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
				 errmsg("lastval() not supported")));
	}

1175 1176 1177 1178 1179 1180
	if (last_used_seq == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("lastval is not yet defined in this session")));

	/* Someone may have dropped the sequence since the last nextval() */
1181 1182 1183
	if (!SearchSysCacheExists(RELOID,
							  ObjectIdGetDatum(last_used_seq->relid),
							  0, 0, 0))
1184 1185 1186 1187
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("lastval is not yet defined in this session")));

1188
	seqrel = open_share_lock(last_used_seq);
1189 1190

	/* nextval() must have already been called for this sequence */
1191
	Assert(last_used_seq->last_valid);
1192

1193 1194
	if (pg_class_aclcheck(last_used_seq->relid, GetUserId(), ACL_SELECT) != ACLCHECK_OK &&
		pg_class_aclcheck(last_used_seq->relid, GetUserId(), ACL_USAGE) != ACLCHECK_OK)
1195 1196 1197 1198 1199 1200 1201
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied for sequence %s",
						RelationGetRelationName(seqrel))));

	result = last_used_seq->last;
	relation_close(seqrel, NoLock);
1202

1203 1204 1205
	PG_RETURN_INT64(result);
}

/*
1207 1208 1209 1210
 * Main internal procedure that handles 2 & 3 arg forms of SETVAL.
 *
 * Note that the 3 arg version (which sets the is_called flag) is
 * only for use in pg_dump, and setting the is_called flag may not
B
Bruce Momjian 已提交
1211
 * work if multiple users are attached to the database and referencing
1212 1213
 * the sequence (unlikely if pg_dump is restoring it).
 *
B
Bruce Momjian 已提交
1214
 * It is necessary to have the 3 arg version so that pg_dump can
1215 1216 1217 1218
 * restore the state of a sequence exactly during data-only restores -
 * it is the only way to clear the is_called flag in an existing
 * sequence.
 */
B
Bruce Momjian 已提交
1219
static void
1220
do_setval(Oid relid, int64 next, bool iscalled)
M
 
Marc G. Fournier 已提交
1221
{
1222 1223
	MIRROREDLOCK_BUFMGR_DECLARE;

M
 
Marc G. Fournier 已提交
1224
	SeqTable	elm;
1225
	Relation	seqrel;
1226
	Buffer		buf;
1227
	HeapTupleData seqtuple;
1228
	Form_pg_sequence seq;
M
 
Marc G. Fournier 已提交
1229

1230 1231 1232 1233 1234 1235 1236
	if (Gp_role == GP_ROLE_EXECUTE)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
				 errmsg("setval() not supported in this context")));
	}

1237
	/* open and AccessShareLock sequence */
1238
	init_sequence(relid, &elm, &seqrel);
1239 1240

	if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
1241 1242
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1243
				 errmsg("permission denied for sequence %s",
1244
						RelationGetRelationName(seqrel))));
M
 
Marc G. Fournier 已提交
1245

1246
	/* lock page' buffer and read tuple */
1247 1248 1249 1250
	
	// -------- MirroredLock ----------
	MIRROREDLOCK_BUFMGR_LOCK;
	
1251
	seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
1252
	elm->increment = seq->increment_by;
M
 
Marc G. Fournier 已提交
1253

1254
	if ((next < seq->min_value) || (next > seq->max_value))
1255
	{
B
Bruce Momjian 已提交
1256 1257 1258 1259
		char		bufv[100],
					bufm[100],
					bufx[100];

1260 1261 1262
		snprintf(bufv, sizeof(bufv), INT64_FORMAT, next);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, seq->min_value);
		snprintf(bufx, sizeof(bufx), INT64_FORMAT, seq->max_value);
1263 1264
		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1265
				 errmsg("setval: value %s is out of bounds for sequence \"%s\" (%s..%s)",
1266 1267
						bufv, RelationGetRelationName(seqrel),
						bufm, bufx)));
1268
	}
M
 
Marc G. Fournier 已提交
1269

1270 1271 1272 1273 1274 1275 1276 1277 1278
	/* Set the currval() state only if iscalled = true */
	if (iscalled)
	{
		elm->last = next;		/* last returned number */
		elm->last_valid = true;
	}

	/* In any case, forget any future cached numbers */
	elm->cached = elm->last;
M
 
Marc G. Fournier 已提交
1279

1280 1281 1282 1283
	// Fetch gp_persistent_relation_node information that will be added to XLOG record.
	Assert(seqrel != NULL);
	Sequence_FetchGpRelationNodeForXLog(seqrel);

1284
	/* ready to change the on-disk (or really, in-buffer) tuple */
1285
	START_CRIT_SECTION();
1286

1287 1288 1289 1290
	seq->last_value = next;		/* last fetched number */
	seq->is_called = iscalled;
	seq->log_cnt = 0;

1291 1292
	MarkBufferDirty(buf);

1293 1294
	/* XLOG stuff */
	if (!seqrel->rd_istemp)
V
Vadim B. Mikheev 已提交
1295 1296 1297
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
B
Bruce Momjian 已提交
1298
		XLogRecData rdata[2];
1299
		Page		page = BufferGetPage(buf);
V
Vadim B. Mikheev 已提交
1300

1301
		xlrec.node = seqrel->rd_node;
1302
		RelationGetPTInfo(seqrel, &xlrec.persistentTid, &xlrec.persistentSerialNum);
1303

B
Bruce Momjian 已提交
1304
		rdata[0].data = (char *) &xlrec;
1305
		rdata[0].len = sizeof(xl_seq_rec);
1306
		rdata[0].buffer = InvalidBuffer;
1307 1308
		rdata[0].next = &(rdata[1]);

1309 1310
		rdata[1].data = (char *) seqtuple.t_data;
		rdata[1].len = seqtuple.t_len;
1311
		rdata[1].buffer = InvalidBuffer;
1312 1313
		rdata[1].next = NULL;

1314
		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);
1315 1316

		PageSetLSN(page, recptr);
1317
		PageSetTLI(page, ThisTimeLineID);
V
Vadim B. Mikheev 已提交
1318
	}
1319

1320
	END_CRIT_SECTION();
M
 
Marc G. Fournier 已提交
1321

1322
	UnlockReleaseBuffer(buf);
1323

1324 1325 1326
	MIRROREDLOCK_BUFMGR_UNLOCK;
	// -------- MirroredLock ----------

1327
	relation_close(seqrel, NoLock);
1328 1329
}

/*
 * Implement the 2 arg setval procedure.
 * See do_setval for discussion.
 */
1334
Datum
1335
setval_oid(PG_FUNCTION_ARGS)
1336
{
1337
	Oid			relid = PG_GETARG_OID(0);
1338
	int64		next = PG_GETARG_INT64(1);
1339

1340
	do_setval(relid, next, true);
1341

1342
	PG_RETURN_INT64(next);
1343 1344
}

/*
 * Implement the 3 arg setval procedure.
 * See do_setval for discussion.
 */
1349
Datum
1350
setval3_oid(PG_FUNCTION_ARGS)
1351
{
1352
	Oid			relid = PG_GETARG_OID(0);
1353
	int64		next = PG_GETARG_INT64(1);
1354 1355
	bool		iscalled = PG_GETARG_BOOL(2);

1356
	do_setval(relid, next, iscalled);
1357

1358
	PG_RETURN_INT64(next);
M
 
Marc G. Fournier 已提交
1359 1360
}

/*
1363 1364
 * Open the sequence and acquire AccessShareLock if needed
 *
1365
 * If we haven't touched the sequence already in this transaction,
B
Bruce Momjian 已提交
1366
 * we need to acquire AccessShareLock.	We arrange for the lock to
1367 1368 1369
 * be owned by the top transaction, so that we don't need to do it
 * more than once per xact.
 */
1370 1371
static Relation
open_share_lock(SeqTable seq)
1372
{
1373
	LocalTransactionId thislxid = MyProc->lxid;
1374

1375
	/* Get the lock if not already held in this xact */
1376
	if (seq->lxid != thislxid)
1377 1378 1379 1380 1381 1382 1383
	{
		ResourceOwner currentOwner;

		currentOwner = CurrentResourceOwner;
		PG_TRY();
		{
			CurrentResourceOwner = TopTransactionResourceOwner;
1384
			LockRelationOid(seq->relid, AccessShareLock);
1385 1386 1387 1388 1389 1390 1391 1392 1393 1394
		}
		PG_CATCH();
		{
			/* Ensure CurrentResourceOwner is restored on error */
			CurrentResourceOwner = currentOwner;
			PG_RE_THROW();
		}
		PG_END_TRY();
		CurrentResourceOwner = currentOwner;

1395
		/* Flag that we have a lock in the current xact */
1396
		seq->lxid = thislxid;
1397
	}
1398 1399 1400

	/* We now know we have AccessShareLock, and can safely open the rel */
	return relation_open(seq->relid, NoLock);
1401 1402
}

/*
1404
 * Given a relation OID, open and lock the sequence.  p_elm and p_rel are
1405
 * output parameters.
1406 1407
 *
 * GPDB: If p_rel is NULL, the sequence relation is not opened or locked.
1408 1409
 */
static void
1410
init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel)
1411
{
B
Bruce Momjian 已提交
1412
	SeqTable	elm;
1413
	Relation	seqrel;
1414

1415 1416 1417 1418 1419 1420 1421
	/* Look to see if we already have a seqtable entry for relation */
	for (elm = seqtab; elm != NULL; elm = elm->next)
	{
		if (elm->relid == relid)
			break;
	}

1422
	/*
1423
	 * Allocate new seqtable entry if we didn't find one.
1424
	 *
B
Bruce Momjian 已提交
1425 1426 1427
	 * NOTE: seqtable entries remain in the list for the life of a backend. If
	 * the sequence itself is deleted then the entry becomes wasted memory,
	 * but it's small enough that this should not matter.
B
Bruce Momjian 已提交
1428
	 */
1429
	if (elm == NULL)
1430
	{
1431
		/*
B
Bruce Momjian 已提交
1432 1433
		 * Time to make a new seqtable entry.  These entries live as long as
		 * the backend does, so we use plain malloc for them.
1434 1435
		 */
		elm = (SeqTable) malloc(sizeof(SeqTableData));
T
Tom Lane 已提交
1436
		if (elm == NULL)
1437 1438 1439
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
1440
		elm->relid = relid;
1441
		elm->lxid = InvalidLocalTransactionId;
1442
		elm->last_valid = false;
1443 1444 1445
		elm->last = elm->cached = elm->increment = 0;
		elm->next = seqtab;
		seqtab = elm;
1446 1447
	}

1448 1449 1450
	/*
	 * Open the sequence relation.
	 */
1451 1452 1453
	if (p_rel)
	{
		seqrel = open_share_lock(elm);
1454

1455 1456 1457 1458 1459
		if (seqrel->rd_rel->relkind != RELKIND_SEQUENCE)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("\"%s\" is not a sequence",
							RelationGetRelationName(seqrel))));
1460

1461 1462
		*p_rel = seqrel;
	}
1463
	*p_elm = elm;
1464 1465 1466
}


/*
 * Given an opened sequence relation, lock the page buffer and find the tuple
 *
 * *buf receives the reference to the pinned-and-ex-locked buffer
 * *seqtuple receives the reference to the sequence tuple proper
 *		(this arg should point to a local variable of type HeapTupleData)
 *
 * Function's return value points to the data payload of the tuple
 */
1476
static Form_pg_sequence
1477
read_seq_tuple(SeqTable elm, Relation rel, Buffer *buf, HeapTuple seqtuple)
1478
{
1479 1480 1481 1482
	PageHeader	page;
	ItemId		lp;
	sequence_magic *sm;
	Form_pg_sequence seq;
1483

1484 1485
	MIRROREDLOCK_BUFMGR_MUST_ALREADY_BE_HELD;

1486 1487 1488 1489 1490 1491 1492
	*buf = ReadBuffer(rel, 0);
	LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);

	page = (PageHeader) BufferGetPage(*buf);
	sm = (sequence_magic *) PageGetSpecialPointer(page);

	if (sm->magic != SEQ_MAGIC)
1493 1494
		elog(ERROR, "bad magic number in sequence \"%s\": %08X",
			 RelationGetRelationName(rel), sm->magic);
1495 1496

	lp = PageGetItemId(page, FirstOffsetNumber);
1497
	Assert(ItemIdIsNormal(lp));
1498 1499 1500 1501

	/* Note we currently only bother to set these two fields of *seqtuple */
	seqtuple->t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
	seqtuple->t_len = ItemIdGetLength(lp);
1502

1503 1504 1505 1506 1507 1508 1509 1510
	/*
	 * Previous releases of Postgres neglected to prevent SELECT FOR UPDATE
	 * on a sequence, which would leave a non-frozen XID in the sequence
	 * tuple's xmax, which eventually leads to clog access failures or worse.
	 * If we see this has happened, clean up after it.  We treat this like a
	 * hint bit update, ie, don't bother to WAL-log it, since we can certainly
	 * do this again if the update gets lost.
	 */
1511
	if (HeapTupleHeaderGetXmax(seqtuple->t_data) != InvalidTransactionId)
1512
	{
1513 1514 1515
		HeapTupleHeaderSetXmax(seqtuple->t_data, InvalidTransactionId);
		seqtuple->t_data->t_infomask &= ~HEAP_XMAX_COMMITTED;
		seqtuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
1516 1517
		SetBufferCommitInfoNeedsSave(*buf);
	}
1518

1519
	seq = (Form_pg_sequence) GETSTRUCT(seqtuple);
1520

1521
	/* this is a handy place to update our copy of the increment */
1522 1523 1524
	elm->increment = seq->increment_by;

	return seq;
1525 1526
}

/*
 * init_params: process the options list of CREATE or ALTER SEQUENCE,
1529 1530
 * and store the values into appropriate fields of *new.  Also set
 * *owned_by to any OWNED BY option, or to NIL if there is none.
1531 1532 1533 1534
 *
 * If isInit is true, fill any unspecified options with default values;
 * otherwise, do not change existing options that aren't explicitly overridden.
 */
1535
static void
1536 1537
init_params(List *options, bool isInit,
			Form_pg_sequence new, List **owned_by)
1538
{
1539 1540 1541 1542 1543
	DefElem    *last_value = NULL;
	DefElem    *increment_by = NULL;
	DefElem    *max_value = NULL;
	DefElem    *min_value = NULL;
	DefElem    *cache_value = NULL;
1544
	DefElem    *is_cycled = NULL;
1545
	ListCell   *option;
1546

1547 1548
	*owned_by = NIL;

B
Bruce Momjian 已提交
1549
	foreach(option, options)
1550
	{
1551
		DefElem    *defel = (DefElem *) lfirst(option);
1552

1553
		if (strcmp(defel->defname, "increment") == 0)
1554 1555
		{
			if (increment_by)
1556 1557 1558
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1559
			increment_by = defel;
1560
		}
B
Bruce Momjian 已提交
1561

B
Bruce Momjian 已提交
1562
		/*
B
Bruce Momjian 已提交
1563
		 * start is for a new sequence restart is for alter
B
Bruce Momjian 已提交
1564
		 */
1565 1566
		else if (strcmp(defel->defname, "start") == 0 ||
				 strcmp(defel->defname, "restart") == 0)
1567 1568
		{
			if (last_value)
1569 1570 1571
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1572
			last_value = defel;
1573
		}
1574
		else if (strcmp(defel->defname, "maxvalue") == 0)
1575 1576
		{
			if (max_value)
1577 1578 1579
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1580
			max_value = defel;
1581
		}
1582
		else if (strcmp(defel->defname, "minvalue") == 0)
1583 1584
		{
			if (min_value)
1585 1586 1587
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1588
			min_value = defel;
1589
		}
1590
		else if (strcmp(defel->defname, "cache") == 0)
1591 1592
		{
			if (cache_value)
1593 1594 1595
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1596
			cache_value = defel;
1597
		}
1598
		else if (strcmp(defel->defname, "cycle") == 0)
1599
		{
1600
			if (is_cycled)
1601 1602 1603
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1604
			is_cycled = defel;
1605
		}
1606 1607 1608 1609 1610 1611 1612 1613
		else if (strcmp(defel->defname, "owned_by") == 0)
		{
			if (*owned_by)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			*owned_by = defGetQualifiedName(defel);
		}
1614
		else
1615
			elog(ERROR, "option \"%s\" not recognized",
1616 1617 1618
				 defel->defname);
	}

1619 1620 1621 1622 1623 1624 1625
	/*
	 * We must reset log_cnt when isInit or when changing any parameters
	 * that would affect future nextval allocations.
	 */
	if (isInit)
		new->log_cnt = 0;

B
Bruce Momjian 已提交
1626
	/* INCREMENT BY */
1627
	if (increment_by != NULL)
B
Bruce Momjian 已提交
1628 1629
	{
		new->increment_by = defGetInt64(increment_by);
1630 1631 1632
		if (new->increment_by == 0)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1633
					 errmsg("INCREMENT must not be zero")));
1634
		new->log_cnt = 0;
B
Bruce Momjian 已提交
1635
	}
1636 1637 1638 1639
	else if (isInit)
		new->increment_by = 1;

	/* CYCLE */
1640
	if (is_cycled != NULL)
1641 1642 1643
	{
		new->is_cycled = intVal(is_cycled->arg);
		Assert(new->is_cycled == false || new->is_cycled == true);
1644
		new->log_cnt = 0;
1645 1646 1647
	}
	else if (isInit)
		new->is_cycled = false;
1648

1649
	/* MAXVALUE (null arg means NO MAXVALUE) */
1650
	if (max_value != NULL && max_value->arg)
1651
	{
1652
		new->max_value = defGetInt64(max_value);
1653 1654
		new->log_cnt = 0;
	}
1655
	else if (isInit || max_value != NULL)
1656
	{
1657
		if (new->increment_by > 0)
B
Bruce Momjian 已提交
1658
			new->max_value = SEQ_MAXVALUE;		/* ascending seq */
1659
		else
B
Bruce Momjian 已提交
1660
			new->max_value = -1;	/* descending seq */
1661
		new->log_cnt = 0;
1662
	}
1663

1664
	/* MINVALUE (null arg means NO MINVALUE) */
1665
	if (min_value != NULL && min_value->arg)
1666
	{
1667
		new->min_value = defGetInt64(min_value);
1668 1669
		new->log_cnt = 0;
	}
1670
	else if (isInit || min_value != NULL)
1671
	{
1672
		if (new->increment_by > 0)
B
Bruce Momjian 已提交
1673
			new->min_value = 1; /* ascending seq */
1674
		else
B
Bruce Momjian 已提交
1675
			new->min_value = SEQ_MINVALUE;		/* descending seq */
1676
		new->log_cnt = 0;
1677
	}
1678

1679
	/* crosscheck min/max */
1680
	if (new->min_value >= new->max_value)
1681
	{
B
Bruce Momjian 已提交
1682 1683 1684
		char		bufm[100],
					bufx[100];

1685 1686
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->min_value);
		snprintf(bufx, sizeof(bufx), INT64_FORMAT, new->max_value);
1687 1688 1689 1690
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("MINVALUE (%s) must be less than MAXVALUE (%s)",
						bufm, bufx)));
1691
	}
1692

B
Bruce Momjian 已提交
1693
	/* START WITH */
1694
	if (last_value != NULL)
1695
	{
1696
		new->last_value = defGetInt64(last_value);
1697 1698 1699
		new->is_called = false;
		new->log_cnt = 1;
	}
1700
	else if (isInit)
1701
	{
1702 1703 1704 1705
		if (new->increment_by > 0)
			new->last_value = new->min_value;	/* ascending seq */
		else
			new->last_value = new->max_value;	/* descending seq */
1706 1707
		new->is_called = false;
		new->log_cnt = 1;
1708
	}
1709

1710
	/* crosscheck */
1711
	if (new->last_value < new->min_value)
1712
	{
B
Bruce Momjian 已提交
1713 1714 1715
		char		bufs[100],
					bufm[100];

1716 1717
		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->last_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->min_value);
1718 1719
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1720
				 errmsg("START value (%s) cannot be less than MINVALUE (%s)",
B
Bruce Momjian 已提交
1721
						bufs, bufm)));
1722
	}
1723
	if (new->last_value > new->max_value)
1724
	{
B
Bruce Momjian 已提交
1725 1726 1727
		char		bufs[100],
					bufm[100];

1728 1729
		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->last_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->max_value);
1730 1731
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
B
Bruce Momjian 已提交
1732 1733
			  errmsg("START value (%s) cannot be greater than MAXVALUE (%s)",
					 bufs, bufm)));
1734
	}
1735

B
Bruce Momjian 已提交
1736
	/* CACHE */
1737
	if (cache_value != NULL)
1738
	{
1739 1740 1741 1742
		new->cache_value = defGetInt64(cache_value);
		if (new->cache_value <= 0)
		{
			char		buf[100];
B
Bruce Momjian 已提交
1743

1744 1745 1746 1747 1748 1749
			snprintf(buf, sizeof(buf), INT64_FORMAT, new->cache_value);
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("CACHE (%s) must be greater than zero",
							buf)));
		}
1750
		new->log_cnt = 0;
1751
	}
1752 1753
	else if (isInit)
		new->cache_value = 1;
1754 1755
}

/*
 * Process an OWNED BY option for CREATE/ALTER SEQUENCE
 *
 * Ownership permissions on the sequence are already checked,
 * but if we are establishing a new owned-by dependency, we must
 * enforce that the referenced table has the same owner and namespace
 * as the sequence.
 *
 * owned_by is a list of names: a single element must be "none"; otherwise
 * the last element is the column name and the preceding elements name the
 * table.
 */
static void
process_owned_by(Relation seqrel, List *owned_by)
{
	int			nnames;
	Relation	tablerel;
	AttrNumber	attnum;

	nnames = list_length(owned_by);
	Assert(nnames > 0);
	if (nnames == 1)
	{
		/* Must be OWNED BY NONE */
		if (strcmp(strVal(linitial(owned_by)), "none") != 0)
			ereport(ERROR,
					(errcode(ERRCODE_SYNTAX_ERROR),
					 errmsg("invalid OWNED BY option"),
				errhint("Specify OWNED BY table.column or OWNED BY NONE.")));
		tablerel = NULL;
		attnum = 0;
	}
	else
	{
		List	   *relname;
		char	   *attrname;
		RangeVar   *rel;

		/* Separate relname and attr name */
		relname = list_truncate(list_copy(owned_by), nnames - 1);
		attrname = strVal(lfirst(list_tail(owned_by)));

		/* Open and lock rel to ensure it won't go away meanwhile */
		rel = makeRangeVarFromNameList(relname);
		tablerel = relation_openrv(rel, AccessShareLock);

		/* Must be a regular table */
		if (tablerel->rd_rel->relkind != RELKIND_RELATION)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("referenced relation \"%s\" is not a table",
							RelationGetRelationName(tablerel))));

		/* We insist on same owner and schema */
		if (seqrel->rd_rel->relowner != tablerel->rd_rel->relowner)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("sequence must have same owner as table it is linked to")));
		if (RelationGetNamespace(seqrel) != RelationGetNamespace(tablerel))
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("sequence must be in same schema as table it is linked to")));

		/* Now, fetch the attribute number from the system cache */
		attnum = get_attnum(RelationGetRelid(tablerel), attrname);
		if (attnum == InvalidAttrNumber)
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_COLUMN),
					 errmsg("column \"%s\" of relation \"%s\" does not exist",
							attrname, RelationGetRelationName(tablerel))));
	}

	/*
	 * OK, we are ready to update pg_depend.  First remove any existing AUTO
	 * dependencies for the sequence, then optionally add a new one.
	 */
	markSequenceUnowned(RelationGetRelid(seqrel));

	if (tablerel)
	{
		ObjectAddress refobject,
					depobject;

		/* the sequence (depobject) depends on the table column (refobject) */
		refobject.classId = RelationRelationId;
		refobject.objectId = RelationGetRelid(tablerel);
		refobject.objectSubId = attnum;
		depobject.classId = RelationRelationId;
		depobject.objectId = RelationGetRelid(seqrel);
		depobject.objectSubId = 0;
		recordDependencyOn(&depobject, &refobject, DEPENDENCY_AUTO);
	}

	/* Done, but hold lock until commit */
	if (tablerel)
		relation_close(tablerel, NoLock);
}

void
1851
seq_redo(XLogRecPtr beginLoc, XLogRecPtr lsn, XLogRecord *record)
V
Vadim B. Mikheev 已提交
1852
{
1853 1854
	MIRROREDLOCK_BUFMGR_DECLARE;

B
Bruce Momjian 已提交
1855 1856 1857 1858 1859 1860 1861
	uint8		info = record->xl_info & ~XLR_INFO_MASK;
	Relation	reln;
	Buffer		buffer;
	Page		page;
	char	   *item;
	Size		itemsz;
	xl_seq_rec *xlrec = (xl_seq_rec *) XLogRecGetData(record);
1862
	sequence_magic *sm;
V
Vadim B. Mikheev 已提交
1863

1864
	if (info != XLOG_SEQ_LOG)
1865
		elog(PANIC, "seq_redo: unknown op code %u", info);
V
Vadim B. Mikheev 已提交
1866

1867
	reln = XLogOpenRelation(xlrec->node);
1868 1869 1870 1871
	
	// -------- MirroredLock ----------
	MIRROREDLOCK_BUFMGR_LOCK;
	
1872 1873
	buffer = XLogReadBuffer(reln, 0, true);
	Assert(BufferIsValid(buffer));
V
Vadim B. Mikheev 已提交
1874 1875
	page = (Page) BufferGetPage(buffer);

1876 1877
	/* Always reinit the page and reinstall the magic number */
	/* See comments in DefineSequence */
1878 1879 1880
	PageInit((Page) page, BufferGetPageSize(buffer), sizeof(sequence_magic));
	sm = (sequence_magic *) PageGetSpecialPointer(page);
	sm->magic = SEQ_MAGIC;
V
Vadim B. Mikheev 已提交
1881

B
Bruce Momjian 已提交
1882
	item = (char *) xlrec + sizeof(xl_seq_rec);
1883
	itemsz = record->xl_len - sizeof(xl_seq_rec);
1884

B
Bruce Momjian 已提交
1885
	if (PageAddItem(page, (Item) item, itemsz,
1886
					FirstOffsetNumber, false, false) == InvalidOffsetNumber)
1887
		elog(PANIC, "seq_redo: failed to add item to page");
V
Vadim B. Mikheev 已提交
1888 1889

	PageSetLSN(page, lsn);
1890
	PageSetTLI(page, ThisTimeLineID);
1891 1892
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);
1893 1894 1895 1896
	
	MIRROREDLOCK_BUFMGR_UNLOCK;
	// -------- MirroredLock ----------
	
V
Vadim B. Mikheev 已提交
1897 1898
}

void
1900
seq_desc(StringInfo buf, XLogRecPtr beginLoc, XLogRecord *record)
V
Vadim B. Mikheev 已提交
1901
{
1902 1903
	uint8		info = record->xl_info & ~XLR_INFO_MASK;
	char		*rec = XLogRecGetData(record);
B
Bruce Momjian 已提交
1904
	xl_seq_rec *xlrec = (xl_seq_rec *) rec;
V
Vadim B. Mikheev 已提交
1905 1906

	if (info == XLOG_SEQ_LOG)
1907
		appendStringInfo(buf, "log: ");
V
Vadim B. Mikheev 已提交
1908 1909
	else
	{
1910
		appendStringInfo(buf, "UNKNOWN");
V
Vadim B. Mikheev 已提交
1911 1912 1913
		return;
	}

1914
	appendStringInfo(buf, "rel %u/%u/%u",
B
Bruce Momjian 已提交
1915
			   xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode);
V
Vadim B. Mikheev 已提交
1916
}


/*
 * Initialize a pseudo relcache entry with just enough info to call bufmgr.
 *
 * seqrel		caller-provided storage; it is zeroed and then filled in
 * tablespaceid, dbid, relid
 *				become the physical address (rd_node) of the entry; relid is
 *				also stored as rd_id
 * istemp		stored as rd_istemp
 *
 * rd_rel is palloc0'd here and carries a synthetic relation name of the
 * form "pg_class.oid=<relid>".
 */
static void
cdb_sequence_relation_init(Relation seqrel,
                           Oid      tablespaceid,
                           Oid      dbid,
                           Oid      relid,
                           bool     istemp)
{
	/* See RelationBuildDesc in relcache.c */
	memset(seqrel, 0, sizeof(*seqrel));

	seqrel->rd_smgr = NULL;
	seqrel->rd_refcnt = 99;

	seqrel->rd_id = relid;
	seqrel->rd_istemp = istemp;

	/* Must use shared buffer pool so seqserver & QDs can see the data. */
	seqrel->rd_isLocalBuf = false;

	seqrel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);

	/*
	 * Oid is unsigned, so print it with %u (with %d, values above INT_MAX
	 * would show up negative), and bound the write to the name buffer.
	 */
	snprintf(seqrel->rd_rel->relname.data,
			 sizeof(seqrel->rd_rel->relname.data),
			 "pg_class.oid=%u", relid);

	/* as in RelationInitPhysicalAddr... */
	seqrel->rd_node.spcNode = tablespaceid;
	seqrel->rd_node.dbNode = dbid;
	seqrel->rd_node.relNode = relid;
}                               /* cdb_sequence_relation_init */

/*
 * Clean up pseudo relcache entry.
 *
 * Closes the smgr handle of the fake entry and frees the rd_rel struct, if
 * one is attached.  rd_rel is reset to NULL afterwards so the caller-owned
 * struct does not keep a dangling pointer and a repeated call cannot
 * pfree the same memory twice.
 */
static void
cdb_sequence_relation_term(Relation seqrel)
{
    /* Close the file. */
    RelationCloseSmgr(seqrel);

    /* The NULL test is kept: rd_rel may never have been allocated. */
    if (seqrel->rd_rel != NULL)
    {
        pfree(seqrel->rd_rel);
        seqrel->rd_rel = NULL;
    }
}                               /* cdb_sequence_relation_term */



/*
 * CDB: forward a nextval request from qExec to the sequence server
 *
 * Thin wrapper: the request is sent over the descriptor returned by
 * GetSeqServerFD(), tagged with gp_session_id.  The four output pointers
 * are handed to sendSequenceRequest() unchanged.
 */
void
cdb_sequence_nextval_proxy(Relation	seqrel,
                           int64   *plast,
                           int64   *pcached,
                           int64   *pincrement,
                           bool    *poverflow)
{
	sendSequenceRequest(GetSeqServerFD(), seqrel, gp_session_id,
						plast, pcached, pincrement, poverflow);
}                               /* cdb_sequence_nextval_proxy */


/*
 * CDB: nextval entry point called by sequence server
 *
 * Looks up the SeqTable entry for relid, builds a throwaway pseudo relcache
 * entry on the stack for the given physical address, advances the sequence
 * through cdb_sequence_nextval(), and tears the pseudo entry down again.
 *
 * *plast, *pcached and *pincrement are zeroed before any other work is done.
 * *poverflow is not written here; it is passed on to cdb_sequence_nextval().
 */
void
cdb_sequence_nextval_server(Oid    tablespaceid,
                            Oid    dbid,
                            Oid    relid,
                            bool   istemp,
                            int64 *plast,
                            int64 *pcached,
                            int64 *pincrement,
                            bool  *poverflow)
{
	RelationData	fakerel;
	Relation		seqrel = &fakerel;	/* pseudo relcache entry, filled in below */
	SeqTable		elm;

	/* Start from well-defined outputs. */
	*plast = 0;
	*pcached = 0;
	*pincrement = 0;

	/*
	 * Find the SeqTable entry for the sequence. Note that we don't lock the
	 * relation, because the sequence server cannot hold heavy-weight locks.
	 * GPDB_83_MERGE_FIXME: that's why I assume we don't hold the lock, anyway...
	 */
	init_sequence(relid, &elm, NULL);

	/* Build a pseudo relcache entry with just enough info to call bufmgr. */
	cdb_sequence_relation_init(seqrel, tablespaceid, dbid, relid, istemp);

	/*
	 * CDB TODO: Catch errors.
	 * NOTE(review): presumably an error raised below would skip the cleanup
	 * call at the end of this function -- confirm.
	 */

	/* Update the sequence object. */
	cdb_sequence_nextval(elm, seqrel, plast, pcached, pincrement, poverflow);

	/* Cleanup. */
	cdb_sequence_relation_term(seqrel);
}                               /* cdb_sequence_nextval_server */