/*-------------------------------------------------------------------------
 *
 * sequence.c
 *	  PostgreSQL sequences support code.
 *
 * Portions Copyright (c) 2005-2008, Greenplum inc.
 * Portions Copyright (c) 2012-Present Pivotal Software, Inc.
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/commands/sequence.c,v 1.162 2009/10/13 00:53:07 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
17
#include "postgres.h"
18

19
#include "access/heapam.h"
20
#include "access/bufmask.h"
21 22
#include "access/transam.h"
#include "access/xact.h"
23
#include "access/xlogutils.h"
24
#include "catalog/dependency.h"
25
#include "catalog/heap.h"
26
#include "catalog/namespace.h"
27
#include "catalog/pg_type.h"
28
#include "commands/defrem.h"
29
#include "commands/sequence.h"
30
#include "commands/tablecmds.h"
B
Bruce Momjian 已提交
31
#include "miscadmin.h"
32
#include "storage/smgr.h"               /* RelationCloseSmgr -> smgrclose */
33
#include "nodes/makefuncs.h"
34 35
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
36
#include "storage/proc.h"
37
#include "utils/acl.h"
B
Bruce Momjian 已提交
38
#include "utils/builtins.h"
39
#include "utils/formatting.h"
40
#include "utils/lsyscache.h"
41
#include "utils/resowner.h"
42
#include "utils/syscache.h"
43

44
#include "catalog/oid_dispatch.h"
45
#include "cdb/cdbdisp_query.h"
H
Heikki Linnakangas 已提交
46
#include "cdb/cdbdoublylinked.h"
47 48 49 50 51 52 53
#include "cdb/cdbsrlz.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbmotion.h"
#include "cdb/ml_ipc.h"

#include "postmaster/seqserver.h"

54

V
Vadim B. Mikheev 已提交
55
/*
 * We don't want to log each fetching of a value from a sequence,
 * so we pre-log a few fetches in advance. In the event of
 * crash we can lose (skip over) as many values as we pre-logged.
 *
 * SEQ_LOG_VALS is the number of extra values fetched (and covered by a
 * single WAL record) whenever a WAL record has to be written.
 */
#define SEQ_LOG_VALS	32
61

62 63 64 65 66
/*
 * The "special area" of a sequence's buffer page looks like this.
 */
#define SEQ_MAGIC	  0x1717

67 68
typedef struct sequence_magic
{
69
	uint32		magic;
70
} sequence_magic;
71

72 73 74 75 76 77
/*
 * We store a SeqTable item for every sequence we have touched in the current
 * session.  This is needed to hold onto nextval/currval state.  (We can't
 * rely on the relcache, since it's only, well, a cache, and may decide to
 * discard entries.)
 *
B
Bruce Momjian 已提交
78
 * XXX We use linear search to find pre-existing SeqTable entries.	This is
79 80 81
 * good when only a small number of sequences are touched in a session, but
 * would suck with many different sequences.  Perhaps use a hashtable someday.
 */
82 83
typedef struct SeqTableData
{
84 85
	struct SeqTableData *next;	/* link to next SeqTable object */
	Oid			relid;			/* pg_class OID of this sequence */
86
	LocalTransactionId lxid;	/* xact in which we last did a seq op */
87
	bool		last_valid;		/* do we have a valid "last" value? */
88 89 90 91
	int64		last;			/* value last returned by nextval */
	int64		cached;			/* last value already cached for nextval */
	/* if last != cached, we have not used up all the cached values */
	int64		increment;		/* copy of sequence's increment field */
92
	/* note that increment is zero until we first do read_seq_tuple() */
93
} SeqTableData;
94 95 96

typedef SeqTableData *SeqTable;

97
static SeqTable seqtab = NULL;	/* Head of list of SeqTable items */
98

99 100 101 102 103
/*
 * last_used_seq is updated by nextval() to point to the last used
 * sequence.  lastval() reads it to find the sequence whose most recent
 * value it should return; it stays NULL until the first nextval() call.
 */
static SeqTableData *last_used_seq = NULL;
104

105
static int64 nextval_internal(Oid relid);
106
static Relation open_share_lock(SeqTable seq);
107
static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel);
108 109
static Form_pg_sequence read_seq_tuple(SeqTable elm, Relation rel,
			   Buffer *buf, HeapTuple seqtuple);
110
static void init_params(List *options, bool isInit,
B
Bruce Momjian 已提交
111
			Form_pg_sequence new, List **owned_by);
112
static void do_setval(Oid relid, int64 next, bool iscalled);
113
static void process_owned_by(Relation seqrel, List *owned_by);
114
static void mask_seq_values(Page page);
115

116
static void
117 118
cdb_sequence_nextval(SeqTable elm,
					 Relation   seqrel,
119 120 121 122 123 124 125 126 127 128 129
                     int64     *plast,
                     int64     *pcached,
                     int64     *pincrement,
                     bool      *seq_overflow);
static void
cdb_sequence_nextval_proxy(Relation seqrel,
                           int64   *plast,
                           int64   *pcached,
                           int64   *pincrement,
                           bool    *poverflow);

130
/*
B
Bruce Momjian 已提交
131
 * DefineSequence
132
 *				Creates a new sequence relation
133 134
 */
void
135
DefineSequence(CreateSeqStmt *seq)
136
{
137
	FormData_pg_sequence new;
138
	List	   *owned_by;
139
	CreateStmt *stmt = makeNode(CreateStmt);
140
	Oid			seqoid;
141 142
	Relation	rel;
	Buffer		buf;
143
	Page		page;
144
	sequence_magic *sm;
145 146 147
	HeapTuple	tuple;
	TupleDesc	tupDesc;
	Datum		value[SEQ_COL_LASTCOL];
148
	bool		null[SEQ_COL_LASTCOL];
149
	int			i;
150
	NameData	name;
151
	OffsetNumber offnum;
152

153 154
	bool shouldDispatch =  Gp_role == GP_ROLE_DISPATCH && !IsBootstrapProcessingMode();

155
	/* Check and set all option values */
156
	init_params(seq->options, true, &new, &owned_by);
157 158

	/*
159
	 * Create relation (and fill value[] and null[] for the tuple)
160 161 162
	 */
	stmt->tableElts = NIL;
	for (i = SEQ_COL_FIRSTCOL; i <= SEQ_COL_LASTCOL; i++)
163
	{
164
		ColumnDef  *coldef = makeNode(ColumnDef);
165

166 167
		coldef->inhcount = 0;
		coldef->is_local = true;
168
		coldef->is_not_null = true;
169
		coldef->storage = 0;
170 171
		coldef->raw_default = NULL;
		coldef->cooked_default = NULL;
172 173
		coldef->constraints = NIL;

174
		null[i - 1] = false;
175 176 177

		switch (i)
		{
178
			case SEQ_COL_NAME:
179
				coldef->typeName = makeTypeNameFromOid(NAMEOID, -1);
180
				coldef->colname = "sequence_name";
181
				namestrcpy(&name, seq->sequence->relname);
182
				value[i - 1] = NameGetDatum(&name);
183 184
				break;
			case SEQ_COL_LASTVAL:
185
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
186
				coldef->colname = "last_value";
187
				value[i - 1] = Int64GetDatumFast(new.last_value);
188
				break;
189
			case SEQ_COL_STARTVAL:
190
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
191 192 193
				coldef->colname = "start_value";
				value[i - 1] = Int64GetDatumFast(new.start_value);
				break;
194
			case SEQ_COL_INCBY:
195
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
196
				coldef->colname = "increment_by";
197
				value[i - 1] = Int64GetDatumFast(new.increment_by);
198 199
				break;
			case SEQ_COL_MAXVALUE:
200
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
201
				coldef->colname = "max_value";
202
				value[i - 1] = Int64GetDatumFast(new.max_value);
203 204
				break;
			case SEQ_COL_MINVALUE:
205
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
206
				coldef->colname = "min_value";
207
				value[i - 1] = Int64GetDatumFast(new.min_value);
208 209
				break;
			case SEQ_COL_CACHE:
210
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
211
				coldef->colname = "cache_value";
212
				value[i - 1] = Int64GetDatumFast(new.cache_value);
213
				break;
V
Vadim B. Mikheev 已提交
214
			case SEQ_COL_LOG:
215
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
V
Vadim B. Mikheev 已提交
216
				coldef->colname = "log_cnt";
217
				value[i - 1] = Int64GetDatum((int64) 0);
V
Vadim B. Mikheev 已提交
218
				break;
219
			case SEQ_COL_CYCLE:
220
				coldef->typeName = makeTypeNameFromOid(BOOLOID, -1);
221
				coldef->colname = "is_cycled";
222
				value[i - 1] = BoolGetDatum(new.is_cycled);
223 224
				break;
			case SEQ_COL_CALLED:
225
				coldef->typeName = makeTypeNameFromOid(BOOLOID, -1);
226
				coldef->colname = "is_called";
227
				value[i - 1] = BoolGetDatum(false);
228
				break;
229 230 231 232
		}
		stmt->tableElts = lappend(stmt->tableElts, coldef);
	}

233 234
	stmt->relation = seq->sequence;
	stmt->inhRelations = NIL;
235
	stmt->constraints = NIL;
236 237
	stmt->inhOids = NIL;
	stmt->parentOidCount = 0;
B
Bruce Momjian 已提交
238
	stmt->options = list_make1(defWithOids(false));
239
	stmt->oncommit = ONCOMMIT_NOOP;
240
	stmt->tablespacename = NULL;
241 242
	stmt->relKind = RELKIND_SEQUENCE;
	stmt->ownerid = GetUserId();
243

244
	seqoid = DefineRelation(stmt, RELKIND_SEQUENCE, RELSTORAGE_HEAP, false);
245

246 247 248 249 250 251 252 253
	/*
	 * Open and lock the new sequence.  (This lock is redundant; an
	 * AccessExclusiveLock was acquired above by DefineRelation and
	 * won't be released until end of transaction.)
	 *
	 * CDB: Acquire lock on qDisp before dispatching to qExecs, so
	 * qDisp can detect and resolve any deadlocks.
	 */
254
	rel = heap_open(seqoid, AccessExclusiveLock);
255
	tupDesc = RelationGetDescr(rel);
256

257 258 259 260
	/* Now form sequence tuple */
	tuple = heap_form_tuple(tupDesc, value, null);

	/* Initialize first page of relation with special magic number */
261
	buf = ReadBuffer(rel, P_NEW);
262 263
	Assert(BufferGetBlockNumber(buf) == 0);

264
	page = BufferGetPage(buf);
265

266
	PageInit(page, BufferGetPageSize(buf), sizeof(sequence_magic));
267 268 269
	sm = (sequence_magic *) PageGetSpecialPointer(page);
	sm->magic = SEQ_MAGIC;

270 271
	/* Now insert sequence tuple */
	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
272

273
	/*
274
	 * Since VACUUM does not process sequences, we have to force the tuple
275
	 * to have xmin = FrozenTransactionId now.	Otherwise it would become
B
Bruce Momjian 已提交
276
	 * invisible to SELECTs after 2G transactions.	It is okay to do this
277 278 279
	 * because if the current transaction aborts, no other xact will ever
	 * examine the sequence tuple anyway.
	 *
280
	 */
281

282 283 284 285 286 287
	HeapTupleHeaderSetXmin(tuple->t_data, FrozenTransactionId);
	HeapTupleHeaderSetXminFrozen(tuple->t_data);
	HeapTupleHeaderSetCmin(tuple->t_data, FirstCommandId);
	HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId);
	tuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
	ItemPointerSet(&tuple->t_data->t_ctid, 0, FirstOffsetNumber);
288

289
	START_CRIT_SECTION();
290

291 292
	MarkBufferDirty(buf);

293 294 295 296 297
	offnum = PageAddItem(page, (Item) tuple->t_data, tuple->t_len,
						 InvalidOffsetNumber, false, false);
	if (offnum != FirstOffsetNumber)
		elog(ERROR, "failed to add sequence tuple to page");

298 299
	/* XLOG stuff */
	if (!rel->rd_istemp)
300
	{
301 302 303
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];
304 305

		xlrec.node = rel->rd_node;
306

307 308
		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
309
		rdata[0].buffer = InvalidBuffer;
310 311
		rdata[0].next = &(rdata[1]);

312
		rdata[1].data = (char *) tuple->t_data;
313
		rdata[1].len = tuple->t_len;
314
		rdata[1].buffer = InvalidBuffer;
315 316
		rdata[1].next = NULL;

317
		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);
318 319 320

		PageSetLSN(page, recptr);
	}
321

322
	END_CRIT_SECTION();
323

324 325
	UnlockReleaseBuffer(buf);

326 327 328 329
	/* process OWNED BY if given */
	if (owned_by)
		process_owned_by(rel, owned_by);

330
	heap_close(rel, NoLock);
331 332 333 334 335

	
	/* Dispatch to segments */
	if (shouldDispatch)
	{
336 337 338 339
		CdbDispatchUtilityStatement((Node *) seq,
									DF_CANCEL_ON_ERROR|
									DF_WITH_SNAPSHOT|
									DF_NEED_TWO_PHASE,
340
									GetAssignedOidsForDispatch(),
341
									NULL);
342
	}
343 344
}

B
Bruce Momjian 已提交
345 346 347
/*
 * AlterSequence
 *
348
 * Modify the definition of a sequence relation
B
Bruce Momjian 已提交
349 350
 */
void
351
AlterSequence(AlterSeqStmt *stmt)
B
Bruce Momjian 已提交
352
{
353
	Oid			relid;
354 355 356 357 358 359 360 361 362 363 364 365

	/* find sequence */
	relid = RangeVarGetRelid(stmt->sequence, false);

	/* allow ALTER to sequence owner only */
	/* if you change this, see also callers of AlterSequenceInternal! */
	if (!pg_class_ownercheck(relid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
					   stmt->sequence->relname);

	/* do the work */
	AlterSequenceInternal(relid, stmt->options);
366 367 368 369 370 371 372 373

	if (Gp_role == GP_ROLE_DISPATCH)
		CdbDispatchUtilityStatement((Node *) stmt,
									DF_CANCEL_ON_ERROR|
									DF_WITH_SNAPSHOT|
									DF_NEED_TWO_PHASE,
									NIL,
									NULL);
374 375 376 377 378 379 380 381 382 383
}

/*
 * AlterSequenceInternal
 *
 * Same as AlterSequence except that the sequence is specified by OID
 * and we assume the caller already checked permissions.
 *
 * The on-disk sequence tuple is overwritten with the merged option values
 * and WAL-logged unless the sequence is temporary.  On the dispatcher, for
 * non-temporary sequences, the change is also recorded through
 * MetaTrackUpdObject() with a subtype string derived from the option list.
 */
void
AlterSequenceInternal(Oid relid, List *options)
{
	SeqTable	elm;
	Relation	seqrel;
	Buffer		buf;
	Form_pg_sequence seq;
	FormData_pg_sequence new;
	List	   *owned_by;
	HeapTupleData seqtuple;
	int64		save_increment;
	bool		bSeqIsTemp = false;
	int			numopts	   = 0;
	char	   *alter_subtype = "";		/* metadata tracking: kind of
										   redundant to say "role" */

	/* open and AccessShareLock sequence */
	init_sequence(relid, &elm, &seqrel);

	/* hack to keep ALTER SEQUENCE OWNED BY from changing currval state */
	save_increment = elm->increment;

	/* lock page' buffer and read tuple into new sequence structure */
	seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
	elm->increment = seq->increment_by;

	/* Copy old values of options into workspace */
	memcpy(&new, seq, sizeof(FormData_pg_sequence));

	/* Check and set new values */
	init_params(options, false, &new, &owned_by);

	if (owned_by)
	{
		/* Restore previous state of elm (assume nothing else changes) */
		elm->increment = save_increment;
	}
	else
	{
		/* Clear local cache so that we don't think we have cached numbers */
		/* Note that we do not change the currval() state */
		elm->cached = elm->last;
	}

	/* Now okay to update the on-disk tuple */
	START_CRIT_SECTION();

	memcpy(seq, &new, sizeof(FormData_pg_sequence));

	MarkBufferDirty(buf);

	/* XLOG stuff */

	bSeqIsTemp = seqrel->rd_istemp;

	if (!bSeqIsTemp)
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];
		Page		page = BufferGetPage(buf);

		xlrec.node = seqrel->rd_node;

		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
		rdata[0].buffer = InvalidBuffer;
		rdata[0].next = &(rdata[1]);

		rdata[1].data = (char *) seqtuple.t_data;
		rdata[1].len = seqtuple.t_len;
		rdata[1].buffer = InvalidBuffer;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);

		PageSetLSN(page, recptr);
	}

	END_CRIT_SECTION();

	UnlockReleaseBuffer(buf);

	/* process OWNED BY if given */
	if (owned_by)
		process_owned_by(seqrel, owned_by);

	relation_close(seqrel, NoLock);

	/*
	 * Build the subtype string for metadata tracking: "<n> OPTIONS" for
	 * zero or several options, otherwise the upper-cased name of the single
	 * option (with "owned_by" spelled "OWNED BY").
	 */
	numopts = list_length(options);

	if (numopts > 1)
	{
		char allopts[NAMEDATALEN];

		/* bounded write: never overrun the fixed-size buffer */
		snprintf(allopts, sizeof(allopts), "%d OPTIONS", numopts);

		alter_subtype = pstrdup(allopts);
	}
	else if (0 == numopts)
	{
		alter_subtype = "0 OPTIONS";
	}
	else if ((Gp_role == GP_ROLE_DISPATCH) && (!bSeqIsTemp))
	{
		ListCell		*option = list_head(options);
		DefElem			*defel	= (DefElem *) lfirst(option);
		char			*tempo	= NULL;

		alter_subtype = defel->defname;
		if (0 == strcmp(alter_subtype, "owned_by"))
			alter_subtype = "OWNED BY";

		tempo = str_toupper(alter_subtype, strlen(alter_subtype));

		alter_subtype = tempo;
	}

	if (Gp_role == GP_ROLE_DISPATCH && !bSeqIsTemp)
	{
		/* MPP-6929: metadata tracking */
		MetaTrackUpdObject(RelationRelationId,
						   relid,
						   GetUserId(),
						   "ALTER", alter_subtype);
	}
}

511

512 513 514 515 516
/*
 * Note: nextval with a text argument is no longer exported as a pg_proc
 * entry, but we keep it around to ease porting of C code that may have
 * called the function directly.
 */
517 518
Datum
nextval(PG_FUNCTION_ARGS)
519
{
520
	text	   *seqin = PG_GETARG_TEXT_P(0);
521
	RangeVar   *sequence;
522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540
	Oid			relid;

	sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin));
	relid = RangeVarGetRelid(sequence, false);

	PG_RETURN_INT64(nextval_internal(relid));
}

Datum
nextval_oid(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);

	PG_RETURN_INT64(nextval_internal(relid));
}

/*
 * nextval_internal
 *
 * Return the next value of the sequence identified by 'relid'.
 *
 * If this session still holds unused cached values for the sequence, the
 * next one is handed out without touching the sequence relation and
 * without a privilege check.  Otherwise the caller must have UPDATE
 * privilege, and a new batch is obtained: through
 * cdb_sequence_nextval_proxy() when running as a query executor, or
 * directly through cdb_sequence_nextval() otherwise.
 *
 * Raises an error if the sequence has reached its bound.  last_used_seq
 * (consulted by lastval()) is only advanced when a value is actually
 * returned.
 */
static int64
nextval_internal(Oid relid)
{
	SeqTable	elm;
	Relation	seqrel;
	bool is_overflow = false;

	/* open and AccessShareLock sequence */
	init_sequence(relid, &elm, &seqrel);

	if (elm->last != elm->cached)		/* some numbers were cached */
	{
		Assert(elm->last_valid);
		Assert(elm->increment != 0);
		elm->last += elm->increment;
		relation_close(seqrel, NoLock);
		last_used_seq = elm;
		return elm->last;
	}

	if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied for sequence %s",
						RelationGetRelationName(seqrel))));

	/* Update the sequence object. */
	if (Gp_role == GP_ROLE_EXECUTE)
		cdb_sequence_nextval_proxy(seqrel,
								   &elm->last,
								   &elm->cached,
								   &elm->increment,
								   &is_overflow);
	else
		cdb_sequence_nextval(elm,
							 seqrel,
							 &elm->last,
							 &elm->cached,
							 &elm->increment,
							 &is_overflow);

	if (is_overflow)
	{
		/*
		 * Report the error while seqrel is still open: the message reads the
		 * relation name out of it, so it must not be closed first.  As with
		 * the other error exits in this file, the reference is left for
		 * error cleanup to release.
		 */
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("nextval: reached %s value of sequence \"%s\" (" INT64_FORMAT ")",
                        elm->increment>0 ? "maximum":"minimum",
                        RelationGetRelationName(seqrel), elm->last)));
	}

	elm->last_valid = true;

	/*
	 * Only now remember this sequence for lastval(); lastval() asserts that
	 * the entry it points to has last_valid set.
	 */
	last_used_seq = elm;

	relation_close(seqrel, NoLock);
	return elm->last;
}


598 599 600
/*
 * cdb_sequence_nextval
 *
 * Advance the sequence stored in 'seqrel' and report the outcome through
 * the output parameters:
 *
 *	*plast		- the value to hand out now (first value fetched)
 *	*pcached	- the last value fetched for the caller's cache
 *	*pincrement - the sequence's increment_by
 *	*poverflow	- true if the bound was hit on a non-cycling sequence
 *				  before any value could be fetched
 *
 * No error is raised here on overflow; the caller is expected to check
 * *poverflow.  The sequence tuple is updated in its buffer, and a WAL
 * record is written when needed unless the sequence is temporary.
 */
static void
cdb_sequence_nextval(SeqTable elm,
					 Relation   seqrel,
                     int64     *plast,
                     int64     *pcached,
                     int64     *pincrement,
                     bool      *poverflow)
{
	Buffer		buf;
	Page		page;
	HeapTupleData seqtuple;
	Form_pg_sequence seq;
	int64		incby,
				maxv,
				minv,
				cache,
				log,
				fetch,
				last;
	int64		result,
				next,
				rescnt = 0;
	bool 		have_overflow = false;
	bool		logit = false;

	/* lock page' buffer and read tuple */
	seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
	page = BufferGetPage(buf);

	last = next = result = seq->last_value;
	incby = seq->increment_by;
	maxv = seq->max_value;
	minv = seq->min_value;
	fetch = cache = seq->cache_value;
	log = seq->log_cnt;

	if (!seq->is_called)
	{
		rescnt++;				/* return last_value if not is_called */
		fetch--;
	}

	/*
	 * Decide whether we should emit a WAL log record.  If so, force up the
	 * fetch count to grab SEQ_LOG_VALS more values than we actually need to
	 * cache.  (These will then be usable without logging.)
	 *
	 * If this is the first nextval after a checkpoint, we must force a new
	 * WAL record to be written anyway, else replay starting from the
	 * checkpoint would fail to advance the sequence past the logged values.
	 * In this case we may as well fetch extra values.
	 */
	if (log < fetch || !seq->is_called)
	{
		/* forced log to satisfy local demand for values */
		fetch = log = fetch + SEQ_LOG_VALS;
		logit = true;
	}
	else
	{
		XLogRecPtr	redoptr = GetRedoRecPtr();

		if (XLByteLE(PageGetLSN(page), redoptr))
		{
			/* last update of seq was before checkpoint */
			fetch = log = fetch + SEQ_LOG_VALS;
			logit = true;
		}
	}

	while (fetch)				/* try to fetch cache [+ log ] numbers */
	{
		/*
		 * Check MAXVALUE for ascending sequences and MINVALUE for descending
		 * sequences
		 */
		if (incby > 0)
		{
			/* ascending sequence */
			if ((maxv >= 0 && next > maxv - incby) ||
				(maxv < 0 && next + incby > maxv))
			{
				if (rescnt > 0)
					break;		/* stop fetching */
				if (!seq->is_cycled)
				{
					have_overflow = true;
				}
				else
				{
					next = minv;
				}
			}
			else
				next += incby;
		}
		else
		{
			/* descending sequence */
			if ((minv < 0 && next < minv - incby) ||
				(minv >= 0 && next + incby < minv))
			{
				if (rescnt > 0)
					break;		/* stop fetching */
				if (!seq->is_cycled)
				{
					have_overflow = true;
				}
				else
				{
					next = maxv;
				}
			}
			else
				next += incby;
		}
		fetch--;
		if (rescnt < cache)
		{
			log--;
			rescnt++;
			last = next;
			if (rescnt == 1)	/* if it's first result - */
				result = next;	/* it's what to return */
		}
	}

	log -= fetch;				/* adjust for any unfetched numbers */
	Assert(log >= 0);

	/* set results for caller */
	*poverflow = have_overflow; /* has the sequence overflown */
	*plast = result;            /* last returned number */
	*pcached = last;            /* last fetched number */
	*pincrement = incby;

	/* ready to change the on-disk (or really, in-buffer) tuple */
	START_CRIT_SECTION();

	/*
	 * We must mark the buffer dirty before doing XLogInsert(); see notes in
	 * SyncOneBuffer().  However, we don't apply the desired changes just yet.
	 * This looks like a violation of the buffer update protocol, but it is
	 * in fact safe because we hold exclusive lock on the buffer.  Any other
	 * process, including a checkpoint, that tries to examine the buffer
	 * contents will block until we release the lock, and then will see the
	 * final state that we install below.
	 */
	MarkBufferDirty(buf);

	/* XLOG stuff */
	if (logit && !seqrel->rd_istemp)
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];

		/*
		 * We don't log the current state of the tuple, but rather the state
		 * as it would appear after "log" more fetches.  This lets us skip
		 * that many future WAL records, at the cost that we lose those
		 * sequence values if we crash.
		 */

		/* set values that will be saved in xlog */
		seq->last_value = next;
		seq->is_called = true;
		seq->log_cnt = 0;

		xlrec.node = seqrel->rd_node;
		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
		rdata[0].buffer = InvalidBuffer;
		rdata[0].next = &(rdata[1]);

		rdata[1].data = (char *) seqtuple.t_data;
		rdata[1].len = seqtuple.t_len;
		rdata[1].buffer = InvalidBuffer;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);

		PageSetLSN(page, recptr);

		/*
		 * Need to update where we've inserted to into shmem so that the QD
		 * can flush it when necessary.  Only ever move the recorded position
		 * forward.
		 */
		LWLockAcquire(SeqServerControlLock, LW_EXCLUSIVE);

		if (XLByteLT(seqServerCtl->lastXlogEntry, recptr))
		{
			seqServerCtl->lastXlogEntry.xlogid = recptr.xlogid;
			seqServerCtl->lastXlogEntry.xrecoff = recptr.xrecoff;
		}

		LWLockRelease(SeqServerControlLock);
	}

	/* Now update sequence tuple to the intended final state */
	seq->last_value = last;		/* last fetched number */
	seq->is_called = true;
	seq->log_cnt = log;			/* how much is logged */

	END_CRIT_SECTION();

	UnlockReleaseBuffer(buf);
}                               /* cdb_sequence_nextval */
805

806

807
Datum
808
currval_oid(PG_FUNCTION_ARGS)
809
{
810 811
	Oid			relid = PG_GETARG_OID(0);
	int64		result;
812
	SeqTable	elm;
813
	Relation	seqrel;
814

815 816 817 818 819 820 821 822
	/* For now, strictly forbidden on MPP. */
	if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
				 errmsg("currval() not supported")));
	}

V
Vadim B. Mikheev 已提交
823
	/* open and AccessShareLock sequence */
824
	init_sequence(relid, &elm, &seqrel);
825

826 827
	if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_SELECT) != ACLCHECK_OK &&
		pg_class_aclcheck(elm->relid, GetUserId(), ACL_USAGE) != ACLCHECK_OK)
828 829
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
830
				 errmsg("permission denied for sequence %s",
831
						RelationGetRelationName(seqrel))));
832

833
	if (!elm->last_valid)
834 835
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
836
				 errmsg("currval of sequence \"%s\" is not yet defined in this session",
837
						RelationGetRelationName(seqrel))));
838 839 840

	result = elm->last;

841 842
	relation_close(seqrel, NoLock);

843
	PG_RETURN_INT64(result);
844 845
}

846 847 848 849 850 851
Datum
lastval(PG_FUNCTION_ARGS)
{
	Relation	seqrel;
	int64		result;

852 853 854 855 856 857 858 859
	/* For now, strictly forbidden on MPP. */
	if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
				 errmsg("lastval() not supported")));
	}

860 861 862 863 864 865
	if (last_used_seq == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("lastval is not yet defined in this session")));

	/* Someone may have dropped the sequence since the last nextval() */
866 867 868
	if (!SearchSysCacheExists(RELOID,
							  ObjectIdGetDatum(last_used_seq->relid),
							  0, 0, 0))
869 870 871 872
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("lastval is not yet defined in this session")));

873
	seqrel = open_share_lock(last_used_seq);
874 875

	/* nextval() must have already been called for this sequence */
876
	Assert(last_used_seq->last_valid);
877

878 879
	if (pg_class_aclcheck(last_used_seq->relid, GetUserId(), ACL_SELECT) != ACLCHECK_OK &&
		pg_class_aclcheck(last_used_seq->relid, GetUserId(), ACL_USAGE) != ACLCHECK_OK)
880 881 882 883 884 885 886
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied for sequence %s",
						RelationGetRelationName(seqrel))));

	result = last_used_seq->last;
	relation_close(seqrel, NoLock);
887

888 889 890
	PG_RETURN_INT64(result);
}

B
Bruce Momjian 已提交
891
/*
892 893 894 895
 * Main internal procedure that handles 2 & 3 arg forms of SETVAL.
 *
 * Note that the 3 arg version (which sets the is_called flag) is
 * only for use in pg_dump, and setting the is_called flag may not
B
Bruce Momjian 已提交
896
 * work if multiple users are attached to the database and referencing
897 898
 * the sequence (unlikely if pg_dump is restoring it).
 *
B
Bruce Momjian 已提交
899
 * It is necessary to have the 3 arg version so that pg_dump can
900 901 902 903
 * restore the state of a sequence exactly during data-only restores -
 * it is the only way to clear the is_called flag in an existing
 * sequence.
 */
B
Bruce Momjian 已提交
904
static void
905
do_setval(Oid relid, int64 next, bool iscalled)
M
 
Marc G. Fournier 已提交
906 907
{
	SeqTable	elm;
908
	Relation	seqrel;
909
	Buffer		buf;
910
	HeapTupleData seqtuple;
911
	Form_pg_sequence seq;
M
 
Marc G. Fournier 已提交
912

913 914 915 916 917 918 919
	if (Gp_role == GP_ROLE_EXECUTE)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
				 errmsg("setval() not supported in this context")));
	}

920
	/* open and AccessShareLock sequence */
921
	init_sequence(relid, &elm, &seqrel);
922 923

	if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
924 925
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
926
				 errmsg("permission denied for sequence %s",
927
						RelationGetRelationName(seqrel))));
M
 
Marc G. Fournier 已提交
928

929
	/* lock page' buffer and read tuple */
930
	seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
931
	elm->increment = seq->increment_by;
M
 
Marc G. Fournier 已提交
932

933
	if ((next < seq->min_value) || (next > seq->max_value))
934
	{
B
Bruce Momjian 已提交
935 936 937 938
		char		bufv[100],
					bufm[100],
					bufx[100];

939 940 941
		snprintf(bufv, sizeof(bufv), INT64_FORMAT, next);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, seq->min_value);
		snprintf(bufx, sizeof(bufx), INT64_FORMAT, seq->max_value);
942 943
		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
944
				 errmsg("setval: value %s is out of bounds for sequence \"%s\" (%s..%s)",
945 946
						bufv, RelationGetRelationName(seqrel),
						bufm, bufx)));
947
	}
M
 
Marc G. Fournier 已提交
948

949 950 951 952 953 954 955 956 957
	/* Set the currval() state only if iscalled = true */
	if (iscalled)
	{
		elm->last = next;		/* last returned number */
		elm->last_valid = true;
	}

	/* In any case, forget any future cached numbers */
	elm->cached = elm->last;
M
 
Marc G. Fournier 已提交
958

959
	/* ready to change the on-disk (or really, in-buffer) tuple */
960
	START_CRIT_SECTION();
961

962 963 964 965
	seq->last_value = next;		/* last fetched number */
	seq->is_called = iscalled;
	seq->log_cnt = 0;

966 967
	MarkBufferDirty(buf);

968 969
	/* XLOG stuff */
	if (!seqrel->rd_istemp)
V
Vadim B. Mikheev 已提交
970 971 972
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
B
Bruce Momjian 已提交
973
		XLogRecData rdata[2];
974
		Page		page = BufferGetPage(buf);
V
Vadim B. Mikheev 已提交
975

976
		xlrec.node = seqrel->rd_node;
977

B
Bruce Momjian 已提交
978
		rdata[0].data = (char *) &xlrec;
979
		rdata[0].len = sizeof(xl_seq_rec);
980
		rdata[0].buffer = InvalidBuffer;
981 982
		rdata[0].next = &(rdata[1]);

983 984
		rdata[1].data = (char *) seqtuple.t_data;
		rdata[1].len = seqtuple.t_len;
985
		rdata[1].buffer = InvalidBuffer;
986 987
		rdata[1].next = NULL;

988
		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);
989 990

		PageSetLSN(page, recptr);
V
Vadim B. Mikheev 已提交
991
	}
992

993
	END_CRIT_SECTION();
M
 
Marc G. Fournier 已提交
994

995
	UnlockReleaseBuffer(buf);
996
	relation_close(seqrel, NoLock);
997 998
}

999 1000 1001 1002
/*
 * Implement the 2 arg setval procedure.
 * See do_setval for discussion.
 */
1003
Datum
1004
setval_oid(PG_FUNCTION_ARGS)
1005
{
1006
	Oid			relid = PG_GETARG_OID(0);
1007
	int64		next = PG_GETARG_INT64(1);
1008

1009
	do_setval(relid, next, true);
1010

1011
	PG_RETURN_INT64(next);
1012 1013
}

1014 1015 1016 1017
/*
 * Implement the 3 arg setval procedure.
 * See do_setval for discussion.
 */
1018
Datum
1019
setval3_oid(PG_FUNCTION_ARGS)
1020
{
1021
	Oid			relid = PG_GETARG_OID(0);
1022
	int64		next = PG_GETARG_INT64(1);
1023 1024
	bool		iscalled = PG_GETARG_BOOL(2);

1025
	do_setval(relid, next, iscalled);
1026

1027
	PG_RETURN_INT64(next);
M
 
Marc G. Fournier 已提交
1028 1029
}

1030

1031
/*
1032 1033
 * Open the sequence and acquire AccessShareLock if needed
 *
1034
 * If we haven't touched the sequence already in this transaction,
B
Bruce Momjian 已提交
1035
 * we need to acquire AccessShareLock.	We arrange for the lock to
1036 1037 1038
 * be owned by the top transaction, so that we don't need to do it
 * more than once per xact.
 */
1039 1040
static Relation
open_share_lock(SeqTable seq)
1041
{
1042
	LocalTransactionId thislxid = MyProc->lxid;
1043

1044
	/* Get the lock if not already held in this xact */
1045
	if (seq->lxid != thislxid)
1046 1047 1048 1049 1050 1051 1052
	{
		ResourceOwner currentOwner;

		currentOwner = CurrentResourceOwner;
		PG_TRY();
		{
			CurrentResourceOwner = TopTransactionResourceOwner;
1053
			LockRelationOid(seq->relid, AccessShareLock);
1054 1055 1056 1057 1058 1059 1060 1061 1062 1063
		}
		PG_CATCH();
		{
			/* Ensure CurrentResourceOwner is restored on error */
			CurrentResourceOwner = currentOwner;
			PG_RE_THROW();
		}
		PG_END_TRY();
		CurrentResourceOwner = currentOwner;

1064
		/* Flag that we have a lock in the current xact */
1065
		seq->lxid = thislxid;
1066
	}
1067 1068 1069

	/* We now know we have AccessShareLock, and can safely open the rel */
	return relation_open(seq->relid, NoLock);
1070 1071
}

1072
/*
1073
 * Given a relation OID, open and lock the sequence.  p_elm and p_rel are
1074
 * output parameters.
1075 1076
 *
 * GPDB: If p_rel is NULL, the sequence relation is not opened or locked.
1077 1078
 */
static void
1079
init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel)
1080
{
B
Bruce Momjian 已提交
1081
	SeqTable	elm;
1082
	Relation	seqrel;
1083

1084 1085 1086 1087 1088 1089 1090
	/* Look to see if we already have a seqtable entry for relation */
	for (elm = seqtab; elm != NULL; elm = elm->next)
	{
		if (elm->relid == relid)
			break;
	}

1091
	/*
1092
	 * Allocate new seqtable entry if we didn't find one.
1093
	 *
B
Bruce Momjian 已提交
1094 1095 1096
	 * NOTE: seqtable entries remain in the list for the life of a backend. If
	 * the sequence itself is deleted then the entry becomes wasted memory,
	 * but it's small enough that this should not matter.
B
Bruce Momjian 已提交
1097
	 */
1098
	if (elm == NULL)
1099
	{
1100
		/*
B
Bruce Momjian 已提交
1101 1102
		 * Time to make a new seqtable entry.  These entries live as long as
		 * the backend does, so we use plain malloc for them.
1103 1104
		 */
		elm = (SeqTable) malloc(sizeof(SeqTableData));
T
Tom Lane 已提交
1105
		if (elm == NULL)
1106 1107 1108
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
1109
		elm->relid = relid;
1110
		elm->lxid = InvalidLocalTransactionId;
1111
		elm->last_valid = false;
1112 1113 1114
		elm->last = elm->cached = elm->increment = 0;
		elm->next = seqtab;
		seqtab = elm;
1115 1116
	}

1117 1118 1119
	/*
	 * Open the sequence relation.
	 */
1120 1121 1122
	if (p_rel)
	{
		seqrel = open_share_lock(elm);
1123

1124 1125 1126 1127 1128
		if (seqrel->rd_rel->relkind != RELKIND_SEQUENCE)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("\"%s\" is not a sequence",
							RelationGetRelationName(seqrel))));
1129

1130 1131
		*p_rel = seqrel;
	}
1132
	*p_elm = elm;
1133 1134 1135
}


1136 1137 1138 1139 1140 1141 1142 1143 1144
/*
 * Given an opened sequence relation, lock the page buffer and find the tuple
 *
 * *buf receives the reference to the pinned-and-ex-locked buffer
 * *seqtuple receives the reference to the sequence tuple proper
 *		(this arg should point to a local variable of type HeapTupleData)
 *
 * Function's return value points to the data payload of the tuple
 */
1145
static Form_pg_sequence
1146
read_seq_tuple(SeqTable elm, Relation rel, Buffer *buf, HeapTuple seqtuple)
1147
{
1148
	Page		page;
1149 1150 1151
	ItemId		lp;
	sequence_magic *sm;
	Form_pg_sequence seq;
1152

1153 1154 1155
	*buf = ReadBuffer(rel, 0);
	LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);

1156
	page = BufferGetPage(*buf);
1157 1158 1159
	sm = (sequence_magic *) PageGetSpecialPointer(page);

	if (sm->magic != SEQ_MAGIC)
1160 1161
		elog(ERROR, "bad magic number in sequence \"%s\": %08X",
			 RelationGetRelationName(rel), sm->magic);
1162 1163

	lp = PageGetItemId(page, FirstOffsetNumber);
1164
	Assert(ItemIdIsNormal(lp));
1165 1166 1167 1168

	/* Note we currently only bother to set these two fields of *seqtuple */
	seqtuple->t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
	seqtuple->t_len = ItemIdGetLength(lp);
1169

1170 1171 1172 1173 1174 1175 1176 1177
	/*
	 * Previous releases of Postgres neglected to prevent SELECT FOR UPDATE
	 * on a sequence, which would leave a non-frozen XID in the sequence
	 * tuple's xmax, which eventually leads to clog access failures or worse.
	 * If we see this has happened, clean up after it.  We treat this like a
	 * hint bit update, ie, don't bother to WAL-log it, since we can certainly
	 * do this again if the update gets lost.
	 */
1178
	if (HeapTupleHeaderGetXmax(seqtuple->t_data) != InvalidTransactionId)
1179
	{
1180 1181 1182
		HeapTupleHeaderSetXmax(seqtuple->t_data, InvalidTransactionId);
		seqtuple->t_data->t_infomask &= ~HEAP_XMAX_COMMITTED;
		seqtuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
1183
		MarkBufferDirtyHint(*buf, rel);
1184
	}
1185

1186
	seq = (Form_pg_sequence) GETSTRUCT(seqtuple);
1187

1188
	/* this is a handy place to update our copy of the increment */
1189 1190 1191
	elm->increment = seq->increment_by;

	return seq;
1192 1193
}

1194 1195
/*
 * init_params: process the options list of CREATE or ALTER SEQUENCE,
1196 1197
 * and store the values into appropriate fields of *new.  Also set
 * *owned_by to any OWNED BY option, or to NIL if there is none.
1198 1199 1200 1201
 *
 * If isInit is true, fill any unspecified options with default values;
 * otherwise, do not change existing options that aren't explicitly overridden.
 */
1202
static void
1203 1204
init_params(List *options, bool isInit,
			Form_pg_sequence new, List **owned_by)
1205
{
1206 1207
	DefElem    *start_value = NULL;
	DefElem    *restart_value = NULL;
1208 1209 1210 1211
	DefElem    *increment_by = NULL;
	DefElem    *max_value = NULL;
	DefElem    *min_value = NULL;
	DefElem    *cache_value = NULL;
1212
	DefElem    *is_cycled = NULL;
1213
	ListCell   *option;
1214

1215 1216
	*owned_by = NIL;

B
Bruce Momjian 已提交
1217
	foreach(option, options)
1218
	{
1219
		DefElem    *defel = (DefElem *) lfirst(option);
1220

1221
		if (strcmp(defel->defname, "increment") == 0)
1222 1223
		{
			if (increment_by)
1224 1225 1226
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1227
			increment_by = defel;
1228
		}
1229 1230
		else if (strcmp(defel->defname, "start") == 0)
		{
1231
			if (start_value)
1232 1233 1234
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1235
			start_value = defel;
1236 1237
		}
		else if (strcmp(defel->defname, "restart") == 0)
1238
		{
1239
			if (restart_value)
1240 1241 1242
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1243
			restart_value = defel;
1244
		}
1245
		else if (strcmp(defel->defname, "maxvalue") == 0)
1246 1247
		{
			if (max_value)
1248 1249 1250
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1251
			max_value = defel;
1252
		}
1253
		else if (strcmp(defel->defname, "minvalue") == 0)
1254 1255
		{
			if (min_value)
1256 1257 1258
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1259
			min_value = defel;
1260
		}
1261
		else if (strcmp(defel->defname, "cache") == 0)
1262 1263
		{
			if (cache_value)
1264 1265 1266
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1267
			cache_value = defel;
1268
		}
1269
		else if (strcmp(defel->defname, "cycle") == 0)
1270
		{
1271
			if (is_cycled)
1272 1273 1274
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1275
			is_cycled = defel;
1276
		}
1277 1278 1279 1280 1281 1282 1283 1284
		else if (strcmp(defel->defname, "owned_by") == 0)
		{
			if (*owned_by)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			*owned_by = defGetQualifiedName(defel);
		}
1285
		else
1286
			elog(ERROR, "option \"%s\" not recognized",
1287 1288 1289
				 defel->defname);
	}

1290 1291 1292 1293 1294 1295 1296
	/*
	 * We must reset log_cnt when isInit or when changing any parameters
	 * that would affect future nextval allocations.
	 */
	if (isInit)
		new->log_cnt = 0;

B
Bruce Momjian 已提交
1297
	/* INCREMENT BY */
1298
	if (increment_by != NULL)
B
Bruce Momjian 已提交
1299 1300
	{
		new->increment_by = defGetInt64(increment_by);
1301 1302 1303
		if (new->increment_by == 0)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1304
					 errmsg("INCREMENT must not be zero")));
1305
		new->log_cnt = 0;
B
Bruce Momjian 已提交
1306
	}
1307 1308 1309 1310
	else if (isInit)
		new->increment_by = 1;

	/* CYCLE */
1311
	if (is_cycled != NULL)
1312 1313 1314
	{
		new->is_cycled = intVal(is_cycled->arg);
		Assert(new->is_cycled == false || new->is_cycled == true);
1315
		new->log_cnt = 0;
1316 1317 1318
	}
	else if (isInit)
		new->is_cycled = false;
1319

1320
	/* MAXVALUE (null arg means NO MAXVALUE) */
1321
	if (max_value != NULL && max_value->arg)
1322
	{
1323
		new->max_value = defGetInt64(max_value);
1324 1325
		new->log_cnt = 0;
	}
1326
	else if (isInit || max_value != NULL)
1327
	{
1328
		if (new->increment_by > 0)
B
Bruce Momjian 已提交
1329
			new->max_value = SEQ_MAXVALUE;		/* ascending seq */
1330
		else
B
Bruce Momjian 已提交
1331
			new->max_value = -1;	/* descending seq */
1332
		new->log_cnt = 0;
1333
	}
1334

1335
	/* MINVALUE (null arg means NO MINVALUE) */
1336
	if (min_value != NULL && min_value->arg)
1337
	{
1338
		new->min_value = defGetInt64(min_value);
1339 1340
		new->log_cnt = 0;
	}
1341
	else if (isInit || min_value != NULL)
1342
	{
1343
		if (new->increment_by > 0)
B
Bruce Momjian 已提交
1344
			new->min_value = 1; /* ascending seq */
1345
		else
B
Bruce Momjian 已提交
1346
			new->min_value = SEQ_MINVALUE;		/* descending seq */
1347
		new->log_cnt = 0;
1348
	}
1349

1350
	/* crosscheck min/max */
1351
	if (new->min_value >= new->max_value)
1352
	{
B
Bruce Momjian 已提交
1353 1354 1355
		char		bufm[100],
					bufx[100];

1356 1357
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->min_value);
		snprintf(bufx, sizeof(bufx), INT64_FORMAT, new->max_value);
1358 1359 1360 1361
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("MINVALUE (%s) must be less than MAXVALUE (%s)",
						bufm, bufx)));
1362
	}
1363

B
Bruce Momjian 已提交
1364
	/* START WITH */
1365 1366
	if (start_value != NULL)
		new->start_value = defGetInt64(start_value);
1367
	else if (isInit)
1368
	{
1369
		if (new->increment_by > 0)
1370
			new->start_value = new->min_value;	/* ascending seq */
1371
		else
1372
			new->start_value = new->max_value;	/* descending seq */
1373
	}
1374

1375 1376
	/* crosscheck START */
	if (new->start_value < new->min_value)
1377
	{
1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400
		char		bufs[100],
					bufm[100];

		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->start_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->min_value);
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("START value (%s) cannot be less than MINVALUE (%s)",
						bufs, bufm)));
	}
	if (new->start_value > new->max_value)
	{
		char		bufs[100],
					bufm[100];

		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->start_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->max_value);
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
			  errmsg("START value (%s) cannot be greater than MAXVALUE (%s)",
					 bufs, bufm)));
	}

1401 1402 1403 1404 1405 1406 1407
	/* RESTART [WITH] */
	if (restart_value != NULL)
	{
		if (restart_value->arg != NULL)
			new->last_value = defGetInt64(restart_value);
		else
			new->last_value = new->start_value;
1408 1409 1410
		new->is_called = false;
		new->log_cnt = 1;
	}
1411
	else if (isInit)
1412
	{
1413
		new->last_value = new->start_value;
1414 1415
		new->is_called = false;
		new->log_cnt = 1;
1416
	}
1417

1418
	/* crosscheck RESTART (or current value, if changing MIN/MAX) */
1419
	if (new->last_value < new->min_value)
1420
	{
B
Bruce Momjian 已提交
1421 1422 1423
		char		bufs[100],
					bufm[100];

1424 1425
		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->last_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->min_value);
1426 1427
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1428 1429
			   errmsg("RESTART value (%s) cannot be less than MINVALUE (%s)",
					  bufs, bufm)));
1430
	}
1431
	if (new->last_value > new->max_value)
1432
	{
B
Bruce Momjian 已提交
1433 1434 1435
		char		bufs[100],
					bufm[100];

1436 1437
		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->last_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->max_value);
1438 1439
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1440 1441
			errmsg("RESTART value (%s) cannot be greater than MAXVALUE (%s)",
				   bufs, bufm)));
1442
	}
1443

B
Bruce Momjian 已提交
1444
	/* CACHE */
1445
	if (cache_value != NULL)
1446
	{
1447 1448 1449 1450
		new->cache_value = defGetInt64(cache_value);
		if (new->cache_value <= 0)
		{
			char		buf[100];
B
Bruce Momjian 已提交
1451

1452 1453 1454 1455 1456 1457
			snprintf(buf, sizeof(buf), INT64_FORMAT, new->cache_value);
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("CACHE (%s) must be greater than zero",
							buf)));
		}
1458
		new->log_cnt = 0;
1459
	}
1460 1461
	else if (isInit)
		new->cache_value = 1;
1462 1463
}

1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487
/*
 * Apply an OWNED BY option of CREATE/ALTER SEQUENCE.
 *
 * seqrel:    the sequence relation.
 * owned_by:  non-empty name list: either the single name "none", or a
 *            (possibly qualified) table name followed by a column name.
 *
 * Ownership permissions on the sequence have already been checked.  When
 * a new owned-by dependency is established, we additionally require the
 * referenced table to have the same owner and namespace as the sequence.
 */
static void
process_owned_by(Relation seqrel, List *owned_by)
{
	int			namecount = list_length(owned_by);
	Relation	tablerel = NULL;
	AttrNumber	attnum = 0;

	Assert(namecount > 0);

	if (namecount == 1)
	{
		/* A lone name is only acceptable if it spells OWNED BY NONE */
		if (strcmp(strVal(linitial(owned_by)), "none") != 0)
			ereport(ERROR,
					(errcode(ERRCODE_SYNTAX_ERROR),
					 errmsg("invalid OWNED BY option"),
					 errhint("Specify OWNED BY table.column or OWNED BY NONE.")));
	}
	else
	{
		/* Everything but the last name is the table; the last is the column */
		List	   *relname = list_truncate(list_copy(owned_by), namecount - 1);
		char	   *attrname = strVal(lfirst(list_tail(owned_by)));
		RangeVar   *rel = makeRangeVarFromNameList(relname);

		/* Open and lock the table so it cannot go away meanwhile */
		tablerel = relation_openrv(rel, AccessShareLock);

		/* Only a regular table may own a sequence */
		if (tablerel->rd_rel->relkind != RELKIND_RELATION)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("referenced relation \"%s\" is not a table",
							RelationGetRelationName(tablerel))));

		/* Owner and schema must both match those of the sequence */
		if (seqrel->rd_rel->relowner != tablerel->rd_rel->relowner)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("sequence must have same owner as table it is linked to")));
		if (RelationGetNamespace(seqrel) != RelationGetNamespace(tablerel))
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("sequence must be in same schema as table it is linked to")));

		/* Resolve the column name to an attribute number */
		attnum = get_attnum(RelationGetRelid(tablerel), attrname);
		if (attnum == InvalidAttrNumber)
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_COLUMN),
					 errmsg("column \"%s\" of relation \"%s\" does not exist",
							attrname, RelationGetRelationName(tablerel))));
	}

	/*
	 * Ready to update pg_depend: drop any existing AUTO dependencies of the
	 * sequence first, then add the new one if a table was named.
	 */
	markSequenceUnowned(RelationGetRelid(seqrel));

	if (tablerel != NULL)
	{
		ObjectAddress refobject;
		ObjectAddress depobject;

		refobject.classId = RelationRelationId;
		refobject.objectId = RelationGetRelid(tablerel);
		refobject.objectSubId = attnum;
		depobject.classId = RelationRelationId;
		depobject.objectId = RelationGetRelid(seqrel);
		depobject.objectSubId = 0;
		recordDependencyOn(&depobject, &refobject, DEPENDENCY_AUTO);

		/* Close the table, but keep the lock until commit */
		relation_close(tablerel, NoLock);
	}
}

V
Vadim B. Mikheev 已提交
1557

B
Bruce Momjian 已提交
1558
void
1559
seq_redo(XLogRecPtr beginLoc, XLogRecPtr lsn, XLogRecord *record)
V
Vadim B. Mikheev 已提交
1560
{
B
Bruce Momjian 已提交
1561 1562 1563 1564 1565 1566
	uint8		info = record->xl_info & ~XLR_INFO_MASK;
	Buffer		buffer;
	Page		page;
	char	   *item;
	Size		itemsz;
	xl_seq_rec *xlrec = (xl_seq_rec *) XLogRecGetData(record);
1567
	sequence_magic *sm;
V
Vadim B. Mikheev 已提交
1568

1569 1570 1571
	/* Backup blocks are not used in seq records */
	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));

1572
	if (info != XLOG_SEQ_LOG)
1573
		elog(PANIC, "seq_redo: unknown op code %u", info);
1574

1575
	buffer = XLogReadBuffer(xlrec->node, 0, true);
1576
	Assert(BufferIsValid(buffer));
V
Vadim B. Mikheev 已提交
1577 1578
	page = (Page) BufferGetPage(buffer);

1579 1580
	/* Always reinit the page and reinstall the magic number */
	/* See comments in DefineSequence */
1581 1582 1583
	PageInit((Page) page, BufferGetPageSize(buffer), sizeof(sequence_magic));
	sm = (sequence_magic *) PageGetSpecialPointer(page);
	sm->magic = SEQ_MAGIC;
V
Vadim B. Mikheev 已提交
1584

B
Bruce Momjian 已提交
1585
	item = (char *) xlrec + sizeof(xl_seq_rec);
1586
	itemsz = record->xl_len - sizeof(xl_seq_rec);
1587

B
Bruce Momjian 已提交
1588
	if (PageAddItem(page, (Item) item, itemsz,
1589
					FirstOffsetNumber, false, false) == InvalidOffsetNumber)
1590
		elog(PANIC, "seq_redo: failed to add item to page");
V
Vadim B. Mikheev 已提交
1591 1592

	PageSetLSN(page, lsn);
1593 1594
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);
V
Vadim B. Mikheev 已提交
1595 1596
}

B
Bruce Momjian 已提交
1597
void
1598
seq_desc(StringInfo buf, XLogRecPtr beginLoc, XLogRecord *record)
V
Vadim B. Mikheev 已提交
1599
{
1600 1601
	uint8		info = record->xl_info & ~XLR_INFO_MASK;
	char		*rec = XLogRecGetData(record);
B
Bruce Momjian 已提交
1602
	xl_seq_rec *xlrec = (xl_seq_rec *) rec;
V
Vadim B. Mikheev 已提交
1603 1604

	if (info == XLOG_SEQ_LOG)
1605
		appendStringInfo(buf, "log: ");
V
Vadim B. Mikheev 已提交
1606 1607
	else
	{
1608
		appendStringInfo(buf, "UNKNOWN");
V
Vadim B. Mikheev 已提交
1609 1610 1611
		return;
	}

1612
	appendStringInfo(buf, "rel %u/%u/%u",
B
Bruce Momjian 已提交
1613
			   xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode);
V
Vadim B. Mikheev 已提交
1614
}
1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698


/*
 * Initialize a pseudo relcache entry with just enough info to call bufmgr.
 *
 * seqrel:        caller-provided RelationData to fill in; it is zeroed
 *                first, so any previous contents are discarded.
 * tablespaceid:  stored as rd_node.spcNode.
 * dbid:          stored as rd_node.dbNode.
 * relid:         stored as rd_id and rd_node.relNode.
 * istemp:        stored as rd_istemp.
 *
 * A zeroed pg_class stub is palloc'd for rd_rel and given a synthetic
 * relation name of the form "pg_class.oid=<relid>".
 */
static void
cdb_sequence_relation_init(Relation seqrel,
                           Oid      tablespaceid,
                           Oid      dbid,
                           Oid      relid,
                           bool     istemp)
{
	/* See RelationBuildDesc in relcache.c */
	memset(seqrel, 0, sizeof(*seqrel));

	seqrel->rd_smgr = NULL;
	/* NOTE(review): 99 looks like an arbitrary nonzero dummy refcount -- confirm */
	seqrel->rd_refcnt = 99;

	seqrel->rd_id = relid;
	seqrel->rd_istemp = istemp;

	/* Must use shared buffer pool so seqserver & QDs can see the data. */
	seqrel->rd_isLocalBuf = false;

	seqrel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);

	/*
	 * Build the synthetic name.  The write is bounded by the size of the
	 * name buffer, and the OID is printed as unsigned (%u) so that large
	 * OIDs do not show up as negative numbers.
	 */
	snprintf(seqrel->rd_rel->relname.data,
			 sizeof(seqrel->rd_rel->relname.data),
			 "pg_class.oid=%u", relid);

	/* as in RelationInitPhysicalAddr... */
	seqrel->rd_node.spcNode = tablespaceid;
	seqrel->rd_node.dbNode = dbid;
	seqrel->rd_node.relNode = relid;
}                               /* cdb_sequence_relation_init */

/*
 * Tear down a pseudo relcache entry: close its smgr handle and free the
 * pg_class stub hanging off rd_rel, if there is one.
 */
static void
cdb_sequence_relation_term(Relation seqrel)
{
	/* Close the file. */
	RelationCloseSmgr(seqrel);

	/* Release the pg_class stub when one is attached */
	if (seqrel->rd_rel != NULL)
	{
		pfree(seqrel->rd_rel);
	}
}                               /* cdb_sequence_relation_term */



/*
 * CDB: forward a nextval request from qExec to the sequence server
 *
 * Thin wrapper around sendSequenceRequest: the request is sent over the
 * descriptor returned by GetSeqServerFD(), tagged with gp_session_id, and
 * the relation plus all four result pointers are passed through untouched.
 *
 * seqrel:      the sequence relation the request is for.
 * plast, pcached, pincrement, poverflow:
 *              result pointers handed to sendSequenceRequest; presumably
 *              filled in from the server's reply -- confirm against
 *              sendSequenceRequest.
 */
void
cdb_sequence_nextval_proxy(Relation	seqrel,
                           int64   *plast,
                           int64   *pcached,
                           int64   *pincrement,
                           bool    *poverflow)
{

	sendSequenceRequest(GetSeqServerFD(),
						seqrel,
    					gp_session_id,
    					plast,
    					pcached,
    					pincrement,
    					poverflow);

}                               /* cdb_sequence_nextval_proxy */


/*
 * CDB: nextval entry point called by sequence server
 *
 * tablespaceid, dbid, relid:  physical identity of the sequence relation.
 * istemp:                     whether the sequence is a temp relation.
 * plast, pcached, pincrement, poverflow:
 *                             outputs; cleared here and then handed to
 *                             cdb_sequence_nextval to be filled in.
 */
void
cdb_sequence_nextval_server(Oid    tablespaceid,
                            Oid    dbid,
                            Oid    relid,
                            bool   istemp,
                            int64 *plast,
                            int64 *pcached,
                            int64 *pincrement,
                            bool  *poverflow)
{
	RelationData fakerel;
	Relation	seqrel = &fakerel;
	SeqTable	elm;

	/*
	 * Give every output a defined value up front, so the caller never
	 * reads an indeterminate result if cdb_sequence_nextval returns
	 * without writing one of them.
	 */
	*plast = 0;
	*pcached = 0;
	*pincrement = 0;
	*poverflow = false;

	/*
	 * In Postgres, init_sequence finds the SeqTable entry for the sequence
	 * and opens the relation.  The sequence server does not need the
	 * latter: passing NULL for p_rel only sets up `elm`, which
	 * cdb_sequence_nextval() needs because it calls read_seq_tuple().
	 *
	 * In GPDB, a sequence server generates the values for all sequences.
	 * It does not have to lock the sequence relation, because a single
	 * sequence server instance handles all the requests coming from
	 * segments.  To prevent collisions between values generated on the
	 * master (e.g. `select nextval(seq)`) and for segments (e.g. an insert
	 * into a table with a serial column), a BUFFER_LOCK_EXCLUSIVE lock is
	 * held on the shared buffer of the sequence relation.
	 */
	init_sequence(relid, &elm, NULL);

	/* Build a pseudo relcache entry with just enough info to call bufmgr. */
	cdb_sequence_relation_init(seqrel, tablespaceid, dbid, relid, istemp);

	/*
	 * CDB TODO: Catch errors.  Nothing here catches an error raised while
	 * updating the sequence, so the cleanup below is skipped in that case.
	 */

	/* Update the sequence object. */
	cdb_sequence_nextval(elm, seqrel, plast, pcached, pincrement, poverflow);

	/* Cleanup. */
	cdb_sequence_relation_term(seqrel);
}                               /* cdb_sequence_nextval_server */
1735

1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765
/*
 * Zero out last_value and log_cnt in every item of a sequence page.
 *
 * Not every fetch from a sequence is WAL-logged: SEQ_LOG_VALS fetches are
 * pre-logged in advance.  These two fields must therefore be masked
 * before pages are compared during consistency checks.
 */
static void
mask_seq_values(Page page)
{
	OffsetNumber lastoff = PageGetMaxOffsetNumber(page);
	OffsetNumber off = FirstOffsetNumber;

	while (off <= lastoff)
	{
		ItemId		itemid = PageGetItemId(page, off);
		HeapTupleData tuple;
		Form_pg_sequence seqform;

		/* Build just enough of a tuple descriptor for GETSTRUCT to work */
		tuple.t_data = (HeapTupleHeader) ((char *) page + ItemIdGetOffset(itemid));
		tuple.t_len = ItemIdGetLength(itemid);

		seqform = (Form_pg_sequence) GETSTRUCT(&tuple);
		MemSet(&seqform->last_value, 0, sizeof(int64));
		MemSet(&seqform->log_cnt, 0, sizeof(int64));

		off = OffsetNumberNext(off);
	}
}

1766 1767 1768 1769 1770 1771 1772 1773
/*
 * Mask a sequence page ahead of a consistency check.
 *
 * Applies, in order: LSN/checksum masking, masking of the sequence
 * tuple's last_value and log_cnt, and masking of the page's unused space.
 * blkno is not used.
 */
void
seq_mask(char *page, BlockNumber blkno)
{
	mask_page_lsn_and_checksum(page);

	/*
	 * last_value and log_cnt are masked because fetches covered by the
	 * SEQ_LOG_VALS pre-logging are not individually WAL-logged.
	 */
	mask_seq_values(page);

	mask_unused_space(page);
}