sequence.c 51.9 KB
Newer Older
1 2
/*-------------------------------------------------------------------------
 *
3
 * sequence.c
4
 *	  PostgreSQL sequences support code.
5
 *
6
 * Portions Copyright (c) 2005-2008, Greenplum inc.
7
 * Portions Copyright (c) 2012-Present Pivotal Software, Inc.
B
Bruce Momjian 已提交
8
 * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
9 10 11 12
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
13
 *	  src/backend/commands/sequence.c
14
 *
15 16
 *-------------------------------------------------------------------------
 */
17
#include "postgres.h"
18

19
#include "access/heapam.h"
20
#include "access/bufmask.h"
21 22
#include "access/transam.h"
#include "access/xact.h"
23
#include "access/xlogutils.h"
24
#include "catalog/dependency.h"
25
#include "catalog/heap.h"
26
#include "catalog/namespace.h"
27
#include "catalog/pg_type.h"
28
#include "commands/defrem.h"
29
#include "commands/sequence.h"
30
#include "commands/tablecmds.h"
31
#include "funcapi.h"
B
Bruce Momjian 已提交
32
#include "miscadmin.h"
33
#include "storage/smgr.h"               /* RelationCloseSmgr -> smgrclose */
34
#include "nodes/makefuncs.h"
35 36
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
37
#include "storage/proc.h"
38
#include "storage/smgr.h"
39
#include "utils/acl.h"
B
Bruce Momjian 已提交
40
#include "utils/builtins.h"
41
#include "utils/formatting.h"
42
#include "utils/lsyscache.h"
43
#include "utils/resowner.h"
44
#include "utils/syscache.h"
45

46
#include "catalog/oid_dispatch.h"
47
#include "cdb/cdbdisp_query.h"
H
Heikki Linnakangas 已提交
48
#include "cdb/cdbdoublylinked.h"
49 50 51 52 53 54 55
#include "cdb/cdbsrlz.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbmotion.h"
#include "cdb/ml_ipc.h"

#include "postmaster/seqserver.h"

56

V
Vadim B. Mikheev 已提交
57
/*
 * We don't want to WAL-log every single fetch of a value from a sequence,
 * so we pre-log a few fetches in advance.  In the event of a crash we can
 * lose (skip over) as many values as we pre-logged.
 */
B
Bruce Momjian 已提交
62
#define SEQ_LOG_VALS	32
63

64 65 66 67 68
/*
 * The "special area" of a sequence's buffer page looks like this.
 */
#define SEQ_MAGIC	  0x1717

69 70
typedef struct sequence_magic
{
71
	uint32		magic;
72
} sequence_magic;
73

74 75 76 77 78 79
/*
 * We store a SeqTable item for every sequence we have touched in the current
 * session.  This is needed to hold onto nextval/currval state.  (We can't
 * rely on the relcache, since it's only, well, a cache, and may decide to
 * discard entries.)
 *
 * XXX We use linear search to find pre-existing SeqTable entries.  This is
 * fine when only a small number of sequences are touched in a session, but
 * it scales poorly with many different sequences.  Perhaps use a hashtable
 * someday.
 */
84 85
typedef struct SeqTableData
{
86 87
	struct SeqTableData *next;	/* link to next SeqTable object */
	Oid			relid;			/* pg_class OID of this sequence */
88
	Oid			filenode;		/* last seen relfilenode of this sequence */
89
	LocalTransactionId lxid;	/* xact in which we last did a seq op */
90
	bool		last_valid;		/* do we have a valid "last" value? */
91 92 93 94
	int64		last;			/* value last returned by nextval */
	int64		cached;			/* last value already cached for nextval */
	/* if last != cached, we have not used up all the cached values */
	int64		increment;		/* copy of sequence's increment field */
95
	/* note that increment is zero until we first do read_seq_tuple() */
96
} SeqTableData;
97 98 99

typedef SeqTableData *SeqTable;

100
static SeqTable seqtab = NULL;	/* Head of list of SeqTable items */
101

102 103 104 105 106
/*
 * last_used_seq is updated by nextval() to point to the last used
 * sequence.
 */
static SeqTableData *last_used_seq = NULL;
107

108
static void fill_seq_with_data(Relation rel, HeapTuple tuple);
109
static int64 nextval_internal(Oid relid);
110
static Relation open_share_lock(SeqTable seq);
111
static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel);
112 113
static Form_pg_sequence read_seq_tuple(SeqTable elm, Relation rel,
			   Buffer *buf, HeapTuple seqtuple);
114
static void init_params(List *options, bool isInit,
B
Bruce Momjian 已提交
115
			Form_pg_sequence new, List **owned_by);
116
static void do_setval(Oid relid, int64 next, bool iscalled);
117
static void process_owned_by(Relation seqrel, List *owned_by);
118
static void mask_seq_values(Page page);
119

120
static void
121 122
cdb_sequence_nextval(SeqTable elm,
					 Relation   seqrel,
123 124 125 126 127 128 129 130 131 132 133
                     int64     *plast,
                     int64     *pcached,
                     int64     *pincrement,
                     bool      *seq_overflow);
static void
cdb_sequence_nextval_proxy(Relation seqrel,
                           int64   *plast,
                           int64   *pcached,
                           int64   *pincrement,
                           bool    *poverflow);

134
/*
B
Bruce Momjian 已提交
135
 * DefineSequence
136
 *				Creates a new sequence relation
137 138
 */
void
139
DefineSequence(CreateSeqStmt *seq)
140
{
141
	FormData_pg_sequence new;
142
	List	   *owned_by;
143
	CreateStmt *stmt = makeNode(CreateStmt);
144
	Oid			seqoid;
145 146 147 148
	Relation	rel;
	HeapTuple	tuple;
	TupleDesc	tupDesc;
	Datum		value[SEQ_COL_LASTCOL];
149
	bool		null[SEQ_COL_LASTCOL];
150
	int			i;
151
	NameData	name;
152

153 154
	bool shouldDispatch =  Gp_role == GP_ROLE_DISPATCH && !IsBootstrapProcessingMode();

155 156 157 158 159 160
	/* Unlogged sequences are not implemented -- not clear if useful. */
	if (seq->sequence->relpersistence == RELPERSISTENCE_UNLOGGED)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("unlogged sequences are not supported")));

161
	/* Check and set all option values */
162
	init_params(seq->options, true, &new, &owned_by);
163 164

	/*
165
	 * Create relation (and fill value[] and null[] for the tuple)
166 167 168
	 */
	stmt->tableElts = NIL;
	for (i = SEQ_COL_FIRSTCOL; i <= SEQ_COL_LASTCOL; i++)
169
	{
170
		ColumnDef  *coldef = makeNode(ColumnDef);
171

172 173
		coldef->inhcount = 0;
		coldef->is_local = true;
174
		coldef->is_not_null = true;
175
		coldef->is_from_type = false;
176
		coldef->storage = 0;
177 178
		coldef->raw_default = NULL;
		coldef->cooked_default = NULL;
179 180
		coldef->collClause = NULL;
		coldef->collOid = InvalidOid;
181 182
		coldef->constraints = NIL;

183
		null[i - 1] = false;
184 185 186

		switch (i)
		{
187
			case SEQ_COL_NAME:
188
				coldef->typeName = makeTypeNameFromOid(NAMEOID, -1);
189
				coldef->colname = "sequence_name";
190
				namestrcpy(&name, seq->sequence->relname);
191
				value[i - 1] = NameGetDatum(&name);
192 193
				break;
			case SEQ_COL_LASTVAL:
194
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
195
				coldef->colname = "last_value";
196
				value[i - 1] = Int64GetDatumFast(new.last_value);
197
				break;
198
			case SEQ_COL_STARTVAL:
199
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
200 201 202
				coldef->colname = "start_value";
				value[i - 1] = Int64GetDatumFast(new.start_value);
				break;
203
			case SEQ_COL_INCBY:
204
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
205
				coldef->colname = "increment_by";
206
				value[i - 1] = Int64GetDatumFast(new.increment_by);
207 208
				break;
			case SEQ_COL_MAXVALUE:
209
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
210
				coldef->colname = "max_value";
211
				value[i - 1] = Int64GetDatumFast(new.max_value);
212 213
				break;
			case SEQ_COL_MINVALUE:
214
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
215
				coldef->colname = "min_value";
216
				value[i - 1] = Int64GetDatumFast(new.min_value);
217 218
				break;
			case SEQ_COL_CACHE:
219
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
220
				coldef->colname = "cache_value";
221
				value[i - 1] = Int64GetDatumFast(new.cache_value);
222
				break;
V
Vadim B. Mikheev 已提交
223
			case SEQ_COL_LOG:
224
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
V
Vadim B. Mikheev 已提交
225
				coldef->colname = "log_cnt";
226
				value[i - 1] = Int64GetDatum((int64) 0);
V
Vadim B. Mikheev 已提交
227
				break;
228
			case SEQ_COL_CYCLE:
229
				coldef->typeName = makeTypeNameFromOid(BOOLOID, -1);
230
				coldef->colname = "is_cycled";
231
				value[i - 1] = BoolGetDatum(new.is_cycled);
232 233
				break;
			case SEQ_COL_CALLED:
234
				coldef->typeName = makeTypeNameFromOid(BOOLOID, -1);
235
				coldef->colname = "is_called";
236
				value[i - 1] = BoolGetDatum(false);
237
				break;
238 239 240 241
		}
		stmt->tableElts = lappend(stmt->tableElts, coldef);
	}

242 243
	stmt->relation = seq->sequence;
	stmt->inhRelations = NIL;
244
	stmt->constraints = NIL;
245 246
	stmt->inhOids = NIL;
	stmt->parentOidCount = 0;
B
Bruce Momjian 已提交
247
	stmt->options = list_make1(defWithOids(false));
248
	stmt->oncommit = ONCOMMIT_NOOP;
249
	stmt->tablespacename = NULL;
R
Robert Haas 已提交
250
	stmt->if_not_exists = false;
251 252
	stmt->relKind = RELKIND_SEQUENCE;
	stmt->ownerid = GetUserId();
253

A
Asim R P 已提交
254
	seqoid = DefineRelation(stmt, RELKIND_SEQUENCE, seq->ownerId, RELSTORAGE_HEAP, false);
R
Robert Haas 已提交
255
	Assert(seqoid != InvalidOid);
256

257 258 259 260 261 262 263 264
	/*
	 * Open and lock the new sequence.  (This lock is redundant; an
	 * AccessExclusiveLock was acquired above by DefineRelation and
	 * won't be released until end of transaction.)
	 *
	 * CDB: Acquire lock on qDisp before dispatching to qExecs, so
	 * qDisp can detect and resolve any deadlocks.
	 */
265
	rel = heap_open(seqoid, AccessExclusiveLock);
266
	tupDesc = RelationGetDescr(rel);
267

268
	/* now initialize the sequence's data */
269
	tuple = heap_form_tuple(tupDesc, value, null);
270 271
	fill_seq_with_data(rel, tuple);

A
Asim R P 已提交
272 273 274 275 276 277 278 279 280 281 282
	/* Dispatch to segments */
	if (shouldDispatch)
	{
		CdbDispatchUtilityStatement((Node *) seq,
									DF_CANCEL_ON_ERROR|
									DF_WITH_SNAPSHOT|
									DF_NEED_TWO_PHASE,
									GetAssignedOidsForDispatch(),
									NULL);
	}

283 284 285 286 287 288 289
	/* process OWNED BY if given */
	if (owned_by)
		process_owned_by(rel, owned_by);

	heap_close(rel, NoLock);
}

A
Asim R P 已提交
290 291
static void gp_alter_sequence_internal(Oid relid, List *options);

292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
/*
 * Reset a sequence to its initial value.
 *
 * In upstream PostgreSQL the change is made transactionally by creating a
 * whole new relfilenode for the sequence.  That implementation is kept
 * below under NOT_USED for reference only: the code that actually runs
 * here performs a non-transactional "restart" through
 * gp_alter_sequence_internal() (see the GPDB_91_MERGE_FIXME note).
 *
 * Caller is assumed to have acquired AccessExclusiveLock on the sequence,
 * which must not be released until end of transaction.  Caller is also
 * responsible for permissions checking.
 */
void
ResetSequence(Oid seq_relid)
{
#ifdef NOT_USED
	Relation	seq_rel;
	SeqTable	elm;
	Form_pg_sequence seq;
	Buffer		buf;
	HeapTupleData seqtuple;
	HeapTuple	tuple;
#endif

	/*
	 * GPDB_91_MERGE_FIXME: GPDB does not support transactional restart of
	 * sequence relations.  This is a consequence of the assumption in sequence
	 * server that relfilenode of a sequence relation is identical to its OID.
	 * The RelationSetNewRelfilenode() call below violates that assumption and
	 * breaks sequence server implementation.  Until sequences in GPDB are
	 * redesigned, we have to resort to non-transactional sequence restarts.
	 */
	gp_alter_sequence_internal(
		seq_relid, list_make1(makeDefElem("restart", NULL)));

#ifdef NOT_USED
	/*
	 * Upstream transactional implementation, disabled (it used to sit
	 * after an unconditional return and was therefore unreachable).
	 */

	/*
	 * Read the old sequence.  This does a bit more work than really
	 * necessary, but it's simple, and we do want to double-check that it's
	 * indeed a sequence.
	 */
	init_sequence(seq_relid, &elm, &seq_rel);
	(void) read_seq_tuple(elm, seq_rel, &buf, &seqtuple);

	/*
	 * Copy the existing sequence tuple.
	 */
	tuple = heap_copytuple(&seqtuple);

	/* Now we're done with the old page */
	UnlockReleaseBuffer(buf);

	/*
	 * Modify the copied tuple to execute the restart (compare the RESTART
	 * action in AlterSequence)
	 */
	seq = (Form_pg_sequence) GETSTRUCT(tuple);
	seq->last_value = seq->start_value;
	seq->is_called = false;
	seq->log_cnt = 1;

	/*
	 * Create a new storage file for the sequence.  We want to keep the
	 * sequence's relfrozenxid at 0, since it won't contain any unfrozen XIDs.
	 */
	RelationSetNewRelfilenode(seq_rel, InvalidTransactionId);

	/*
	 * Insert the modified tuple into the new storage file.
	 */
	fill_seq_with_data(seq_rel, tuple);

	/* Clear local cache so that we don't think we have cached numbers */
	/* Note that we do not change the currval() state */
	elm->cached = elm->last;

	relation_close(seq_rel, NoLock);
#endif   /* NOT_USED */
}

/*
 * Initialize a sequence's relation with the specified tuple as content
 *
 * Extends the relation by one page (asserted to be block 0), initializes
 * it with a sequence_magic special area, stamps the tuple header as frozen,
 * adds the tuple as the page's first item and, if the relation needs WAL,
 * emits an XLOG_SEQ_LOG record carrying the tuple.
 *
 * Note that the header fields of "tuple" are modified in place.
 */
static void
fill_seq_with_data(Relation rel, HeapTuple tuple)
{
	Buffer		buf;
	Page		page;
	sequence_magic *sm;
	OffsetNumber offnum;

	/* Initialize first page of relation with special magic number */
	buf = ReadBuffer(rel, P_NEW);
	Assert(BufferGetBlockNumber(buf) == 0);

	page = BufferGetPage(buf);

	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

	PageInit(page, BufferGetPageSize(buf), sizeof(sequence_magic));
	sm = (sequence_magic *) PageGetSpecialPointer(page);
	sm->magic = SEQ_MAGIC;

	/* Now insert sequence tuple */

	/*
	 * Since VACUUM does not process sequences, we have to force the tuple
	 * to have xmin = FrozenTransactionId now.  Otherwise it would become
	 * invisible to SELECTs after 2G transactions.  It is okay to do this
	 * because if the current transaction aborts, no other xact will ever
	 * examine the sequence tuple anyway.
	 */
	HeapTupleHeaderSetXmin(tuple->t_data, FrozenTransactionId);
	HeapTupleHeaderSetXminFrozen(tuple->t_data);
	HeapTupleHeaderSetCmin(tuple->t_data, FirstCommandId);
	HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId);
	tuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
	ItemPointerSet(&tuple->t_data->t_ctid, 0, FirstOffsetNumber);

	START_CRIT_SECTION();

	MarkBufferDirty(buf);

	/*
	 * NOTE(review): this elog(ERROR) is raised between START_CRIT_SECTION
	 * and END_CRIT_SECTION; confirm that the resulting escalation inside a
	 * critical section is the intended failure mode.
	 */
	offnum = PageAddItem(page, (Item) tuple->t_data, tuple->t_len,
						 InvalidOffsetNumber, false, false);
	if (offnum != FirstOffsetNumber)
		elog(ERROR, "failed to add sequence tuple to page");

	/* XLOG stuff */
	if (RelationNeedsWAL(rel))
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];

		xlrec.node = rel->rd_node;

		/* first chunk: the fixed record header identifying the relation */
		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
		rdata[0].buffer = InvalidBuffer;
		rdata[0].next = &(rdata[1]);

		/* second chunk: the complete sequence tuple */
		rdata[1].data = (char *) tuple->t_data;
		rdata[1].len = tuple->t_len;
		rdata[1].buffer = InvalidBuffer;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);

		PageSetLSN(page, recptr);
	}

	END_CRIT_SECTION();

	UnlockReleaseBuffer(buf);
}

B
Bruce Momjian 已提交
448 449 450
/*
 * AlterSequence
 *
451
 * Modify the definition of a sequence relation
B
Bruce Momjian 已提交
452 453
 */
void
454
AlterSequence(AlterSeqStmt *stmt)
B
Bruce Momjian 已提交
455
{
456
	Oid			relid;
457 458 459 460 461 462 463 464 465 466 467

	/* find sequence */
	relid = RangeVarGetRelid(stmt->sequence, false);

	/* allow ALTER to sequence owner only */
	/* if you change this, see also callers of AlterSequenceInternal! */
	if (!pg_class_ownercheck(relid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
					   stmt->sequence->relname);

	/* do the work */
A
Asim R P 已提交
468
	gp_alter_sequence_internal(relid, stmt->options);
469 470 471 472 473 474 475 476

	if (Gp_role == GP_ROLE_DISPATCH)
		CdbDispatchUtilityStatement((Node *) stmt,
									DF_CANCEL_ON_ERROR|
									DF_WITH_SNAPSHOT|
									DF_NEED_TWO_PHASE,
									NIL,
									NULL);
477 478
}

A
Asim R P 已提交
479 480
/*
 * gp_alter_sequence_internal
 *
 * Apply "options" to the sequence identified by relid: read the current
 * tuple, let init_params() validate and merge the new values, overwrite the
 * tuple in its buffer, and WAL-log the result when the relation needs WAL.
 * Afterwards handle OWNED BY and, on the dispatcher for non-temp sequences,
 * record the change via MetaTrackUpdObject().
 *
 * No permission check is done here; that is up to the caller.
 */
static void
gp_alter_sequence_internal(Oid relid, List *options)
{
	SeqTable	elm;
	Relation	seqrel;
	Buffer		buf;
	HeapTupleData seqtuple;
	Form_pg_sequence seq;
	FormData_pg_sequence new;
	List	   *owned_by;
	bool        bSeqIsTemp = false;
	int			numopts;
	char	   *alter_subtype = "";		/* metadata tracking: subtype text
										 * recorded with the ALTER action */

	/* open and AccessShareLock sequence */
	init_sequence(relid, &elm, &seqrel);

	/* lock page' buffer and read tuple into new sequence structure */
	seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);

	/* Copy old values of options into workspace */
	memcpy(&new, seq, sizeof(FormData_pg_sequence));

	/* Check and set new values */
	init_params(options, false, &new, &owned_by);

	/* Clear local cache so that we don't think we have cached numbers */
	/* Note that we do not change the currval() state */
	elm->cached = elm->last;

	/* Now okay to update the on-disk tuple */
	START_CRIT_SECTION();

	memcpy(seq, &new, sizeof(FormData_pg_sequence));

	MarkBufferDirty(buf);

	/* XLOG stuff */
	if (RelationNeedsWAL(seqrel))
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];
		Page		page = BufferGetPage(buf);

		xlrec.node = seqrel->rd_node;

		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
		rdata[0].buffer = InvalidBuffer;
		rdata[0].next = &(rdata[1]);

		/* log the whole, already updated, tuple */
		rdata[1].data = (char *) seqtuple.t_data;
		rdata[1].len = seqtuple.t_len;
		rdata[1].buffer = InvalidBuffer;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);

		PageSetLSN(page, recptr);
	}

	END_CRIT_SECTION();

	UnlockReleaseBuffer(buf);

	/* process OWNED BY if given */
	if (owned_by)
		process_owned_by(seqrel, owned_by);

	/* must be read before the relation is closed */
	bSeqIsTemp = (seqrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP);

	relation_close(seqrel, NoLock);

	/*
	 * Work out the subtype string for metadata tracking: "<n> OPTIONS" for
	 * zero or several options, otherwise the upper-cased name of the single
	 * option ("owned_by" is reported as "OWNED BY").
	 */
	numopts = list_length(options);
	if (numopts > 1)
	{
		alter_subtype = psprintf("%d OPTIONS", numopts);
	}
	else if (0 == numopts)
	{
		alter_subtype = "0 OPTIONS";
	}
	else if (Gp_role == GP_ROLE_DISPATCH && !bSeqIsTemp)
	{
		ListCell		*option = list_head(options);
		DefElem			*defel	= (DefElem *) lfirst(option);
		char			*tempo	= NULL;

		alter_subtype = defel->defname;
		if (0 == strcmp(alter_subtype, "owned_by"))
			alter_subtype = "OWNED BY";

		tempo = asc_toupper(alter_subtype, strlen(alter_subtype));

		alter_subtype = tempo;
	}

	if (Gp_role == GP_ROLE_DISPATCH && !bSeqIsTemp)
	{
		/* MPP-6929: metadata tracking */
		MetaTrackUpdObject(RelationRelationId,
						   relid,
						   GetUserId(),
						   "ALTER", alter_subtype);
	}
}

588

589 590 591 592 593
/*
 * Note: nextval with a text argument is no longer exported as a pg_proc
 * entry, but we keep it around to ease porting of C code that may have
 * called the function directly.
 */
594 595
Datum
nextval(PG_FUNCTION_ARGS)
596
{
597
	text	   *seqin = PG_GETARG_TEXT_P(0);
598
	RangeVar   *sequence;
599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617
	Oid			relid;

	sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin));
	relid = RangeVarGetRelid(sequence, false);

	PG_RETURN_INT64(nextval_internal(relid));
}

/*
 * nextval_oid
 *
 * fmgr entry point taking the sequence's OID as argument 0; returns the
 * int64 produced by nextval_internal().
 */
Datum
nextval_oid(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);

	PG_RETURN_INT64(nextval_internal(relid));
}

/*
 * nextval_internal
 *
 * Return the next value of the sequence identified by relid.
 *
 * If the session still holds cached values (elm->last != elm->cached) the
 * next one is handed out without touching the sequence relation.  Otherwise
 * UPDATE privilege is checked and a fresh batch is obtained, through
 * cdb_sequence_nextval_proxy() when running as a query executor and through
 * cdb_sequence_nextval() otherwise.
 *
 * The session state (elm->last, elm->cached, elm->last_valid and
 * last_used_seq) is only updated once the fetch is known to have succeeded,
 * so a nextval() that fails on overflow neither disturbs currval() nor
 * leaves last_used_seq pointing at an entry whose last_valid is false
 * (lastval() asserts last_valid).
 *
 * Raises ERROR on insufficient privilege or when the sequence has reached
 * its limit.
 */
static int64
nextval_internal(Oid relid)
{
	SeqTable	elm;
	Relation	seqrel;
	int64		new_last;
	int64		new_cached;
	int64		new_increment;
	bool		is_overflow = false;

	/* open and AccessShareLock sequence */
	init_sequence(relid, &elm, &seqrel);

	/* read-only transactions may only modify temp sequences */
	/*
	 * GPDB_91_MERGE_FIXME: if it's possible to get another session's relation
	 * here, this code will not function as expected.
	 */
	if (seqrel->rd_backend != TempRelBackendId)
		PreventCommandIfReadOnly("nextval()");

	if (elm->last != elm->cached)		/* some numbers were cached */
	{
		Assert(elm->last_valid);
		Assert(elm->increment != 0);
		elm->last += elm->increment;
		relation_close(seqrel, NoLock);
		last_used_seq = elm;
		return elm->last;
	}

	if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied for sequence %s",
						RelationGetRelationName(seqrel))));

	/*
	 * Fetch into temporaries, seeded with the current session state, so
	 * that a failed fetch cannot clobber elm->last / elm->cached.
	 */
	new_last = elm->last;
	new_cached = elm->cached;
	new_increment = elm->increment;

	/* Update the sequence object. */
	if (Gp_role == GP_ROLE_EXECUTE)
		cdb_sequence_nextval_proxy(seqrel,
								   &new_last,
								   &new_cached,
								   &new_increment,
								   &is_overflow);
	else
		cdb_sequence_nextval(elm,
							 seqrel,
							 &new_last,
							 &new_cached,
							 &new_increment,
							 &is_overflow);

	/* the increment is merely a copy of the sequence's field; keep it fresh */
	elm->increment = new_increment;

	if (is_overflow)
	{
		/* copy the name: the relcache entry is released before reporting */
		char	   *relname = pstrdup(RelationGetRelationName(seqrel));

		relation_close(seqrel, NoLock);

		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("nextval: reached %s value of sequence \"%s\" (" INT64_FORMAT ")",
						new_increment > 0 ? "maximum":"minimum",
						relname, new_last)));
	}

	/* success: publish the new state for currval()/lastval() */
	elm->last = new_last;
	elm->cached = new_cached;
	elm->last_valid = true;
	last_used_seq = elm;

	relation_close(seqrel, NoLock);
	return elm->last;
}


685 686 687
/*
 * cdb_sequence_nextval
 *
 * Advance the sequence stored in seqrel and report the outcome:
 *
 *	*plast		first value fetched by this call (the one to return)
 *	*pcached	last value fetched by this call
 *	*pincrement	the sequence's increment_by
 *	*poverflow	true if the limit was hit on a non-cycling sequence
 *
 * Overflow is reported through *poverflow rather than raised here; the
 * sequence tuple is still rewritten in that case.
 */
static void
cdb_sequence_nextval(SeqTable elm,
					 Relation   seqrel,
					 int64     *plast,
					 int64     *pcached,
					 int64     *pincrement,
					 bool      *poverflow)
{
	Buffer		buf;
	Page		page;
	HeapTupleData seqtuple;
	Form_pg_sequence seq;
	int64		incby,
				maxv,
				minv,
				cache,
				log,
				fetch,
				last;
	int64		result,
				next,
				rescnt = 0;
	bool 		have_overflow = false;
	bool		logit = false;

	/* lock page' buffer and read tuple */
	seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
	page = BufferGetPage(buf);

	last = next = result = seq->last_value;
	incby = seq->increment_by;
	maxv = seq->max_value;
	minv = seq->min_value;
	fetch = cache = seq->cache_value;
	log = seq->log_cnt;

	if (!seq->is_called)
	{
		rescnt++;				/* return last_value if not is_called */
		fetch--;
	}

	/*
	 * Decide whether we should emit a WAL log record.  If so, force up the
	 * fetch count to grab SEQ_LOG_VALS more values than we actually need to
	 * cache.  (These will then be usable without logging.)
	 *
	 * If this is the first nextval after a checkpoint, we must force a new
	 * WAL record to be written anyway, else replay starting from the
	 * checkpoint would fail to advance the sequence past the logged values.
	 * In this case we may as well fetch extra values.
	 */
	if (log < fetch || !seq->is_called)
	{
		/* forced log to satisfy local demand for values */
		fetch = log = fetch + SEQ_LOG_VALS;
		logit = true;
	}
	else
	{
		XLogRecPtr	redoptr = GetRedoRecPtr();

		if (XLByteLE(PageGetLSN(page), redoptr))
		{
			/* last update of seq was before checkpoint */
			fetch = log = fetch + SEQ_LOG_VALS;
			logit = true;
		}
	}

	while (fetch)				/* try to fetch cache [+ log ] numbers */
	{
		/*
		 * Check MAXVALUE for ascending sequences and MINVALUE for descending
		 * sequences
		 */
		if (incby > 0)
		{
			/* ascending sequence */
			if ((maxv >= 0 && next > maxv - incby) ||
				(maxv < 0 && next + incby > maxv))
			{
				if (rescnt > 0)
					break;		/* stop fetching */
				if (!seq->is_cycled)
				{
					/* remember the failure; "next" is left unchanged */
					have_overflow = true;
				}
				else
				{
					next = minv;
				}
			}
			else
				next += incby;
		}
		else
		{
			/* descending sequence */
			if ((minv < 0 && next < minv - incby) ||
				(minv >= 0 && next + incby < minv))
			{
				if (rescnt > 0)
					break;		/* stop fetching */
				if (!seq->is_cycled)
				{
					/* remember the failure; "next" is left unchanged */
					have_overflow = true;
				}
				else
				{
					next = maxv;
				}
			}
			else
				next += incby;
		}
		fetch--;
		if (rescnt < cache)
		{
			log--;
			rescnt++;
			last = next;
			if (rescnt == 1)	/* if it's first result - */
				result = next;	/* it's what to return */
		}
	}

	log -= fetch;				/* adjust for any unfetched numbers */
	Assert(log >= 0);

	/* set results for caller */
	*poverflow = have_overflow; /* has the sequence overflown */
	*plast = result;            /* last returned number */
	*pcached = last;            /* last fetched number */
	*pincrement = incby;

	/* ready to change the on-disk (or really, in-buffer) tuple */
	START_CRIT_SECTION();

	/*
	 * We must mark the buffer dirty before doing XLogInsert(); see notes in
	 * SyncOneBuffer().  However, we don't apply the desired changes just yet.
	 * This looks like a violation of the buffer update protocol, but it is
	 * in fact safe because we hold exclusive lock on the buffer.  Any other
	 * process, including a checkpoint, that tries to examine the buffer
	 * contents will block until we release the lock, and then will see the
	 * final state that we install below.
	 */
	MarkBufferDirty(buf);

	/* XLOG stuff */
	if (logit && RelationNeedsWAL(seqrel))
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];

		/*
		 * We don't log the current state of the tuple, but rather the state
		 * as it would appear after "log" more fetches.  This lets us skip
		 * that many future WAL records, at the cost that we lose those
		 * sequence values if we crash.
		 */

		/* set values that will be saved in xlog */
		seq->last_value = next;
		seq->is_called = true;
		seq->log_cnt = 0;

		xlrec.node = seqrel->rd_node;
		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
		rdata[0].buffer = InvalidBuffer;
		rdata[0].next = &(rdata[1]);

		rdata[1].data = (char *) seqtuple.t_data;
		rdata[1].len = seqtuple.t_len;
		rdata[1].buffer = InvalidBuffer;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);

		PageSetLSN(page, recptr);

		/*
		 * Need to update where we've inserted to in shmem so that the QD
		 * can flush it when necessary.  Only ever move the pointer forward.
		 */
		LWLockAcquire(SeqServerControlLock, LW_EXCLUSIVE);

		if (XLByteLT(seqServerCtl->lastXlogEntry, recptr))
		{
			seqServerCtl->lastXlogEntry.xlogid = recptr.xlogid;
			seqServerCtl->lastXlogEntry.xrecoff = recptr.xrecoff;
		}

		LWLockRelease(SeqServerControlLock);
	}

	/* Now update sequence tuple to the intended final state */
	seq->last_value = last;		/* last fetched number */
	seq->is_called = true;
	seq->log_cnt = log;			/* how much is logged */

	END_CRIT_SECTION();

	UnlockReleaseBuffer(buf);
}                               /* cdb_sequence_nextval */
892

893

894
Datum
895
currval_oid(PG_FUNCTION_ARGS)
896
{
897 898
	Oid			relid = PG_GETARG_OID(0);
	int64		result;
899
	SeqTable	elm;
900
	Relation	seqrel;
901

902 903 904 905 906 907 908 909
	/* For now, strictly forbidden on MPP. */
	if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
				 errmsg("currval() not supported")));
	}

V
Vadim B. Mikheev 已提交
910
	/* open and AccessShareLock sequence */
911
	init_sequence(relid, &elm, &seqrel);
912

913 914
	if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_SELECT) != ACLCHECK_OK &&
		pg_class_aclcheck(elm->relid, GetUserId(), ACL_USAGE) != ACLCHECK_OK)
915 916
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
917
				 errmsg("permission denied for sequence %s",
918
						RelationGetRelationName(seqrel))));
919

920
	if (!elm->last_valid)
921 922
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
923
				 errmsg("currval of sequence \"%s\" is not yet defined in this session",
924
						RelationGetRelationName(seqrel))));
925 926 927

	result = elm->last;

928 929
	relation_close(seqrel, NoLock);

930
	PG_RETURN_INT64(result);
931 932
}

933 934 935 936 937 938
Datum
lastval(PG_FUNCTION_ARGS)
{
	Relation	seqrel;
	int64		result;

939 940 941 942 943 944 945 946
	/* For now, strictly forbidden on MPP. */
	if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
				 errmsg("lastval() not supported")));
	}

947 948 949 950 951 952
	if (last_used_seq == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("lastval is not yet defined in this session")));

	/* Someone may have dropped the sequence since the last nextval() */
953
	if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(last_used_seq->relid)))
954 955 956 957
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("lastval is not yet defined in this session")));

958
	seqrel = open_share_lock(last_used_seq);
959 960

	/* nextval() must have already been called for this sequence */
961
	Assert(last_used_seq->last_valid);
962

963 964
	if (pg_class_aclcheck(last_used_seq->relid, GetUserId(), ACL_SELECT) != ACLCHECK_OK &&
		pg_class_aclcheck(last_used_seq->relid, GetUserId(), ACL_USAGE) != ACLCHECK_OK)
965 966 967 968 969 970 971
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied for sequence %s",
						RelationGetRelationName(seqrel))));

	result = last_used_seq->last;
	relation_close(seqrel, NoLock);
972

973 974 975
	PG_RETURN_INT64(result);
}

B
Bruce Momjian 已提交
976
/*
977 978 979 980
 * Main internal procedure that handles 2 & 3 arg forms of SETVAL.
 *
 * Note that the 3 arg version (which sets the is_called flag) is
 * only for use in pg_dump, and setting the is_called flag may not
B
Bruce Momjian 已提交
981
 * work if multiple users are attached to the database and referencing
982 983
 * the sequence (unlikely if pg_dump is restoring it).
 *
B
Bruce Momjian 已提交
984
 * It is necessary to have the 3 arg version so that pg_dump can
985 986 987 988
 * restore the state of a sequence exactly during data-only restores -
 * it is the only way to clear the is_called flag in an existing
 * sequence.
 */
B
Bruce Momjian 已提交
989
static void
990
do_setval(Oid relid, int64 next, bool iscalled)
M
 
Marc G. Fournier 已提交
991 992
{
	SeqTable	elm;
993
	Relation	seqrel;
994
	Buffer		buf;
995
	HeapTupleData seqtuple;
996
	Form_pg_sequence seq;
M
 
Marc G. Fournier 已提交
997

998 999 1000 1001 1002 1003 1004
	if (Gp_role == GP_ROLE_EXECUTE)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
				 errmsg("setval() not supported in this context")));
	}

1005
	/* open and AccessShareLock sequence */
1006
	init_sequence(relid, &elm, &seqrel);
1007 1008

	if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
1009 1010
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1011
				 errmsg("permission denied for sequence %s",
1012
						RelationGetRelationName(seqrel))));
M
 
Marc G. Fournier 已提交
1013

1014
	/* read-only transactions may only modify temp sequences */
A
Asim R P 已提交
1015 1016 1017 1018 1019
	/*
	 * GPDB_91_MERGE_FIXME: if it's possible to get another session's relation
	 * here, this code will not function as expected.
	 */
	if (seqrel->rd_backend != TempRelBackendId)
1020 1021
		PreventCommandIfReadOnly("setval()");

1022
	/* lock page' buffer and read tuple */
1023
	seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
1024
	elm->increment = seq->increment_by;
M
 
Marc G. Fournier 已提交
1025

1026
	if ((next < seq->min_value) || (next > seq->max_value))
1027
	{
B
Bruce Momjian 已提交
1028 1029 1030 1031
		char		bufv[100],
					bufm[100],
					bufx[100];

1032 1033 1034
		snprintf(bufv, sizeof(bufv), INT64_FORMAT, next);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, seq->min_value);
		snprintf(bufx, sizeof(bufx), INT64_FORMAT, seq->max_value);
1035 1036
		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1037
				 errmsg("setval: value %s is out of bounds for sequence \"%s\" (%s..%s)",
1038 1039
						bufv, RelationGetRelationName(seqrel),
						bufm, bufx)));
1040
	}
M
 
Marc G. Fournier 已提交
1041

1042 1043 1044 1045 1046 1047 1048 1049 1050
	/* Set the currval() state only if iscalled = true */
	if (iscalled)
	{
		elm->last = next;		/* last returned number */
		elm->last_valid = true;
	}

	/* In any case, forget any future cached numbers */
	elm->cached = elm->last;
M
 
Marc G. Fournier 已提交
1051

1052
	/* ready to change the on-disk (or really, in-buffer) tuple */
1053
	START_CRIT_SECTION();
1054

1055 1056 1057 1058
	seq->last_value = next;		/* last fetched number */
	seq->is_called = iscalled;
	seq->log_cnt = 0;

1059 1060
	MarkBufferDirty(buf);

1061
	/* XLOG stuff */
1062
	if (RelationNeedsWAL(seqrel))
V
Vadim B. Mikheev 已提交
1063 1064 1065
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
B
Bruce Momjian 已提交
1066
		XLogRecData rdata[2];
1067
		Page		page = BufferGetPage(buf);
V
Vadim B. Mikheev 已提交
1068

1069
		xlrec.node = seqrel->rd_node;
B
Bruce Momjian 已提交
1070
		rdata[0].data = (char *) &xlrec;
1071
		rdata[0].len = sizeof(xl_seq_rec);
1072
		rdata[0].buffer = InvalidBuffer;
1073 1074
		rdata[0].next = &(rdata[1]);

1075 1076
		rdata[1].data = (char *) seqtuple.t_data;
		rdata[1].len = seqtuple.t_len;
1077
		rdata[1].buffer = InvalidBuffer;
1078 1079
		rdata[1].next = NULL;

1080
		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);
1081 1082

		PageSetLSN(page, recptr);
V
Vadim B. Mikheev 已提交
1083
	}
1084

1085
	END_CRIT_SECTION();
M
 
Marc G. Fournier 已提交
1086

1087
	UnlockReleaseBuffer(buf);
1088 1089

	relation_close(seqrel, NoLock);
1090 1091
}

1092 1093 1094 1095
/*
 * Implement the 2 arg setval procedure.
 * See do_setval for discussion.
 */
1096
Datum
1097
setval_oid(PG_FUNCTION_ARGS)
1098
{
1099
	Oid			relid = PG_GETARG_OID(0);
1100
	int64		next = PG_GETARG_INT64(1);
1101

1102
	do_setval(relid, next, true);
1103

1104
	PG_RETURN_INT64(next);
1105 1106
}

1107 1108 1109 1110
/*
 * Implement the 3 arg setval procedure.
 * See do_setval for discussion.
 */
1111
Datum
1112
setval3_oid(PG_FUNCTION_ARGS)
1113
{
1114
	Oid			relid = PG_GETARG_OID(0);
1115
	int64		next = PG_GETARG_INT64(1);
1116 1117
	bool		iscalled = PG_GETARG_BOOL(2);

1118
	do_setval(relid, next, iscalled);
1119

1120
	PG_RETURN_INT64(next);
M
 
Marc G. Fournier 已提交
1121 1122
}

1123

1124
/*
1125 1126
 * Open the sequence and acquire AccessShareLock if needed
 *
1127
 * If we haven't touched the sequence already in this transaction,
B
Bruce Momjian 已提交
1128
 * we need to acquire AccessShareLock.	We arrange for the lock to
1129 1130 1131
 * be owned by the top transaction, so that we don't need to do it
 * more than once per xact.
 */
1132 1133
static Relation
open_share_lock(SeqTable seq)
1134
{
1135
	LocalTransactionId thislxid = MyProc->lxid;
1136

1137
	/* Get the lock if not already held in this xact */
1138
	if (seq->lxid != thislxid)
1139 1140 1141 1142 1143 1144 1145
	{
		ResourceOwner currentOwner;

		currentOwner = CurrentResourceOwner;
		PG_TRY();
		{
			CurrentResourceOwner = TopTransactionResourceOwner;
1146
			LockRelationOid(seq->relid, AccessShareLock);
1147 1148 1149 1150 1151 1152 1153 1154 1155 1156
		}
		PG_CATCH();
		{
			/* Ensure CurrentResourceOwner is restored on error */
			CurrentResourceOwner = currentOwner;
			PG_RE_THROW();
		}
		PG_END_TRY();
		CurrentResourceOwner = currentOwner;

1157
		/* Flag that we have a lock in the current xact */
1158
		seq->lxid = thislxid;
1159
	}
1160 1161 1162

	/* We now know we have AccessShareLock, and can safely open the rel */
	return relation_open(seq->relid, NoLock);
1163 1164
}

1165
/*
1166
 * Given a relation OID, open and lock the sequence.  p_elm and p_rel are
1167
 * output parameters.
1168 1169
 *
 * GPDB: If p_rel is NULL, the sequence relation is not opened or locked.
1170 1171
 */
static void
1172
init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel)
1173
{
B
Bruce Momjian 已提交
1174
	SeqTable	elm;
1175
	Relation	seqrel;
1176

1177 1178 1179 1180 1181 1182 1183
	/* Look to see if we already have a seqtable entry for relation */
	for (elm = seqtab; elm != NULL; elm = elm->next)
	{
		if (elm->relid == relid)
			break;
	}

1184
	/*
1185
	 * Allocate new seqtable entry if we didn't find one.
1186
	 *
B
Bruce Momjian 已提交
1187 1188 1189
	 * NOTE: seqtable entries remain in the list for the life of a backend. If
	 * the sequence itself is deleted then the entry becomes wasted memory,
	 * but it's small enough that this should not matter.
B
Bruce Momjian 已提交
1190
	 */
1191
	if (elm == NULL)
1192
	{
1193
		/*
B
Bruce Momjian 已提交
1194 1195
		 * Time to make a new seqtable entry.  These entries live as long as
		 * the backend does, so we use plain malloc for them.
1196 1197
		 */
		elm = (SeqTable) malloc(sizeof(SeqTableData));
T
Tom Lane 已提交
1198
		if (elm == NULL)
1199 1200 1201
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
1202
		elm->relid = relid;
1203
		elm->filenode = InvalidOid;
1204
		elm->lxid = InvalidLocalTransactionId;
1205
		elm->last_valid = false;
1206 1207 1208
		elm->last = elm->cached = elm->increment = 0;
		elm->next = seqtab;
		seqtab = elm;
1209 1210
	}

1211 1212 1213
	/*
	 * Open the sequence relation.
	 */
1214 1215 1216
	if (p_rel)
	{
		seqrel = open_share_lock(elm);
1217

1218 1219 1220 1221 1222
		if (seqrel->rd_rel->relkind != RELKIND_SEQUENCE)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("\"%s\" is not a sequence",
							RelationGetRelationName(seqrel))));
1223

1224
		*p_rel = seqrel;
A
Asim R P 已提交
1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235

		/*
		 * If the sequence has been transactionally replaced since we last saw it,
		 * discard any cached-but-unissued values.	We do not touch the currval()
		 * state, however.
		 */
		if (seqrel->rd_rel->relfilenode != elm->filenode)
		{
			elm->filenode = seqrel->rd_rel->relfilenode;
			elm->cached = elm->last;
		}
1236
	}
1237 1238

	/* Return results */
1239
	*p_elm = elm;
1240 1241 1242
}


1243 1244 1245 1246 1247 1248 1249 1250 1251
/*
 * Given an opened sequence relation, lock the page buffer and find the tuple
 *
 * *buf receives the reference to the pinned-and-ex-locked buffer
 * *seqtuple receives the reference to the sequence tuple proper
 *		(this arg should point to a local variable of type HeapTupleData)
 *
 * Function's return value points to the data payload of the tuple
 */
1252
static Form_pg_sequence
1253
read_seq_tuple(SeqTable elm, Relation rel, Buffer *buf, HeapTuple seqtuple)
1254
{
1255
	Page		page;
1256 1257 1258
	ItemId		lp;
	sequence_magic *sm;
	Form_pg_sequence seq;
1259

1260 1261 1262
	*buf = ReadBuffer(rel, 0);
	LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);

1263
	page = BufferGetPage(*buf);
1264 1265 1266
	sm = (sequence_magic *) PageGetSpecialPointer(page);

	if (sm->magic != SEQ_MAGIC)
1267 1268
		elog(ERROR, "bad magic number in sequence \"%s\": %08X",
			 RelationGetRelationName(rel), sm->magic);
1269 1270

	lp = PageGetItemId(page, FirstOffsetNumber);
1271
	Assert(ItemIdIsNormal(lp));
1272 1273 1274 1275

	/* Note we currently only bother to set these two fields of *seqtuple */
	seqtuple->t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
	seqtuple->t_len = ItemIdGetLength(lp);
1276

1277
	/*
B
Bruce Momjian 已提交
1278 1279 1280 1281 1282 1283
	 * Previous releases of Postgres neglected to prevent SELECT FOR UPDATE on
	 * a sequence, which would leave a non-frozen XID in the sequence tuple's
	 * xmax, which eventually leads to clog access failures or worse. If we
	 * see this has happened, clean up after it.  We treat this like a hint
	 * bit update, ie, don't bother to WAL-log it, since we can certainly do
	 * this again if the update gets lost.
1284
	 */
1285
	if (HeapTupleHeaderGetXmax(seqtuple->t_data) != InvalidTransactionId)
1286
	{
1287 1288 1289
		HeapTupleHeaderSetXmax(seqtuple->t_data, InvalidTransactionId);
		seqtuple->t_data->t_infomask &= ~HEAP_XMAX_COMMITTED;
		seqtuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
1290
		MarkBufferDirtyHint(*buf);
1291
	}
1292

1293
	seq = (Form_pg_sequence) GETSTRUCT(seqtuple);
1294

1295
	/* this is a handy place to update our copy of the increment */
1296 1297 1298
	elm->increment = seq->increment_by;

	return seq;
1299 1300
}

1301 1302
/*
 * init_params: process the options list of CREATE or ALTER SEQUENCE,
1303 1304
 * and store the values into appropriate fields of *new.  Also set
 * *owned_by to any OWNED BY option, or to NIL if there is none.
1305 1306 1307 1308
 *
 * If isInit is true, fill any unspecified options with default values;
 * otherwise, do not change existing options that aren't explicitly overridden.
 */
1309
static void
1310 1311
init_params(List *options, bool isInit,
			Form_pg_sequence new, List **owned_by)
1312
{
1313 1314
	DefElem    *start_value = NULL;
	DefElem    *restart_value = NULL;
1315 1316 1317 1318
	DefElem    *increment_by = NULL;
	DefElem    *max_value = NULL;
	DefElem    *min_value = NULL;
	DefElem    *cache_value = NULL;
1319
	DefElem    *is_cycled = NULL;
1320
	ListCell   *option;
1321

1322 1323
	*owned_by = NIL;

B
Bruce Momjian 已提交
1324
	foreach(option, options)
1325
	{
1326
		DefElem    *defel = (DefElem *) lfirst(option);
1327

1328
		if (strcmp(defel->defname, "increment") == 0)
1329 1330
		{
			if (increment_by)
1331 1332 1333
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1334
			increment_by = defel;
1335
		}
1336 1337
		else if (strcmp(defel->defname, "start") == 0)
		{
1338
			if (start_value)
1339 1340 1341
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1342
			start_value = defel;
1343 1344
		}
		else if (strcmp(defel->defname, "restart") == 0)
1345
		{
1346
			if (restart_value)
1347 1348 1349
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1350
			restart_value = defel;
1351
		}
1352
		else if (strcmp(defel->defname, "maxvalue") == 0)
1353 1354
		{
			if (max_value)
1355 1356 1357
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1358
			max_value = defel;
1359
		}
1360
		else if (strcmp(defel->defname, "minvalue") == 0)
1361 1362
		{
			if (min_value)
1363 1364 1365
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1366
			min_value = defel;
1367
		}
1368
		else if (strcmp(defel->defname, "cache") == 0)
1369 1370
		{
			if (cache_value)
1371 1372 1373
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1374
			cache_value = defel;
1375
		}
1376
		else if (strcmp(defel->defname, "cycle") == 0)
1377
		{
1378
			if (is_cycled)
1379 1380 1381
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1382
			is_cycled = defel;
1383
		}
1384 1385 1386 1387 1388 1389 1390 1391
		else if (strcmp(defel->defname, "owned_by") == 0)
		{
			if (*owned_by)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			*owned_by = defGetQualifiedName(defel);
		}
1392
		else
1393
			elog(ERROR, "option \"%s\" not recognized",
1394 1395 1396
				 defel->defname);
	}

1397 1398 1399 1400 1401 1402 1403
	/*
	 * We must reset log_cnt when isInit or when changing any parameters
	 * that would affect future nextval allocations.
	 */
	if (isInit)
		new->log_cnt = 0;

B
Bruce Momjian 已提交
1404
	/* INCREMENT BY */
1405
	if (increment_by != NULL)
B
Bruce Momjian 已提交
1406 1407
	{
		new->increment_by = defGetInt64(increment_by);
1408 1409 1410
		if (new->increment_by == 0)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1411
					 errmsg("INCREMENT must not be zero")));
1412
		new->log_cnt = 0;
B
Bruce Momjian 已提交
1413
	}
1414 1415 1416 1417
	else if (isInit)
		new->increment_by = 1;

	/* CYCLE */
1418
	if (is_cycled != NULL)
1419 1420
	{
		new->is_cycled = intVal(is_cycled->arg);
1421
		Assert(BoolIsValid(new->is_cycled));
1422
		new->log_cnt = 0;
1423 1424 1425
	}
	else if (isInit)
		new->is_cycled = false;
1426

1427
	/* MAXVALUE (null arg means NO MAXVALUE) */
1428
	if (max_value != NULL && max_value->arg)
1429
	{
1430
		new->max_value = defGetInt64(max_value);
1431 1432
		new->log_cnt = 0;
	}
1433
	else if (isInit || max_value != NULL)
1434
	{
1435
		if (new->increment_by > 0)
B
Bruce Momjian 已提交
1436
			new->max_value = SEQ_MAXVALUE;		/* ascending seq */
1437
		else
B
Bruce Momjian 已提交
1438
			new->max_value = -1;	/* descending seq */
1439
		new->log_cnt = 0;
1440
	}
1441

1442
	/* MINVALUE (null arg means NO MINVALUE) */
1443
	if (min_value != NULL && min_value->arg)
1444
	{
1445
		new->min_value = defGetInt64(min_value);
1446 1447
		new->log_cnt = 0;
	}
1448
	else if (isInit || min_value != NULL)
1449
	{
1450
		if (new->increment_by > 0)
B
Bruce Momjian 已提交
1451
			new->min_value = 1; /* ascending seq */
1452
		else
B
Bruce Momjian 已提交
1453
			new->min_value = SEQ_MINVALUE;		/* descending seq */
1454
		new->log_cnt = 0;
1455
	}
1456

1457
	/* crosscheck min/max */
1458
	if (new->min_value >= new->max_value)
1459
	{
B
Bruce Momjian 已提交
1460 1461 1462
		char		bufm[100],
					bufx[100];

1463 1464
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->min_value);
		snprintf(bufx, sizeof(bufx), INT64_FORMAT, new->max_value);
1465 1466 1467 1468
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("MINVALUE (%s) must be less than MAXVALUE (%s)",
						bufm, bufx)));
1469
	}
1470

B
Bruce Momjian 已提交
1471
	/* START WITH */
1472 1473
	if (start_value != NULL)
		new->start_value = defGetInt64(start_value);
1474
	else if (isInit)
1475
	{
1476
		if (new->increment_by > 0)
1477
			new->start_value = new->min_value;	/* ascending seq */
1478
		else
1479
			new->start_value = new->max_value;	/* descending seq */
1480
	}
1481

1482 1483
	/* crosscheck START */
	if (new->start_value < new->min_value)
1484
	{
1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507
		char		bufs[100],
					bufm[100];

		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->start_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->min_value);
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("START value (%s) cannot be less than MINVALUE (%s)",
						bufs, bufm)));
	}
	if (new->start_value > new->max_value)
	{
		char		bufs[100],
					bufm[100];

		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->start_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->max_value);
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
			  errmsg("START value (%s) cannot be greater than MAXVALUE (%s)",
					 bufs, bufm)));
	}

1508 1509 1510 1511 1512 1513 1514
	/* RESTART [WITH] */
	if (restart_value != NULL)
	{
		if (restart_value->arg != NULL)
			new->last_value = defGetInt64(restart_value);
		else
			new->last_value = new->start_value;
1515 1516 1517
		new->is_called = false;
		new->log_cnt = 1;
	}
1518
	else if (isInit)
1519
	{
1520
		new->last_value = new->start_value;
1521 1522
		new->is_called = false;
		new->log_cnt = 1;
1523
	}
1524

1525
	/* crosscheck RESTART (or current value, if changing MIN/MAX) */
1526
	if (new->last_value < new->min_value)
1527
	{
B
Bruce Momjian 已提交
1528 1529 1530
		char		bufs[100],
					bufm[100];

1531 1532
		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->last_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->min_value);
1533 1534
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1535 1536
			   errmsg("RESTART value (%s) cannot be less than MINVALUE (%s)",
					  bufs, bufm)));
1537
	}
1538
	if (new->last_value > new->max_value)
1539
	{
B
Bruce Momjian 已提交
1540 1541 1542
		char		bufs[100],
					bufm[100];

1543 1544
		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->last_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->max_value);
1545 1546
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1547 1548
			errmsg("RESTART value (%s) cannot be greater than MAXVALUE (%s)",
				   bufs, bufm)));
1549
	}
1550

B
Bruce Momjian 已提交
1551
	/* CACHE */
1552
	if (cache_value != NULL)
1553
	{
1554 1555 1556 1557
		new->cache_value = defGetInt64(cache_value);
		if (new->cache_value <= 0)
		{
			char		buf[100];
B
Bruce Momjian 已提交
1558

1559 1560 1561 1562 1563 1564
			snprintf(buf, sizeof(buf), INT64_FORMAT, new->cache_value);
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("CACHE (%s) must be greater than zero",
							buf)));
		}
1565
		new->log_cnt = 0;
1566
	}
1567 1568
	else if (isInit)
		new->cache_value = 1;
1569 1570
}

1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594
/*
 * process_owned_by: apply an OWNED BY option of CREATE/ALTER SEQUENCE.
 *
 *	seqrel		the (opened) sequence relation
 *	owned_by	name list from the OWNED BY clause: either the single name
 *				"none", or a qualified table name followed by a column name
 *
 * Ownership permissions on the sequence are already checked, but if we are
 * establishing a new owned-by dependency, we must enforce that the
 * referenced table has the same owner and namespace as the sequence.
 */
static void
process_owned_by(Relation seqrel, List *owned_by)
{
	int			nnames = list_length(owned_by);
	Relation	tablerel = NULL;
	AttrNumber	attnum = 0;

	Assert(nnames > 0);

	if (nnames > 1)
	{
		List	   *relname;
		char	   *attrname;
		RangeVar   *rel;

		/* Everything but the last name is the table; the last is the column */
		relname = list_truncate(list_copy(owned_by), nnames - 1);
		attrname = strVal(lfirst(list_tail(owned_by)));

		/* Open and lock the table so that it can't go away meanwhile */
		rel = makeRangeVarFromNameList(relname);
		tablerel = relation_openrv(rel, AccessShareLock);

		/* Only a regular table is acceptable */
		if (tablerel->rd_rel->relkind != RELKIND_RELATION)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("referenced relation \"%s\" is not a table",
							RelationGetRelationName(tablerel))));

		/* Owner and schema must match the sequence's */
		if (seqrel->rd_rel->relowner != tablerel->rd_rel->relowner)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("sequence must have same owner as table it is linked to")));
		if (RelationGetNamespace(seqrel) != RelationGetNamespace(tablerel))
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("sequence must be in same schema as table it is linked to")));

		/* Resolve the column name to an attribute number */
		attnum = get_attnum(RelationGetRelid(tablerel), attrname);
		if (attnum == InvalidAttrNumber)
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_COLUMN),
					 errmsg("column \"%s\" of relation \"%s\" does not exist",
							attrname, RelationGetRelationName(tablerel))));
	}
	else
	{
		/* A lone name must be OWNED BY NONE */
		if (strcmp(strVal(linitial(owned_by)), "none") != 0)
			ereport(ERROR,
					(errcode(ERRCODE_SYNTAX_ERROR),
					 errmsg("invalid OWNED BY option"),
					 errhint("Specify OWNED BY table.column or OWNED BY NONE.")));
	}

	/*
	 * Now update pg_depend: first drop any existing AUTO dependencies of the
	 * sequence, then add the new one if a table column was named.
	 */
	markSequenceUnowned(RelationGetRelid(seqrel));

	if (tablerel != NULL)
	{
		ObjectAddress refobject;
		ObjectAddress depobject;

		refobject.classId = RelationRelationId;
		refobject.objectId = RelationGetRelid(tablerel);
		refobject.objectSubId = attnum;

		depobject.classId = RelationRelationId;
		depobject.objectId = RelationGetRelid(seqrel);
		depobject.objectSubId = 0;

		recordDependencyOn(&depobject, &refobject, DEPENDENCY_AUTO);

		/* Done with the table, but keep its lock until commit */
		relation_close(tablerel, NoLock);
	}
}

V
Vadim B. Mikheev 已提交
1664

1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677
/*
 * Return sequence parameters, for use by information schema
 */
Datum
pg_sequence_parameters(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);
	TupleDesc	tupdesc;
	Datum		values[5];
	bool		isnull[5];
	SeqTable	elm;
	Relation	seqrel;
	Buffer		buf;
A
Asim R P 已提交
1678
	HeapTupleData seqtuple;
1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690
	Form_pg_sequence seq;

	/* open and AccessShareLock sequence */
	init_sequence(relid, &elm, &seqrel);

	if (pg_class_aclcheck(relid, GetUserId(), ACL_SELECT | ACL_UPDATE | ACL_USAGE) != ACLCHECK_OK)
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied for sequence %s",
						RelationGetRelationName(seqrel))));

	tupdesc = CreateTemplateTupleDesc(5, false);
1691 1692 1693 1694 1695 1696 1697 1698 1699 1700
	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "start_value",
					   INT8OID, -1, 0);
	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "minimum_value",
					   INT8OID, -1, 0);
	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "maximum_value",
					   INT8OID, -1, 0);
	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "increment",
					   INT8OID, -1, 0);
	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "cycle_option",
					   BOOLOID, -1, 0);
1701 1702 1703 1704 1705

	BlessTupleDesc(tupdesc);

	memset(isnull, 0, sizeof(isnull));

A
Asim R P 已提交
1706
	seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720

	values[0] = Int64GetDatum(seq->start_value);
	values[1] = Int64GetDatum(seq->min_value);
	values[2] = Int64GetDatum(seq->max_value);
	values[3] = Int64GetDatum(seq->increment_by);
	values[4] = BoolGetDatum(seq->is_cycled);

	UnlockReleaseBuffer(buf);
	relation_close(seqrel, NoLock);

	return HeapTupleGetDatum(heap_form_tuple(tupdesc, values, isnull));
}


B
Bruce Momjian 已提交
1721
void
1722
seq_redo(XLogRecPtr beginLoc, XLogRecPtr lsn, XLogRecord *record)
V
Vadim B. Mikheev 已提交
1723
{
B
Bruce Momjian 已提交
1724 1725 1726 1727 1728 1729
	uint8		info = record->xl_info & ~XLR_INFO_MASK;
	Buffer		buffer;
	Page		page;
	char	   *item;
	Size		itemsz;
	xl_seq_rec *xlrec = (xl_seq_rec *) XLogRecGetData(record);
1730
	sequence_magic *sm;
V
Vadim B. Mikheev 已提交
1731

1732 1733 1734
	/* Backup blocks are not used in seq records */
	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));

1735
	if (info != XLOG_SEQ_LOG)
1736
		elog(PANIC, "seq_redo: unknown op code %u", info);
1737

1738
	buffer = XLogReadBuffer(xlrec->node, 0, true);
1739
	Assert(BufferIsValid(buffer));
V
Vadim B. Mikheev 已提交
1740 1741
	page = (Page) BufferGetPage(buffer);

1742 1743
	/* Always reinit the page and reinstall the magic number */
	/* See comments in DefineSequence */
1744 1745 1746
	PageInit((Page) page, BufferGetPageSize(buffer), sizeof(sequence_magic));
	sm = (sequence_magic *) PageGetSpecialPointer(page);
	sm->magic = SEQ_MAGIC;
V
Vadim B. Mikheev 已提交
1747

B
Bruce Momjian 已提交
1748
	item = (char *) xlrec + sizeof(xl_seq_rec);
1749
	itemsz = record->xl_len - sizeof(xl_seq_rec);
1750

B
Bruce Momjian 已提交
1751
	if (PageAddItem(page, (Item) item, itemsz,
1752
					FirstOffsetNumber, false, false) == InvalidOffsetNumber)
1753
		elog(PANIC, "seq_redo: failed to add item to page");
V
Vadim B. Mikheev 已提交
1754 1755

	PageSetLSN(page, lsn);
1756 1757
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);
V
Vadim B. Mikheev 已提交
1758 1759
}

B
Bruce Momjian 已提交
1760
void
1761
seq_desc(StringInfo buf, XLogRecord *record)
V
Vadim B. Mikheev 已提交
1762
{
1763 1764
	uint8		info = record->xl_info & ~XLR_INFO_MASK;
	char		*rec = XLogRecGetData(record);
B
Bruce Momjian 已提交
1765
	xl_seq_rec *xlrec = (xl_seq_rec *) rec;
V
Vadim B. Mikheev 已提交
1766 1767

	if (info == XLOG_SEQ_LOG)
1768
		appendStringInfo(buf, "log: ");
V
Vadim B. Mikheev 已提交
1769 1770
	else
	{
1771
		appendStringInfo(buf, "UNKNOWN");
V
Vadim B. Mikheev 已提交
1772 1773 1774
		return;
	}

1775
	appendStringInfo(buf, "rel %u/%u/%u",
B
Bruce Momjian 已提交
1776
			   xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode);
V
Vadim B. Mikheev 已提交
1777
}
1778 1779 1780 1781 1782 1783 1784 1785 1786 1787


/*
 * cdb_sequence_relation_init: initialize a pseudo relcache entry with just
 * enough info to call bufmgr.
 *
 *	seqrel			caller-provided RelationData to fill in; it is zeroed
 *					first, so any previous contents are lost
 *	tablespaceid	tablespace OID for the physical address
 *	dbid			database OID for the physical address
 *	relid			relation OID; also used as the relfilenode
 *	relpersistence	persistence code stored in the fake pg_class row
 *
 * rd_rel is palloc'd here; cdb_sequence_relation_term() releases it.
 */
static void
cdb_sequence_relation_init(Relation seqrel,
						   Oid      tablespaceid,
						   Oid      dbid,
						   Oid      relid,
						   char     relpersistence)
{
	/* See RelationBuildDesc in relcache.c */
	memset(seqrel, 0, sizeof(*seqrel));

	seqrel->rd_smgr = NULL;
	/* NOTE(review): 99 looks like an arbitrary nonzero refcount -- confirm */
	seqrel->rd_refcnt = 99;

	seqrel->rd_id = relid;

	seqrel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);

	/*
	 * Synthesize a relation name for messages.  Oid is unsigned, so it must
	 * be printed with %u (with %d, large OIDs would show up as negative
	 * numbers), and the write is bounded by the size of the name buffer.
	 */
	snprintf(seqrel->rd_rel->relname.data,
			 sizeof(seqrel->rd_rel->relname.data),
			 "pg_class.oid=%u", relid);
	seqrel->rd_rel->relpersistence = relpersistence;
	seqrel->rd_rel->relkind = RELKIND_SEQUENCE;

	/* as in RelationInitPhysicalAddr... */
	seqrel->rd_node.spcNode = tablespaceid;
	seqrel->rd_node.dbNode = dbid;
	seqrel->rd_node.relNode = relid;

	/* GPDB_91_MERGE_FIXME: Do we ever use the seqserver for temp sequences? I think not.. */
	seqrel->rd_backend = InvalidBackendId;
}								/* cdb_sequence_relation_init */

/*
 * cdb_sequence_relation_term: clean up a pseudo relcache entry.
 *
 * Closes the entry's smgr relation and frees its rd_rel, if it has one.
 */
static void
cdb_sequence_relation_term(Relation seqrel)
{
	/* Close the file. */
	RelationCloseSmgr(seqrel);

	/* Release the fake pg_class row, when present */
	if (seqrel->rd_rel != NULL)
	{
		pfree(seqrel->rd_rel);
	}
}								/* cdb_sequence_relation_term */



/*
 * cdb_sequence_nextval_proxy
 *
 * CDB: forward a nextval request from qExec to the sequence server.
 *
 * The request is sent over the descriptor returned by GetSeqServerFD(),
 * tagged with gp_session_id.  The four pointer arguments are passed
 * straight through to sendSequenceRequest().
 */
void
cdb_sequence_nextval_proxy(Relation	seqrel,
						   int64   *plast,
						   int64   *pcached,
						   int64   *pincrement,
						   bool    *poverflow)
{
	sendSequenceRequest(GetSeqServerFD(), seqrel, gp_session_id,
						plast, pcached, pincrement, poverflow);
}								/* cdb_sequence_nextval_proxy */


/*
 * cdb_sequence_nextval_server
 *
 * CDB: nextval entry point called by sequence server.
 *
 * tablespaceid, dbid, relid and relpersistence describe the sequence; they
 * are used to build a temporary stand-in relcache entry.  *plast, *pcached
 * and *pincrement are zeroed up front; those three and *poverflow are then
 * handed to cdb_sequence_nextval().
 */
void
cdb_sequence_nextval_server(Oid    tablespaceid,
							Oid    dbid,
							Oid    relid,
							char   relpersistence,
							int64 *plast,
							int64 *pcached,
							int64 *pincrement,
							bool  *poverflow)
{
	RelationData fakerel;
	Relation	seqrel = &fakerel;
	SeqTable	elm;

	*plast = 0;
	*pcached = 0;
	*pincrement = 0;

	/*
	 * In Postgres, init_sequence finds the SeqTable entry for the sequence.
	 * The sequence server does not need that lookup as such; we only need
	 * an initialized `elm`, because `cdb_sequence_nextval()` calls
	 * `read_seq_tuple()`, which requires it.
	 *
	 * In GPDB, a sequence server is used to generate unique values for all
	 * the sequences.  It doesn't have to lock the sequence relation, because
	 * there is only a single instance of the sequence server handling all
	 * the requests from segments to generate sequence values.  To prevent
	 * collisions between values generated on the 'master' (e.g. `select
	 * nextval(seq)`) and for 'segments' (e.g. `insert into table with serial
	 * column`), a BUFFER_LOCK_EXCLUSIVE lock is held on the shared buffer of
	 * the sequence relation.
	 */
	init_sequence(relid, &elm, NULL);

	/* Build a pseudo relcache entry with just enough info to call bufmgr. */
	cdb_sequence_relation_init(seqrel, tablespaceid, dbid, relid, relpersistence);

	/* CDB TODO: Catch errors. */

	/* Update the sequence object. */
	cdb_sequence_nextval(elm, seqrel, plast, pcached, pincrement, poverflow);

	/* Cleanup. */
	cdb_sequence_relation_term(seqrel);
}								/* cdb_sequence_nextval_server */
1899

1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929
/*
 * mask_seq_values: zero last_value and log_cnt for consistency checking.
 *
 * To avoid logging every fetch from a sequence, SEQ_LOG_VALS fetches are
 * pre-logged, so these two fields have to be masked during consistency
 * checks.  Every item on the page, from FirstOffsetNumber through the
 * page's maximum offset, is treated as a sequence tuple.
 */
static void
mask_seq_values(Page page)
{
	OffsetNumber lastoff = PageGetMaxOffsetNumber(page);
	OffsetNumber off = FirstOffsetNumber;

	while (off <= lastoff)
	{
		ItemId		itemid = PageGetItemId(page, off);
		HeapTupleData tup;
		Form_pg_sequence seqform;

		/* Build just enough of a tuple header reference for GETSTRUCT */
		tup.t_data = (HeapTupleHeader) ((char *) page + ItemIdGetOffset(itemid));
		tup.t_len = ItemIdGetLength(itemid);

		seqform = (Form_pg_sequence) GETSTRUCT(&tup);
		MemSet(&seqform->last_value, 0, sizeof(int64));
		MemSet(&seqform->log_cnt, 0, sizeof(int64));

		off = OffsetNumberNext(off);
	}
}

1930 1931 1932 1933 1934 1935 1936 1937
/*
 * seq_mask: mask a sequence page before consistency checks are run on it.
 *
 * Three maskings are applied in order: the page LSN and checksum, the
 * sequence tuple's last_value and log_cnt, and the page's unused space.
 * blkno is not used.
 */
void
seq_mask(char *page, BlockNumber blkno)
{
	/* LSN and checksum */
	mask_page_lsn_and_checksum(page);

	/*
	 * last_value and log_cnt: these differ because SEQ_LOG_VALS fetches are
	 * pre-logged rather than logged one by one.
	 */
	mask_seq_values(page);

	/* unused space */
	mask_unused_space(page);
}