/*-------------------------------------------------------------------------
 *
 * sequence.c
 *	  PostgreSQL sequences support code.
 *
 * Portions Copyright (c) 2005-2008, Greenplum inc.
 * Portions Copyright (c) 2012-Present Pivotal Software, Inc.
 * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/commands/sequence.c,v 1.168 2010/02/20 21:24:02 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/heapam.h"
#include "access/bufmask.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlogutils.h"
#include "catalog/dependency.h"
#include "catalog/heap.h"
#include "catalog/namespace.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "commands/sequence.h"
#include "commands/tablecmds.h"
#include "miscadmin.h"
#include "storage/smgr.h"               /* RelationCloseSmgr -> smgrclose */
#include "nodes/makefuncs.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/smgr.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/formatting.h"
#include "utils/lsyscache.h"
#include "utils/resowner.h"
#include "utils/syscache.h"

#include "catalog/oid_dispatch.h"
#include "cdb/cdbdisp_query.h"
#include "cdb/cdbdoublylinked.h"
#include "cdb/cdbsrlz.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbmotion.h"
#include "cdb/ml_ipc.h"

#include "postmaster/seqserver.h"

/*
 * We don't want to log each fetching of a value from a sequence,
 * so we pre-log a few fetches in advance. In the event of
 * crash we can lose (skip over) as many values as we pre-logged.
 */
#define SEQ_LOG_VALS	32

/*
 * The "special area" of a sequence's buffer page looks like this.
 */
#define SEQ_MAGIC	  0x1717

68 69
typedef struct sequence_magic
{
70
	uint32		magic;
71
} sequence_magic;
72

73 74 75 76 77 78
/*
 * We store a SeqTable item for every sequence we have touched in the current
 * session.  This is needed to hold onto nextval/currval state.  (We can't
 * rely on the relcache, since it's only, well, a cache, and may decide to
 * discard entries.)
 *
B
Bruce Momjian 已提交
79
 * XXX We use linear search to find pre-existing SeqTable entries.	This is
80 81 82
 * good when only a small number of sequences are touched in a session, but
 * would suck with many different sequences.  Perhaps use a hashtable someday.
 */
83 84
typedef struct SeqTableData
{
85 86
	struct SeqTableData *next;	/* link to next SeqTable object */
	Oid			relid;			/* pg_class OID of this sequence */
87
	LocalTransactionId lxid;	/* xact in which we last did a seq op */
88
	bool		last_valid;		/* do we have a valid "last" value? */
89 90 91 92
	int64		last;			/* value last returned by nextval */
	int64		cached;			/* last value already cached for nextval */
	/* if last != cached, we have not used up all the cached values */
	int64		increment;		/* copy of sequence's increment field */
93
	/* note that increment is zero until we first do read_seq_tuple() */
94
} SeqTableData;
95 96 97

typedef SeqTableData *SeqTable;

98
static SeqTable seqtab = NULL;	/* Head of list of SeqTable items */
99

100 101 102 103 104
/*
 * last_used_seq is updated by nextval() to point to the last used
 * sequence.
 */
static SeqTableData *last_used_seq = NULL;
105

106
static int64 nextval_internal(Oid relid);
107
static Relation open_share_lock(SeqTable seq);
108
static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel);
109 110
static Form_pg_sequence read_seq_tuple(SeqTable elm, Relation rel,
			   Buffer *buf, HeapTuple seqtuple);
111
static void init_params(List *options, bool isInit,
B
Bruce Momjian 已提交
112
			Form_pg_sequence new, List **owned_by);
113
static void do_setval(Oid relid, int64 next, bool iscalled);
114
static void process_owned_by(Relation seqrel, List *owned_by);
115
static void mask_seq_values(Page page);
116

117
static void
118 119
cdb_sequence_nextval(SeqTable elm,
					 Relation   seqrel,
120 121 122 123 124 125 126 127 128 129 130
                     int64     *plast,
                     int64     *pcached,
                     int64     *pincrement,
                     bool      *seq_overflow);
static void
cdb_sequence_nextval_proxy(Relation seqrel,
                           int64   *plast,
                           int64   *pcached,
                           int64   *pincrement,
                           bool    *poverflow);

131
/*
B
Bruce Momjian 已提交
132
 * DefineSequence
133
 *				Creates a new sequence relation
134 135
 */
void
136
DefineSequence(CreateSeqStmt *seq)
137
{
138
	FormData_pg_sequence new;
139
	List	   *owned_by;
140
	CreateStmt *stmt = makeNode(CreateStmt);
141
	Oid			seqoid;
142 143
	Relation	rel;
	Buffer		buf;
144
	Page		page;
145
	sequence_magic *sm;
146 147 148
	HeapTuple	tuple;
	TupleDesc	tupDesc;
	Datum		value[SEQ_COL_LASTCOL];
149
	bool		null[SEQ_COL_LASTCOL];
150
	int			i;
151
	NameData	name;
152
	OffsetNumber offnum;
153

154 155
	bool shouldDispatch =  Gp_role == GP_ROLE_DISPATCH && !IsBootstrapProcessingMode();

156
	/* Check and set all option values */
157
	init_params(seq->options, true, &new, &owned_by);
158 159

	/*
160
	 * Create relation (and fill value[] and null[] for the tuple)
161 162 163
	 */
	stmt->tableElts = NIL;
	for (i = SEQ_COL_FIRSTCOL; i <= SEQ_COL_LASTCOL; i++)
164
	{
165
		ColumnDef  *coldef = makeNode(ColumnDef);
166

167 168
		coldef->inhcount = 0;
		coldef->is_local = true;
169
		coldef->is_not_null = true;
170
		coldef->storage = 0;
171 172
		coldef->raw_default = NULL;
		coldef->cooked_default = NULL;
173 174
		coldef->constraints = NIL;

175
		null[i - 1] = false;
176 177 178

		switch (i)
		{
179
			case SEQ_COL_NAME:
180
				coldef->typeName = makeTypeNameFromOid(NAMEOID, -1);
181
				coldef->colname = "sequence_name";
182
				namestrcpy(&name, seq->sequence->relname);
183
				value[i - 1] = NameGetDatum(&name);
184 185
				break;
			case SEQ_COL_LASTVAL:
186
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
187
				coldef->colname = "last_value";
188
				value[i - 1] = Int64GetDatumFast(new.last_value);
189
				break;
190
			case SEQ_COL_STARTVAL:
191
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
192 193 194
				coldef->colname = "start_value";
				value[i - 1] = Int64GetDatumFast(new.start_value);
				break;
195
			case SEQ_COL_INCBY:
196
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
197
				coldef->colname = "increment_by";
198
				value[i - 1] = Int64GetDatumFast(new.increment_by);
199 200
				break;
			case SEQ_COL_MAXVALUE:
201
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
202
				coldef->colname = "max_value";
203
				value[i - 1] = Int64GetDatumFast(new.max_value);
204 205
				break;
			case SEQ_COL_MINVALUE:
206
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
207
				coldef->colname = "min_value";
208
				value[i - 1] = Int64GetDatumFast(new.min_value);
209 210
				break;
			case SEQ_COL_CACHE:
211
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
212
				coldef->colname = "cache_value";
213
				value[i - 1] = Int64GetDatumFast(new.cache_value);
214
				break;
V
Vadim B. Mikheev 已提交
215
			case SEQ_COL_LOG:
216
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
V
Vadim B. Mikheev 已提交
217
				coldef->colname = "log_cnt";
218
				value[i - 1] = Int64GetDatum((int64) 0);
V
Vadim B. Mikheev 已提交
219
				break;
220
			case SEQ_COL_CYCLE:
221
				coldef->typeName = makeTypeNameFromOid(BOOLOID, -1);
222
				coldef->colname = "is_cycled";
223
				value[i - 1] = BoolGetDatum(new.is_cycled);
224 225
				break;
			case SEQ_COL_CALLED:
226
				coldef->typeName = makeTypeNameFromOid(BOOLOID, -1);
227
				coldef->colname = "is_called";
228
				value[i - 1] = BoolGetDatum(false);
229
				break;
230 231 232 233
		}
		stmt->tableElts = lappend(stmt->tableElts, coldef);
	}

234 235
	stmt->relation = seq->sequence;
	stmt->inhRelations = NIL;
236
	stmt->constraints = NIL;
237 238
	stmt->inhOids = NIL;
	stmt->parentOidCount = 0;
B
Bruce Momjian 已提交
239
	stmt->options = list_make1(defWithOids(false));
240
	stmt->oncommit = ONCOMMIT_NOOP;
241
	stmt->tablespacename = NULL;
242 243
	stmt->relKind = RELKIND_SEQUENCE;
	stmt->ownerid = GetUserId();
244

245
	seqoid = DefineRelation(stmt, RELKIND_SEQUENCE, RELSTORAGE_HEAP, false);
246

247 248 249 250 251 252 253 254
	/*
	 * Open and lock the new sequence.  (This lock is redundant; an
	 * AccessExclusiveLock was acquired above by DefineRelation and
	 * won't be released until end of transaction.)
	 *
	 * CDB: Acquire lock on qDisp before dispatching to qExecs, so
	 * qDisp can detect and resolve any deadlocks.
	 */
255
	rel = heap_open(seqoid, AccessExclusiveLock);
256
	tupDesc = RelationGetDescr(rel);
257

258 259 260 261
	/* Now form sequence tuple */
	tuple = heap_form_tuple(tupDesc, value, null);

	/* Initialize first page of relation with special magic number */
262
	buf = ReadBuffer(rel, P_NEW);
263 264
	Assert(BufferGetBlockNumber(buf) == 0);

265
	page = BufferGetPage(buf);
266

267
	PageInit(page, BufferGetPageSize(buf), sizeof(sequence_magic));
268 269 270
	sm = (sequence_magic *) PageGetSpecialPointer(page);
	sm->magic = SEQ_MAGIC;

271 272
	/* Now insert sequence tuple */
	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
273

274
	/*
275
	 * Since VACUUM does not process sequences, we have to force the tuple
276
	 * to have xmin = FrozenTransactionId now.	Otherwise it would become
B
Bruce Momjian 已提交
277
	 * invisible to SELECTs after 2G transactions.	It is okay to do this
278 279 280
	 * because if the current transaction aborts, no other xact will ever
	 * examine the sequence tuple anyway.
	 *
281
	 */
282

283 284 285 286 287 288
	HeapTupleHeaderSetXmin(tuple->t_data, FrozenTransactionId);
	HeapTupleHeaderSetXminFrozen(tuple->t_data);
	HeapTupleHeaderSetCmin(tuple->t_data, FirstCommandId);
	HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId);
	tuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
	ItemPointerSet(&tuple->t_data->t_ctid, 0, FirstOffsetNumber);
289

290
	START_CRIT_SECTION();
291

292 293
	MarkBufferDirty(buf);

294 295 296 297 298
	offnum = PageAddItem(page, (Item) tuple->t_data, tuple->t_len,
						 InvalidOffsetNumber, false, false);
	if (offnum != FirstOffsetNumber)
		elog(ERROR, "failed to add sequence tuple to page");

299 300
	/* XLOG stuff */
	if (!rel->rd_istemp)
301
	{
302 303 304
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];
305 306

		xlrec.node = rel->rd_node;
307

308 309
		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
310
		rdata[0].buffer = InvalidBuffer;
311 312
		rdata[0].next = &(rdata[1]);

313
		rdata[1].data = (char *) tuple->t_data;
314
		rdata[1].len = tuple->t_len;
315
		rdata[1].buffer = InvalidBuffer;
316 317
		rdata[1].next = NULL;

318
		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);
319 320 321

		PageSetLSN(page, recptr);
	}
322

323
	END_CRIT_SECTION();
324

325 326
	UnlockReleaseBuffer(buf);

327 328 329 330
	/* process OWNED BY if given */
	if (owned_by)
		process_owned_by(rel, owned_by);

331
	heap_close(rel, NoLock);
332 333 334 335 336

	
	/* Dispatch to segments */
	if (shouldDispatch)
	{
337 338 339 340
		CdbDispatchUtilityStatement((Node *) seq,
									DF_CANCEL_ON_ERROR|
									DF_WITH_SNAPSHOT|
									DF_NEED_TWO_PHASE,
341
									GetAssignedOidsForDispatch(),
342
									NULL);
343
	}
344 345
}

B
Bruce Momjian 已提交
346 347 348
/*
 * AlterSequence
 *
349
 * Modify the definition of a sequence relation
B
Bruce Momjian 已提交
350 351
 */
void
352
AlterSequence(AlterSeqStmt *stmt)
B
Bruce Momjian 已提交
353
{
354
	Oid			relid;
355 356 357 358 359 360 361 362 363 364 365 366

	/* find sequence */
	relid = RangeVarGetRelid(stmt->sequence, false);

	/* allow ALTER to sequence owner only */
	/* if you change this, see also callers of AlterSequenceInternal! */
	if (!pg_class_ownercheck(relid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
					   stmt->sequence->relname);

	/* do the work */
	AlterSequenceInternal(relid, stmt->options);
367 368 369 370 371 372 373 374

	if (Gp_role == GP_ROLE_DISPATCH)
		CdbDispatchUtilityStatement((Node *) stmt,
									DF_CANCEL_ON_ERROR|
									DF_WITH_SNAPSHOT|
									DF_NEED_TWO_PHASE,
									NIL,
									NULL);
375 376 377 378 379 380 381 382 383 384
}

/*
 * AlterSequenceInternal
 *
 * Same as AlterSequence except that the sequence is specified by OID
 * and we assume the caller already checked permissions.
 *
 * The current tuple is copied into a workspace, init_params() applies the
 * requested options to that copy, and the copy is then written back over the
 * in-buffer tuple inside a critical section (with a WAL record unless the
 * sequence is temporary).  On the dispatcher, for non-temp sequences, the
 * change is also recorded through MetaTrackUpdObject().
 */
void
AlterSequenceInternal(Oid relid, List *options)
{
	SeqTable	elm;
	Relation	seqrel;
	Buffer		buf;
	Form_pg_sequence seq;
	FormData_pg_sequence new;
	List	   *owned_by;
	HeapTupleData seqtuple;
	int64		save_increment;
	bool		bSeqIsTemp = false;
	int			numopts = 0;
	char	   *alter_subtype = "";		/* metadata tracking: kind of
										   redundant to say "role" */

	/* open and AccessShareLock sequence */
	init_sequence(relid, &elm, &seqrel);

	/* lock page' buffer and read tuple into new sequence structure */

	/* hack to keep ALTER SEQUENCE OWNED BY from changing currval state */
	save_increment = elm->increment;

	seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
	elm->increment = seq->increment_by;

	/* Copy old values of options into workspace */
	memcpy(&new, seq, sizeof(FormData_pg_sequence));

	/* Check and set new values */
	init_params(options, false, &new, &owned_by);

	if (owned_by)
	{
		/* Restore previous state of elm (assume nothing else changes) */
		elm->increment = save_increment;
	}
	else
	{
		/* Clear local cache so that we don't think we have cached numbers */
		/* Note that we do not change the currval() state */
		elm->cached = elm->last;
	}

	/* Now okay to update the on-disk tuple */
	START_CRIT_SECTION();

	memcpy(seq, &new, sizeof(FormData_pg_sequence));

	MarkBufferDirty(buf);

	/* XLOG stuff */

	bSeqIsTemp = seqrel->rd_istemp;

	if (!bSeqIsTemp)
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];
		Page		page = BufferGetPage(buf);

		xlrec.node = seqrel->rd_node;

		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
		rdata[0].buffer = InvalidBuffer;
		rdata[0].next = &(rdata[1]);

		rdata[1].data = (char *) seqtuple.t_data;
		rdata[1].len = seqtuple.t_len;
		rdata[1].buffer = InvalidBuffer;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);

		PageSetLSN(page, recptr);
	}

	END_CRIT_SECTION();

	UnlockReleaseBuffer(buf);

	/* process OWNED BY if given */
	if (owned_by)
		process_owned_by(seqrel, owned_by);

	relation_close(seqrel, NoLock);

	/*
	 * Work out the subtype string for metadata tracking: "<n> OPTIONS" for
	 * zero or several options, otherwise the upper-cased name of the single
	 * option.
	 */
	numopts = list_length(options);

	if (numopts > 1)
	{
		char		allopts[NAMEDATALEN];

		/* bounded write: never run past the stack buffer */
		snprintf(allopts, sizeof(allopts), "%d OPTIONS", numopts);

		alter_subtype = pstrdup(allopts);
	}
	else if (0 == numopts)
	{
		alter_subtype = "0 OPTIONS";
	}
	else if ((Gp_role == GP_ROLE_DISPATCH) && (!bSeqIsTemp))
	{
		ListCell   *option = list_head(options);
		DefElem    *defel = (DefElem *) lfirst(option);
		char	   *tempo = NULL;

		alter_subtype = defel->defname;
		if (0 == strcmp(alter_subtype, "owned_by"))
			alter_subtype = "OWNED BY";

		tempo = str_toupper(alter_subtype, strlen(alter_subtype));

		alter_subtype = tempo;
	}

	if (Gp_role == GP_ROLE_DISPATCH && !bSeqIsTemp)
	{
		/* MPP-6929: metadata tracking */
		MetaTrackUpdObject(RelationRelationId,
						   relid,
						   GetUserId(),
						   "ALTER", alter_subtype);
	}
}

/*
 * Note: nextval with a text argument is no longer exported as a pg_proc
 * entry, but we keep it around to ease porting of C code that may have
 * called the function directly.
 */
518 519
Datum
nextval(PG_FUNCTION_ARGS)
520
{
521
	text	   *seqin = PG_GETARG_TEXT_P(0);
522
	RangeVar   *sequence;
523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541
	Oid			relid;

	sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin));
	relid = RangeVarGetRelid(sequence, false);

	PG_RETURN_INT64(nextval_internal(relid));
}

/*
 * nextval_oid
 *
 * nextval() for a sequence identified by its relation OID (argument 0).
 */
Datum
nextval_oid(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);

	PG_RETURN_INT64(nextval_internal(relid));
}

/*
 * nextval_internal
 *
 * Return the next value of the sequence identified by relid.
 *
 * If this session still holds unused cached values (last != cached), the
 * next one is handed out without reading the sequence's page.  Otherwise the
 * caller must have UPDATE privilege, and the sequence is advanced either via
 * cdb_sequence_nextval_proxy() (when running as an executor) or directly via
 * cdb_sequence_nextval().
 *
 * Raises an error if the sequence reports an overflow.  last_used_seq and
 * last_valid are updated only when a value is actually returned, so that a
 * failed call cannot leave last_used_seq pointing at an entry whose "last"
 * value was never handed out.
 */
static int64
nextval_internal(Oid relid)
{
	SeqTable	elm;
	Relation	seqrel;
	bool		is_overflow = false;

	/* open and AccessShareLock sequence */
	init_sequence(relid, &elm, &seqrel);

	/* read-only transactions may only modify temp sequences */
	if (!seqrel->rd_islocaltemp)
		PreventCommandIfReadOnly("nextval()");

	if (elm->last != elm->cached)		/* some numbers were cached */
	{
		Assert(elm->last_valid);
		Assert(elm->increment != 0);
		elm->last += elm->increment;
		relation_close(seqrel, NoLock);
		last_used_seq = elm;
		return elm->last;
	}

	if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied for sequence %s",
						RelationGetRelationName(seqrel))));

	/* Update the sequence object. */
	if (Gp_role == GP_ROLE_EXECUTE)
		cdb_sequence_nextval_proxy(seqrel,
								   &elm->last,
								   &elm->cached,
								   &elm->increment,
								   &is_overflow);
	else
		cdb_sequence_nextval(elm,
							 seqrel,
							 &elm->last,
							 &elm->cached,
							 &elm->increment,
							 &is_overflow);

	if (is_overflow)
	{
		/* copy the name: it is needed after the relation is closed */
		char	   *relname = pstrdup(RelationGetRelationName(seqrel));

		relation_close(seqrel, NoLock);

		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("nextval: reached %s value of sequence \"%s\" (" INT64_FORMAT ")",
						elm->increment>0 ? "maximum":"minimum",
						relname, elm->last)));
	}

	/* success: only now does this sequence become the "last used" one */
	elm->last_valid = true;
	last_used_seq = elm;

	relation_close(seqrel, NoLock);
	return elm->last;
}

/*
 * cdb_sequence_nextval
 *
 * Advance the sequence tuple of seqrel and report what was fetched.
 *
 * Outputs:
 *	*plast		first value fetched by this call (the value to return)
 *	*pcached	last value fetched by this call
 *	*pincrement	the sequence's increment_by
 *	*poverflow	true if the bound was hit before any value could be
 *				fetched and the sequence does not cycle
 *
 * Note that an overflow is only reported through *poverflow; no error is
 * raised here, and the tuple is still rewritten below.  nextval_internal()
 * raises the error based on the flag.
 */
static void
cdb_sequence_nextval(SeqTable elm,
					 Relation   seqrel,
					 int64     *plast,
					 int64     *pcached,
					 int64     *pincrement,
					 bool      *poverflow)
{
	Buffer		buf;
	Page		page;
	HeapTupleData seqtuple;
	Form_pg_sequence seq;
	int64		incby,
				maxv,
				minv,
				cache,
				log,
				fetch,
				last;
	int64		result,
				next,
				rescnt = 0;
	bool 		have_overflow = false;
	bool		logit = false;

	/* lock page' buffer and read tuple */
	seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
	page = BufferGetPage(buf);

	last = next = result = seq->last_value;
	incby = seq->increment_by;
	maxv = seq->max_value;
	minv = seq->min_value;
	fetch = cache = seq->cache_value;
	log = seq->log_cnt;

	if (!seq->is_called)
	{
		rescnt++;				/* return last_value if not is_called */
		fetch--;
	}

	/*
	 * Decide whether we should emit a WAL log record.	If so, force up the
	 * fetch count to grab SEQ_LOG_VALS more values than we actually need to
	 * cache.  (These will then be usable without logging.)
	 *
	 * If this is the first nextval after a checkpoint, we must force a new
	 * WAL record to be written anyway, else replay starting from the
	 * checkpoint would fail to advance the sequence past the logged values.
	 * In this case we may as well fetch extra values.
	 */
	if (log < fetch || !seq->is_called)
	{
		/* forced log to satisfy local demand for values */
		fetch = log = fetch + SEQ_LOG_VALS;
		logit = true;
	}
	else
	{
		XLogRecPtr	redoptr = GetRedoRecPtr();

		if (XLByteLE(PageGetLSN(page), redoptr))
		{
			/* last update of seq was before checkpoint */
			fetch = log = fetch + SEQ_LOG_VALS;
			logit = true;
		}
	}

	while (fetch)				/* try to fetch cache [+ log ] numbers */
	{
		/*
		 * Check MAXVALUE for ascending sequences and MINVALUE for descending
		 * sequences
		 */
		if (incby > 0)
		{
			/* ascending sequence */
			if ((maxv >= 0 && next > maxv - incby) ||
				(maxv < 0 && next + incby > maxv))
			{
				if (rescnt > 0)
					break;		/* stop fetching */
				if (!seq->is_cycled)
				{
					have_overflow = true;
				}
				else
				{
					next = minv;
				}
			}
			else
				next += incby;
		}
		else
		{
			/* descending sequence */
			if ((minv < 0 && next < minv - incby) ||
				(minv >= 0 && next + incby < minv))
			{
				if (rescnt > 0)
					break;		/* stop fetching */
				if (!seq->is_cycled)
				{
					have_overflow = true;
				}
				else
				{
					next = maxv;
				}
			}
			else
				next += incby;
		}
		fetch--;
		if (rescnt < cache)
		{
			log--;
			rescnt++;
			last = next;
			if (rescnt == 1)	/* if it's first result - */
				result = next;	/* it's what to return */
		}
	}

	log -= fetch;				/* adjust for any unfetched numbers */
	Assert(log >= 0);

	/* set results for caller */
	*poverflow = have_overflow; /* has the sequence overflown */
	*plast = result;            /* last returned number */
	*pcached = last;            /* last fetched number */
	*pincrement = incby;

	/* ready to change the on-disk (or really, in-buffer) tuple */
	START_CRIT_SECTION();

	/*
	 * We must mark the buffer dirty before doing XLogInsert(); see notes in
	 * SyncOneBuffer().  However, we don't apply the desired changes just yet.
	 * This looks like a violation of the buffer update protocol, but it is
	 * in fact safe because we hold exclusive lock on the buffer.  Any other
	 * process, including a checkpoint, that tries to examine the buffer
	 * contents will block until we release the lock, and then will see the
	 * final state that we install below.
	 */
	MarkBufferDirty(buf);

	/* XLOG stuff */
	if (logit && !seqrel->rd_istemp)
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];

		/*
		 * We don't log the current state of the tuple, but rather the state
		 * as it would appear after "log" more fetches.  This lets us skip
		 * that many future WAL records, at the cost that we lose those
		 * sequence values if we crash.
		 */

		/* set values that will be saved in xlog */
		seq->last_value = next;
		seq->is_called = true;
		seq->log_cnt = 0;

		xlrec.node = seqrel->rd_node;
		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
		rdata[0].buffer = InvalidBuffer;
		rdata[0].next = &(rdata[1]);

		rdata[1].data = (char *) seqtuple.t_data;
		rdata[1].len = seqtuple.t_len;
		rdata[1].buffer = InvalidBuffer;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);

		PageSetLSN(page, recptr);

		/* need to update where we've inserted to into shmem so that the QD can flush it
		 * when necessary
		 */
		LWLockAcquire(SeqServerControlLock, LW_EXCLUSIVE);

		/* only ever move the recorded position forward */
		if (XLByteLT(seqServerCtl->lastXlogEntry, recptr))
		{
			seqServerCtl->lastXlogEntry.xlogid = recptr.xlogid;
			seqServerCtl->lastXlogEntry.xrecoff = recptr.xrecoff;
		}

		LWLockRelease(SeqServerControlLock);
	}

	/* Now update sequence tuple to the intended final state */
	seq->last_value = last;		/* last fetched number */
	seq->is_called = true;
	seq->log_cnt = log;			/* how much is logged */

	END_CRIT_SECTION();

	UnlockReleaseBuffer(buf);
}                               /* cdb_sequence_nextval */

Datum
815
currval_oid(PG_FUNCTION_ARGS)
816
{
817 818
	Oid			relid = PG_GETARG_OID(0);
	int64		result;
819
	SeqTable	elm;
820
	Relation	seqrel;
821

822 823 824 825 826 827 828 829
	/* For now, strictly forbidden on MPP. */
	if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
				 errmsg("currval() not supported")));
	}

V
Vadim B. Mikheev 已提交
830
	/* open and AccessShareLock sequence */
831
	init_sequence(relid, &elm, &seqrel);
832

833 834
	if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_SELECT) != ACLCHECK_OK &&
		pg_class_aclcheck(elm->relid, GetUserId(), ACL_USAGE) != ACLCHECK_OK)
835 836
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
837
				 errmsg("permission denied for sequence %s",
838
						RelationGetRelationName(seqrel))));
839

840
	if (!elm->last_valid)
841 842
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
843
				 errmsg("currval of sequence \"%s\" is not yet defined in this session",
844
						RelationGetRelationName(seqrel))));
845 846 847

	result = elm->last;

848 849
	relation_close(seqrel, NoLock);

850
	PG_RETURN_INT64(result);
851 852
}

853 854 855 856 857 858
Datum
lastval(PG_FUNCTION_ARGS)
{
	Relation	seqrel;
	int64		result;

859 860 861 862 863 864 865 866
	/* For now, strictly forbidden on MPP. */
	if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
				 errmsg("lastval() not supported")));
	}

867 868 869 870 871 872
	if (last_used_seq == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("lastval is not yet defined in this session")));

	/* Someone may have dropped the sequence since the last nextval() */
873
	if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(last_used_seq->relid)))
874 875 876 877
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("lastval is not yet defined in this session")));

878
	seqrel = open_share_lock(last_used_seq);
879 880

	/* nextval() must have already been called for this sequence */
881
	Assert(last_used_seq->last_valid);
882

883 884
	if (pg_class_aclcheck(last_used_seq->relid, GetUserId(), ACL_SELECT) != ACLCHECK_OK &&
		pg_class_aclcheck(last_used_seq->relid, GetUserId(), ACL_USAGE) != ACLCHECK_OK)
885 886 887 888 889 890 891
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied for sequence %s",
						RelationGetRelationName(seqrel))));

	result = last_used_seq->last;
	relation_close(seqrel, NoLock);
892

893 894 895
	PG_RETURN_INT64(result);
}

B
Bruce Momjian 已提交
896
/*
897 898 899 900
 * Main internal procedure that handles 2 & 3 arg forms of SETVAL.
 *
 * Note that the 3 arg version (which sets the is_called flag) is
 * only for use in pg_dump, and setting the is_called flag may not
B
Bruce Momjian 已提交
901
 * work if multiple users are attached to the database and referencing
902 903
 * the sequence (unlikely if pg_dump is restoring it).
 *
B
Bruce Momjian 已提交
904
 * It is necessary to have the 3 arg version so that pg_dump can
905 906 907 908
 * restore the state of a sequence exactly during data-only restores -
 * it is the only way to clear the is_called flag in an existing
 * sequence.
 */
B
Bruce Momjian 已提交
909
static void
910
do_setval(Oid relid, int64 next, bool iscalled)
M
 
Marc G. Fournier 已提交
911 912
{
	SeqTable	elm;
913
	Relation	seqrel;
914
	Buffer		buf;
915
	HeapTupleData seqtuple;
916
	Form_pg_sequence seq;
M
 
Marc G. Fournier 已提交
917

918 919 920 921 922 923 924
	if (Gp_role == GP_ROLE_EXECUTE)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
				 errmsg("setval() not supported in this context")));
	}

925
	/* open and AccessShareLock sequence */
926
	init_sequence(relid, &elm, &seqrel);
927 928

	if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
929 930
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
931
				 errmsg("permission denied for sequence %s",
932
						RelationGetRelationName(seqrel))));
M
 
Marc G. Fournier 已提交
933

934 935 936 937
	/* read-only transactions may only modify temp sequences */
	if (!seqrel->rd_islocaltemp)
		PreventCommandIfReadOnly("setval()");

938
	/* lock page' buffer and read tuple */
939
	seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
940
	elm->increment = seq->increment_by;
M
 
Marc G. Fournier 已提交
941

942
	if ((next < seq->min_value) || (next > seq->max_value))
943
	{
B
Bruce Momjian 已提交
944 945 946 947
		char		bufv[100],
					bufm[100],
					bufx[100];

948 949 950
		snprintf(bufv, sizeof(bufv), INT64_FORMAT, next);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, seq->min_value);
		snprintf(bufx, sizeof(bufx), INT64_FORMAT, seq->max_value);
951 952
		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
953
				 errmsg("setval: value %s is out of bounds for sequence \"%s\" (%s..%s)",
954 955
						bufv, RelationGetRelationName(seqrel),
						bufm, bufx)));
956
	}
M
 
Marc G. Fournier 已提交
957

958 959 960 961 962 963 964 965 966
	/* Set the currval() state only if iscalled = true */
	if (iscalled)
	{
		elm->last = next;		/* last returned number */
		elm->last_valid = true;
	}

	/* In any case, forget any future cached numbers */
	elm->cached = elm->last;
M
 
Marc G. Fournier 已提交
967

968
	/* ready to change the on-disk (or really, in-buffer) tuple */
969
	START_CRIT_SECTION();
970

971 972 973 974
	seq->last_value = next;		/* last fetched number */
	seq->is_called = iscalled;
	seq->log_cnt = 0;

975 976
	MarkBufferDirty(buf);

977 978
	/* XLOG stuff */
	if (!seqrel->rd_istemp)
V
Vadim B. Mikheev 已提交
979 980 981
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
B
Bruce Momjian 已提交
982
		XLogRecData rdata[2];
983
		Page		page = BufferGetPage(buf);
V
Vadim B. Mikheev 已提交
984

985
		xlrec.node = seqrel->rd_node;
986

B
Bruce Momjian 已提交
987
		rdata[0].data = (char *) &xlrec;
988
		rdata[0].len = sizeof(xl_seq_rec);
989
		rdata[0].buffer = InvalidBuffer;
990 991
		rdata[0].next = &(rdata[1]);

992 993
		rdata[1].data = (char *) seqtuple.t_data;
		rdata[1].len = seqtuple.t_len;
994
		rdata[1].buffer = InvalidBuffer;
995 996
		rdata[1].next = NULL;

997
		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);
998 999

		PageSetLSN(page, recptr);
V
Vadim B. Mikheev 已提交
1000
	}
1001

1002
	END_CRIT_SECTION();
M
 
Marc G. Fournier 已提交
1003

1004
	UnlockReleaseBuffer(buf);
1005
	relation_close(seqrel, NoLock);
1006 1007
}

1008 1009 1010 1011
/*
 * Implement the 2 arg setval procedure.
 * See do_setval for discussion.
 */
1012
Datum
1013
setval_oid(PG_FUNCTION_ARGS)
1014
{
1015
	Oid			relid = PG_GETARG_OID(0);
1016
	int64		next = PG_GETARG_INT64(1);
1017

1018
	do_setval(relid, next, true);
1019

1020
	PG_RETURN_INT64(next);
1021 1022
}

1023 1024 1025 1026
/*
 * Implement the 3 arg setval procedure.
 * See do_setval for discussion.
 */
1027
Datum
1028
setval3_oid(PG_FUNCTION_ARGS)
1029
{
1030
	Oid			relid = PG_GETARG_OID(0);
1031
	int64		next = PG_GETARG_INT64(1);
1032 1033
	bool		iscalled = PG_GETARG_BOOL(2);

1034
	do_setval(relid, next, iscalled);
1035

1036
	PG_RETURN_INT64(next);
M
 
Marc G. Fournier 已提交
1037 1038
}

1039

1040
/*
1041 1042
 * Open the sequence and acquire AccessShareLock if needed
 *
1043
 * If we haven't touched the sequence already in this transaction,
B
Bruce Momjian 已提交
1044
 * we need to acquire AccessShareLock.	We arrange for the lock to
1045 1046 1047
 * be owned by the top transaction, so that we don't need to do it
 * more than once per xact.
 */
1048 1049
static Relation
open_share_lock(SeqTable seq)
1050
{
1051
	LocalTransactionId thislxid = MyProc->lxid;
1052

1053
	/* Get the lock if not already held in this xact */
1054
	if (seq->lxid != thislxid)
1055 1056 1057 1058 1059 1060 1061
	{
		ResourceOwner currentOwner;

		currentOwner = CurrentResourceOwner;
		PG_TRY();
		{
			CurrentResourceOwner = TopTransactionResourceOwner;
1062
			LockRelationOid(seq->relid, AccessShareLock);
1063 1064 1065 1066 1067 1068 1069 1070 1071 1072
		}
		PG_CATCH();
		{
			/* Ensure CurrentResourceOwner is restored on error */
			CurrentResourceOwner = currentOwner;
			PG_RE_THROW();
		}
		PG_END_TRY();
		CurrentResourceOwner = currentOwner;

1073
		/* Flag that we have a lock in the current xact */
1074
		seq->lxid = thislxid;
1075
	}
1076 1077 1078

	/* We now know we have AccessShareLock, and can safely open the rel */
	return relation_open(seq->relid, NoLock);
1079 1080
}

1081
/*
1082
 * Given a relation OID, open and lock the sequence.  p_elm and p_rel are
1083
 * output parameters.
1084 1085
 *
 * GPDB: If p_rel is NULL, the sequence relation is not opened or locked.
1086 1087
 */
static void
1088
init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel)
1089
{
B
Bruce Momjian 已提交
1090
	SeqTable	elm;
1091
	Relation	seqrel;
1092

1093 1094 1095 1096 1097 1098 1099
	/* Look to see if we already have a seqtable entry for relation */
	for (elm = seqtab; elm != NULL; elm = elm->next)
	{
		if (elm->relid == relid)
			break;
	}

1100
	/*
1101
	 * Allocate new seqtable entry if we didn't find one.
1102
	 *
B
Bruce Momjian 已提交
1103 1104 1105
	 * NOTE: seqtable entries remain in the list for the life of a backend. If
	 * the sequence itself is deleted then the entry becomes wasted memory,
	 * but it's small enough that this should not matter.
B
Bruce Momjian 已提交
1106
	 */
1107
	if (elm == NULL)
1108
	{
1109
		/*
B
Bruce Momjian 已提交
1110 1111
		 * Time to make a new seqtable entry.  These entries live as long as
		 * the backend does, so we use plain malloc for them.
1112 1113
		 */
		elm = (SeqTable) malloc(sizeof(SeqTableData));
T
Tom Lane 已提交
1114
		if (elm == NULL)
1115 1116 1117
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
1118
		elm->relid = relid;
1119
		elm->lxid = InvalidLocalTransactionId;
1120
		elm->last_valid = false;
1121 1122 1123
		elm->last = elm->cached = elm->increment = 0;
		elm->next = seqtab;
		seqtab = elm;
1124 1125
	}

1126 1127 1128
	/*
	 * Open the sequence relation.
	 */
1129 1130 1131
	if (p_rel)
	{
		seqrel = open_share_lock(elm);
1132

1133 1134 1135 1136 1137
		if (seqrel->rd_rel->relkind != RELKIND_SEQUENCE)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("\"%s\" is not a sequence",
							RelationGetRelationName(seqrel))));
1138

1139 1140
		*p_rel = seqrel;
	}
1141
	*p_elm = elm;
1142 1143 1144
}


1145 1146 1147 1148 1149 1150 1151 1152 1153
/*
 * Given an opened sequence relation, lock the page buffer and find the tuple
 *
 * *buf receives the reference to the pinned-and-ex-locked buffer
 * *seqtuple receives the reference to the sequence tuple proper
 *		(this arg should point to a local variable of type HeapTupleData)
 *
 * Function's return value points to the data payload of the tuple
 */
1154
static Form_pg_sequence
1155
read_seq_tuple(SeqTable elm, Relation rel, Buffer *buf, HeapTuple seqtuple)
1156
{
1157
	Page		page;
1158 1159 1160
	ItemId		lp;
	sequence_magic *sm;
	Form_pg_sequence seq;
1161

1162 1163 1164
	*buf = ReadBuffer(rel, 0);
	LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);

1165
	page = BufferGetPage(*buf);
1166 1167 1168
	sm = (sequence_magic *) PageGetSpecialPointer(page);

	if (sm->magic != SEQ_MAGIC)
1169 1170
		elog(ERROR, "bad magic number in sequence \"%s\": %08X",
			 RelationGetRelationName(rel), sm->magic);
1171 1172

	lp = PageGetItemId(page, FirstOffsetNumber);
1173
	Assert(ItemIdIsNormal(lp));
1174 1175 1176 1177

	/* Note we currently only bother to set these two fields of *seqtuple */
	seqtuple->t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
	seqtuple->t_len = ItemIdGetLength(lp);
1178

1179 1180 1181 1182 1183 1184 1185 1186
	/*
	 * Previous releases of Postgres neglected to prevent SELECT FOR UPDATE
	 * on a sequence, which would leave a non-frozen XID in the sequence
	 * tuple's xmax, which eventually leads to clog access failures or worse.
	 * If we see this has happened, clean up after it.  We treat this like a
	 * hint bit update, ie, don't bother to WAL-log it, since we can certainly
	 * do this again if the update gets lost.
	 */
1187
	if (HeapTupleHeaderGetXmax(seqtuple->t_data) != InvalidTransactionId)
1188
	{
1189 1190 1191
		HeapTupleHeaderSetXmax(seqtuple->t_data, InvalidTransactionId);
		seqtuple->t_data->t_infomask &= ~HEAP_XMAX_COMMITTED;
		seqtuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
1192
		MarkBufferDirtyHint(*buf);
1193
	}
1194

1195
	seq = (Form_pg_sequence) GETSTRUCT(seqtuple);
1196

1197
	/* this is a handy place to update our copy of the increment */
1198 1199 1200
	elm->increment = seq->increment_by;

	return seq;
1201 1202
}

1203 1204
/*
 * init_params: process the options list of CREATE or ALTER SEQUENCE,
1205 1206
 * and store the values into appropriate fields of *new.  Also set
 * *owned_by to any OWNED BY option, or to NIL if there is none.
1207 1208 1209 1210
 *
 * If isInit is true, fill any unspecified options with default values;
 * otherwise, do not change existing options that aren't explicitly overridden.
 */
1211
static void
1212 1213
init_params(List *options, bool isInit,
			Form_pg_sequence new, List **owned_by)
1214
{
1215 1216
	DefElem    *start_value = NULL;
	DefElem    *restart_value = NULL;
1217 1218 1219 1220
	DefElem    *increment_by = NULL;
	DefElem    *max_value = NULL;
	DefElem    *min_value = NULL;
	DefElem    *cache_value = NULL;
1221
	DefElem    *is_cycled = NULL;
1222
	ListCell   *option;
1223

1224 1225
	*owned_by = NIL;

B
Bruce Momjian 已提交
1226
	foreach(option, options)
1227
	{
1228
		DefElem    *defel = (DefElem *) lfirst(option);
1229

1230
		if (strcmp(defel->defname, "increment") == 0)
1231 1232
		{
			if (increment_by)
1233 1234 1235
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1236
			increment_by = defel;
1237
		}
1238 1239
		else if (strcmp(defel->defname, "start") == 0)
		{
1240
			if (start_value)
1241 1242 1243
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1244
			start_value = defel;
1245 1246
		}
		else if (strcmp(defel->defname, "restart") == 0)
1247
		{
1248
			if (restart_value)
1249 1250 1251
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1252
			restart_value = defel;
1253
		}
1254
		else if (strcmp(defel->defname, "maxvalue") == 0)
1255 1256
		{
			if (max_value)
1257 1258 1259
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1260
			max_value = defel;
1261
		}
1262
		else if (strcmp(defel->defname, "minvalue") == 0)
1263 1264
		{
			if (min_value)
1265 1266 1267
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1268
			min_value = defel;
1269
		}
1270
		else if (strcmp(defel->defname, "cache") == 0)
1271 1272
		{
			if (cache_value)
1273 1274 1275
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1276
			cache_value = defel;
1277
		}
1278
		else if (strcmp(defel->defname, "cycle") == 0)
1279
		{
1280
			if (is_cycled)
1281 1282 1283
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1284
			is_cycled = defel;
1285
		}
1286 1287 1288 1289 1290 1291 1292 1293
		else if (strcmp(defel->defname, "owned_by") == 0)
		{
			if (*owned_by)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			*owned_by = defGetQualifiedName(defel);
		}
1294
		else
1295
			elog(ERROR, "option \"%s\" not recognized",
1296 1297 1298
				 defel->defname);
	}

1299 1300 1301 1302 1303 1304 1305
	/*
	 * We must reset log_cnt when isInit or when changing any parameters
	 * that would affect future nextval allocations.
	 */
	if (isInit)
		new->log_cnt = 0;

B
Bruce Momjian 已提交
1306
	/* INCREMENT BY */
1307
	if (increment_by != NULL)
B
Bruce Momjian 已提交
1308 1309
	{
		new->increment_by = defGetInt64(increment_by);
1310 1311 1312
		if (new->increment_by == 0)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1313
					 errmsg("INCREMENT must not be zero")));
1314
		new->log_cnt = 0;
B
Bruce Momjian 已提交
1315
	}
1316 1317 1318 1319
	else if (isInit)
		new->increment_by = 1;

	/* CYCLE */
1320
	if (is_cycled != NULL)
1321 1322 1323
	{
		new->is_cycled = intVal(is_cycled->arg);
		Assert(new->is_cycled == false || new->is_cycled == true);
1324
		new->log_cnt = 0;
1325 1326 1327
	}
	else if (isInit)
		new->is_cycled = false;
1328

1329
	/* MAXVALUE (null arg means NO MAXVALUE) */
1330
	if (max_value != NULL && max_value->arg)
1331
	{
1332
		new->max_value = defGetInt64(max_value);
1333 1334
		new->log_cnt = 0;
	}
1335
	else if (isInit || max_value != NULL)
1336
	{
1337
		if (new->increment_by > 0)
B
Bruce Momjian 已提交
1338
			new->max_value = SEQ_MAXVALUE;		/* ascending seq */
1339
		else
B
Bruce Momjian 已提交
1340
			new->max_value = -1;	/* descending seq */
1341
		new->log_cnt = 0;
1342
	}
1343

1344
	/* MINVALUE (null arg means NO MINVALUE) */
1345
	if (min_value != NULL && min_value->arg)
1346
	{
1347
		new->min_value = defGetInt64(min_value);
1348 1349
		new->log_cnt = 0;
	}
1350
	else if (isInit || min_value != NULL)
1351
	{
1352
		if (new->increment_by > 0)
B
Bruce Momjian 已提交
1353
			new->min_value = 1; /* ascending seq */
1354
		else
B
Bruce Momjian 已提交
1355
			new->min_value = SEQ_MINVALUE;		/* descending seq */
1356
		new->log_cnt = 0;
1357
	}
1358

1359
	/* crosscheck min/max */
1360
	if (new->min_value >= new->max_value)
1361
	{
B
Bruce Momjian 已提交
1362 1363 1364
		char		bufm[100],
					bufx[100];

1365 1366
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->min_value);
		snprintf(bufx, sizeof(bufx), INT64_FORMAT, new->max_value);
1367 1368 1369 1370
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("MINVALUE (%s) must be less than MAXVALUE (%s)",
						bufm, bufx)));
1371
	}
1372

B
Bruce Momjian 已提交
1373
	/* START WITH */
1374 1375
	if (start_value != NULL)
		new->start_value = defGetInt64(start_value);
1376
	else if (isInit)
1377
	{
1378
		if (new->increment_by > 0)
1379
			new->start_value = new->min_value;	/* ascending seq */
1380
		else
1381
			new->start_value = new->max_value;	/* descending seq */
1382
	}
1383

1384 1385
	/* crosscheck START */
	if (new->start_value < new->min_value)
1386
	{
1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409
		char		bufs[100],
					bufm[100];

		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->start_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->min_value);
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("START value (%s) cannot be less than MINVALUE (%s)",
						bufs, bufm)));
	}
	if (new->start_value > new->max_value)
	{
		char		bufs[100],
					bufm[100];

		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->start_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->max_value);
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
			  errmsg("START value (%s) cannot be greater than MAXVALUE (%s)",
					 bufs, bufm)));
	}

1410 1411 1412 1413 1414 1415 1416
	/* RESTART [WITH] */
	if (restart_value != NULL)
	{
		if (restart_value->arg != NULL)
			new->last_value = defGetInt64(restart_value);
		else
			new->last_value = new->start_value;
1417 1418 1419
		new->is_called = false;
		new->log_cnt = 1;
	}
1420
	else if (isInit)
1421
	{
1422
		new->last_value = new->start_value;
1423 1424
		new->is_called = false;
		new->log_cnt = 1;
1425
	}
1426

1427
	/* crosscheck RESTART (or current value, if changing MIN/MAX) */
1428
	if (new->last_value < new->min_value)
1429
	{
B
Bruce Momjian 已提交
1430 1431 1432
		char		bufs[100],
					bufm[100];

1433 1434
		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->last_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->min_value);
1435 1436
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1437 1438
			   errmsg("RESTART value (%s) cannot be less than MINVALUE (%s)",
					  bufs, bufm)));
1439
	}
1440
	if (new->last_value > new->max_value)
1441
	{
B
Bruce Momjian 已提交
1442 1443 1444
		char		bufs[100],
					bufm[100];

1445 1446
		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->last_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->max_value);
1447 1448
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1449 1450
			errmsg("RESTART value (%s) cannot be greater than MAXVALUE (%s)",
				   bufs, bufm)));
1451
	}
1452

B
Bruce Momjian 已提交
1453
	/* CACHE */
1454
	if (cache_value != NULL)
1455
	{
1456 1457 1458 1459
		new->cache_value = defGetInt64(cache_value);
		if (new->cache_value <= 0)
		{
			char		buf[100];
B
Bruce Momjian 已提交
1460

1461 1462 1463 1464 1465 1466
			snprintf(buf, sizeof(buf), INT64_FORMAT, new->cache_value);
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("CACHE (%s) must be greater than zero",
							buf)));
		}
1467
		new->log_cnt = 0;
1468
	}
1469 1470
	else if (isInit)
		new->cache_value = 1;
1471 1472
}

1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496
/*
 * process_owned_by
 *		Process an OWNED BY option for CREATE/ALTER SEQUENCE.
 *
 * seqrel is the (already opened) sequence; owned_by is the list of names
 * from the option: either the single name "none", or a possibly qualified
 * table name followed by a column name.
 *
 * Ownership permissions on the sequence are already checked, but if we are
 * establishing a new owned-by dependency, we must enforce that the
 * referenced table has the same owner and namespace as the sequence.
 */
static void
process_owned_by(Relation seqrel, List *owned_by)
{
	Relation	tablerel = NULL;
	AttrNumber	attnum = 0;
	int			nnames = list_length(owned_by);

	Assert(nnames > 0);

	if (nnames > 1)
	{
		List	   *relname;
		char	   *attrname;
		RangeVar   *rel;

		/* Everything but the last name is the table; the last is the column */
		relname = list_truncate(list_copy(owned_by), nnames - 1);
		attrname = strVal(lfirst(list_tail(owned_by)));

		/* Open and lock rel to ensure it won't go away meanwhile */
		rel = makeRangeVarFromNameList(relname);
		tablerel = relation_openrv(rel, AccessShareLock);

		/* Must be a regular table */
		if (tablerel->rd_rel->relkind != RELKIND_RELATION)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("referenced relation \"%s\" is not a table",
							RelationGetRelationName(tablerel))));

		/* We insist on same owner and schema */
		if (seqrel->rd_rel->relowner != tablerel->rd_rel->relowner)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("sequence must have same owner as table it is linked to")));
		if (RelationGetNamespace(seqrel) != RelationGetNamespace(tablerel))
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("sequence must be in same schema as table it is linked to")));

		/* Now, fetch the attribute number from the system cache */
		attnum = get_attnum(RelationGetRelid(tablerel), attrname);
		if (attnum == InvalidAttrNumber)
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_COLUMN),
					 errmsg("column \"%s\" of relation \"%s\" does not exist",
							attrname, RelationGetRelationName(tablerel))));
	}
	else
	{
		/* A lone name is only acceptable as OWNED BY NONE */
		if (strcmp(strVal(linitial(owned_by)), "none") != 0)
			ereport(ERROR,
					(errcode(ERRCODE_SYNTAX_ERROR),
					 errmsg("invalid OWNED BY option"),
				errhint("Specify OWNED BY table.column or OWNED BY NONE.")));
	}

	/*
	 * OK, we are ready to update pg_depend.  First remove any existing AUTO
	 * dependencies for the sequence, then optionally add a new one.
	 */
	markSequenceUnowned(RelationGetRelid(seqrel));

	if (tablerel != NULL)
	{
		ObjectAddress refobject;
		ObjectAddress depobject;

		refobject.classId = RelationRelationId;
		refobject.objectId = RelationGetRelid(tablerel);
		refobject.objectSubId = attnum;

		depobject.classId = RelationRelationId;
		depobject.objectId = RelationGetRelid(seqrel);
		depobject.objectSubId = 0;

		recordDependencyOn(&depobject, &refobject, DEPENDENCY_AUTO);

		/* Done, but hold lock until commit */
		relation_close(tablerel, NoLock);
	}
}

V
Vadim B. Mikheev 已提交
1566

B
Bruce Momjian 已提交
1567
void
1568
seq_redo(XLogRecPtr beginLoc, XLogRecPtr lsn, XLogRecord *record)
V
Vadim B. Mikheev 已提交
1569
{
B
Bruce Momjian 已提交
1570 1571 1572 1573 1574 1575
	uint8		info = record->xl_info & ~XLR_INFO_MASK;
	Buffer		buffer;
	Page		page;
	char	   *item;
	Size		itemsz;
	xl_seq_rec *xlrec = (xl_seq_rec *) XLogRecGetData(record);
1576
	sequence_magic *sm;
V
Vadim B. Mikheev 已提交
1577

1578 1579 1580
	/* Backup blocks are not used in seq records */
	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));

1581
	if (info != XLOG_SEQ_LOG)
1582
		elog(PANIC, "seq_redo: unknown op code %u", info);
1583

1584
	buffer = XLogReadBuffer(xlrec->node, 0, true);
1585
	Assert(BufferIsValid(buffer));
V
Vadim B. Mikheev 已提交
1586 1587
	page = (Page) BufferGetPage(buffer);

1588 1589
	/* Always reinit the page and reinstall the magic number */
	/* See comments in DefineSequence */
1590 1591 1592
	PageInit((Page) page, BufferGetPageSize(buffer), sizeof(sequence_magic));
	sm = (sequence_magic *) PageGetSpecialPointer(page);
	sm->magic = SEQ_MAGIC;
V
Vadim B. Mikheev 已提交
1593

B
Bruce Momjian 已提交
1594
	item = (char *) xlrec + sizeof(xl_seq_rec);
1595
	itemsz = record->xl_len - sizeof(xl_seq_rec);
1596

B
Bruce Momjian 已提交
1597
	if (PageAddItem(page, (Item) item, itemsz,
1598
					FirstOffsetNumber, false, false) == InvalidOffsetNumber)
1599
		elog(PANIC, "seq_redo: failed to add item to page");
V
Vadim B. Mikheev 已提交
1600 1601

	PageSetLSN(page, lsn);
1602 1603
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);
V
Vadim B. Mikheev 已提交
1604 1605
}

B
Bruce Momjian 已提交
1606
void
1607
seq_desc(StringInfo buf, XLogRecPtr beginLoc, XLogRecord *record)
V
Vadim B. Mikheev 已提交
1608
{
1609 1610
	uint8		info = record->xl_info & ~XLR_INFO_MASK;
	char		*rec = XLogRecGetData(record);
B
Bruce Momjian 已提交
1611
	xl_seq_rec *xlrec = (xl_seq_rec *) rec;
V
Vadim B. Mikheev 已提交
1612 1613

	if (info == XLOG_SEQ_LOG)
1614
		appendStringInfo(buf, "log: ");
V
Vadim B. Mikheev 已提交
1615 1616
	else
	{
1617
		appendStringInfo(buf, "UNKNOWN");
V
Vadim B. Mikheev 已提交
1618 1619 1620
		return;
	}

1621
	appendStringInfo(buf, "rel %u/%u/%u",
B
Bruce Momjian 已提交
1622
			   xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode);
V
Vadim B. Mikheev 已提交
1623
}
1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707


/*
 * cdb_sequence_relation_init
 *		Initialize a pseudo relcache entry with just enough info to call
 *		bufmgr.
 *
 * seqrel		caller-provided RelationData to fill in; it is zeroed first
 * tablespaceid, dbid, relid
 *				physical identity of the sequence, stored into rd_node
 * istemp		stored into rd_istemp
 *
 * rd_rel is allocated with palloc0 here; cdb_sequence_relation_term()
 * releases it.  The relation name is a synthetic "pg_class.oid=<relid>"
 * string, since no catalog lookup is done.
 */
static void
cdb_sequence_relation_init(Relation seqrel,
						   Oid tablespaceid,
						   Oid dbid,
						   Oid relid,
						   bool istemp)
{
	/* See RelationBuildDesc in relcache.c */
	memset(seqrel, 0, sizeof(*seqrel));

	seqrel->rd_smgr = NULL;
	seqrel->rd_refcnt = 99;

	seqrel->rd_id = relid;
	seqrel->rd_istemp = istemp;

	/* Must use shared buffer pool so seqserver & QDs can see the data. */
	seqrel->rd_isLocalBuf = false;

	seqrel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);

	/*
	 * Oid is unsigned, so it must be printed with %u: %d would show OIDs
	 * above 2^31 as negative numbers.  Bound the write to the name buffer.
	 */
	snprintf(seqrel->rd_rel->relname.data,
			 sizeof(seqrel->rd_rel->relname.data),
			 "pg_class.oid=%u", relid);

	/* as in RelationInitPhysicalAddr... */
	seqrel->rd_node.spcNode = tablespaceid;
	seqrel->rd_node.dbNode = dbid;
	seqrel->rd_node.relNode = relid;
}								/* cdb_sequence_relation_init */

/*
 * cdb_sequence_relation_term
 *		Clean up a pseudo relcache entry built by
 *		cdb_sequence_relation_init().
 *
 * Closes the entry's smgr file reference and frees the rd_rel storage.
 * rd_rel is reset to NULL afterwards so that the entry does not keep a
 * dangling pointer and a repeated call does not free it twice.
 */
static void
cdb_sequence_relation_term(Relation seqrel)
{
	/* Close the file. */
	RelationCloseSmgr(seqrel);

	if (seqrel->rd_rel != NULL)
	{
		pfree(seqrel->rd_rel);
		seqrel->rd_rel = NULL;
	}
}								/* cdb_sequence_relation_term */



/*
 * cdb_sequence_nextval_proxy
 *		CDB: forward a nextval request from qExec to the sequence server.
 *
 * The request is sent over the descriptor returned by GetSeqServerFD(),
 * tagged with the current gp_session_id.  plast, pcached, pincrement and
 * poverflow are handed straight through to sendSequenceRequest().
 */
void
cdb_sequence_nextval_proxy(Relation seqrel,
						   int64 *plast,
						   int64 *pcached,
						   int64 *pincrement,
						   bool *poverflow)
{
	sendSequenceRequest(GetSeqServerFD(),
						seqrel,
						gp_session_id,
						plast,
						pcached,
						pincrement,
						poverflow);
}								/* cdb_sequence_nextval_proxy */


/*
 * cdb_sequence_nextval_server
 *		CDB: nextval entry point called by sequence server.
 *
 * tablespaceid, dbid, relid and istemp identify the sequence; they are used
 * to build a throwaway pseudo relcache entry instead of opening the
 * relation.  *plast, *pcached and *pincrement are zeroed up front and then,
 * together with *poverflow, passed to cdb_sequence_nextval().
 */
void
cdb_sequence_nextval_server(Oid tablespaceid,
							Oid dbid,
							Oid relid,
							bool istemp,
							int64 *plast,
							int64 *pcached,
							int64 *pincrement,
							bool *poverflow)
{
	RelationData pseudorel;
	Relation	seqrel = &pseudorel;
	SeqTable	elm;

	*plast = 0;
	*pcached = 0;
	*pincrement = 0;

	/*
	 * In Postgres, init_sequence() looks up the SeqTable entry for the
	 * sequence and opens the relation.  The sequence server does not need
	 * the relation opened; it only needs `elm`, because
	 * cdb_sequence_nextval() is given it below (for read_seq_tuple()).
	 * Passing NULL for p_rel skips opening and locking the relation.
	 *
	 * In GPDB, a sequence server is used to generate unique values for all
	 * sequences.  It doesn't have to lock the sequence relation, because
	 * there is only a single instance of the sequence server handling all
	 * the requests from segments.  To prevent collisions between values
	 * generated on the 'master' (e.g. `select nextval(seq)`) and on
	 * 'segments' (e.g. `insert into table with serial column`), a
	 * BUFFER_LOCK_EXCLUSIVE lock is held on the shared buffer of the
	 * sequence relation.
	 */
	init_sequence(relid, &elm, NULL);

	/* Build a pseudo relcache entry with just enough info to call bufmgr. */
	cdb_sequence_relation_init(seqrel, tablespaceid, dbid, relid, istemp);

	/*
	 * CDB TODO: Catch errors.
	 *
	 * NOTE(review): nothing here intercepts an error raised by the call
	 * below, so in that case the cleanup that follows is not reached.
	 */

	/* Update the sequence object. */
	cdb_sequence_nextval(elm, seqrel, plast, pcached, pincrement, poverflow);

	/* Cleanup. */
	cdb_sequence_relation_term(seqrel);
}								/* cdb_sequence_nextval_server */
1744

1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774
/*
 * mask_seq_values
 *		Zero last_value and log_cnt in every tuple on a sequence page, for
 *		consistency checking.
 *
 * To avoid logging every fetch from a sequence, SEQ_LOG_VALS are pre-logged,
 * so these two fields have to be masked out before pages are compared
 * during consistency checks.  The page is modified in place.
 */
static void
mask_seq_values(Page page)
{
	OffsetNumber lastoff = PageGetMaxOffsetNumber(page);
	OffsetNumber off = FirstOffsetNumber;

	while (off <= lastoff)
	{
		ItemId		itemid = PageGetItemId(page, off);
		HeapTupleData tup;
		Form_pg_sequence seqform;

		/* Only t_data is needed to reach the tuple's data payload */
		tup.t_data = (HeapTupleHeader) ((char *) page + ItemIdGetOffset(itemid));
		seqform = (Form_pg_sequence) GETSTRUCT(&tup);

		MemSet(&seqform->last_value, 0, sizeof(int64));
		MemSet(&seqform->log_cnt, 0, sizeof(int64));

		off = OffsetNumberNext(off);
	}
}

1775 1776 1777 1778 1779 1780 1781 1782
/*
 * seq_mask
 *		Mask a Sequence page before performing consistency checks on it.
 *
 * Three maskings are applied in turn: the page LSN and checksum, the
 * last_value/log_cnt fields of the sequence tuple(s), and the page's
 * unused space.  blkno is not used.
 */
void
seq_mask(char *page, BlockNumber blkno)
{
	Page		seqpage = (Page) page;

	mask_page_lsn_and_checksum(seqpage);

	/*
	 * last_value and log_cnt differ legitimately because SEQ_LOG_VALS
	 * fetches are pre-logged rather than logged one by one, so hide them.
	 */
	mask_seq_values(seqpage);

	mask_unused_space(seqpage);
}