/*-------------------------------------------------------------------------
 *
 * sequence.c
 *	  PostgreSQL sequences support code.
 *
 * Portions Copyright (c) 2005-2008, Greenplum inc.
 * Portions Copyright (c) 2012-Present Pivotal Software, Inc.
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/commands/sequence.c,v 1.160 2009/06/11 14:48:56 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
17
#include "postgres.h"
18

19
#include "access/heapam.h"
20
#include "access/bufmask.h"
21 22
#include "access/transam.h"
#include "access/xact.h"
23
#include "access/xlogutils.h"
24
#include "catalog/dependency.h"
25
#include "catalog/heap.h"
26
#include "catalog/namespace.h"
27
#include "catalog/pg_type.h"
28
#include "commands/defrem.h"
29
#include "commands/sequence.h"
30
#include "commands/tablecmds.h"
B
Bruce Momjian 已提交
31
#include "miscadmin.h"
32
#include "storage/smgr.h"               /* RelationCloseSmgr -> smgrclose */
33
#include "nodes/makefuncs.h"
34 35
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
36
#include "storage/proc.h"
37
#include "utils/acl.h"
B
Bruce Momjian 已提交
38
#include "utils/builtins.h"
39
#include "utils/formatting.h"
40
#include "utils/lsyscache.h"
41
#include "utils/resowner.h"
42
#include "utils/syscache.h"
43

44
#include "catalog/oid_dispatch.h"
45
#include "cdb/cdbdisp_query.h"
H
Heikki Linnakangas 已提交
46
#include "cdb/cdbdoublylinked.h"
47 48 49 50 51 52 53
#include "cdb/cdbsrlz.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbmotion.h"
#include "cdb/ml_ipc.h"

#include "postmaster/seqserver.h"

54

V
Vadim B. Mikheev 已提交
55
/*
 * We don't want to log each fetching of a value from a sequence,
 * so we pre-log a few fetches in advance. In the event of
 * crash we can lose (skip over) as many values as we pre-logged.
 */
#define SEQ_LOG_VALS	32
61

62 63 64 65 66
/*
 * The "special area" of a sequence's buffer page looks like this.
 */
#define SEQ_MAGIC	  0x1717

67 68
typedef struct sequence_magic
{
69
	uint32		magic;
70
} sequence_magic;
71

72 73 74 75 76 77
/*
 * We store a SeqTable item for every sequence we have touched in the current
 * session.  This is needed to hold onto nextval/currval state.  (We can't
 * rely on the relcache, since it's only, well, a cache, and may decide to
 * discard entries.)
 *
B
Bruce Momjian 已提交
78
 * XXX We use linear search to find pre-existing SeqTable entries.	This is
79 80 81
 * good when only a small number of sequences are touched in a session, but
 * would suck with many different sequences.  Perhaps use a hashtable someday.
 */
82 83
typedef struct SeqTableData
{
84 85
	struct SeqTableData *next;	/* link to next SeqTable object */
	Oid			relid;			/* pg_class OID of this sequence */
86
	LocalTransactionId lxid;	/* xact in which we last did a seq op */
87
	bool		last_valid;		/* do we have a valid "last" value? */
88 89 90 91
	int64		last;			/* value last returned by nextval */
	int64		cached;			/* last value already cached for nextval */
	/* if last != cached, we have not used up all the cached values */
	int64		increment;		/* copy of sequence's increment field */
92
	/* note that increment is zero until we first do read_seq_tuple() */
93
} SeqTableData;
94 95 96

typedef SeqTableData *SeqTable;

97
static SeqTable seqtab = NULL;	/* Head of list of SeqTable items */
98

99 100 101 102 103
/*
 * last_used_seq is updated by nextval() to point to the last used
 * sequence.
 */
static SeqTableData *last_used_seq = NULL;
104

105
static int64 nextval_internal(Oid relid);
106
static Relation open_share_lock(SeqTable seq);
107
static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel);
108 109
static Form_pg_sequence read_seq_tuple(SeqTable elm, Relation rel,
			   Buffer *buf, HeapTuple seqtuple);
110
static void init_params(List *options, bool isInit,
B
Bruce Momjian 已提交
111
			Form_pg_sequence new, List **owned_by);
112
static void do_setval(Oid relid, int64 next, bool iscalled);
113 114
static void process_owned_by(Relation seqrel, List *owned_by);

115
static void
116 117
cdb_sequence_nextval(SeqTable elm,
					 Relation   seqrel,
118 119 120 121 122 123 124 125 126 127 128
                     int64     *plast,
                     int64     *pcached,
                     int64     *pincrement,
                     bool      *seq_overflow);
static void
cdb_sequence_nextval_proxy(Relation seqrel,
                           int64   *plast,
                           int64   *pcached,
                           int64   *pincrement,
                           bool    *poverflow);

129
/*
B
Bruce Momjian 已提交
130
 * DefineSequence
131
 *				Creates a new sequence relation
132 133
 */
void
134
DefineSequence(CreateSeqStmt *seq)
135
{
136
	FormData_pg_sequence new;
137
	List	   *owned_by;
138
	CreateStmt *stmt = makeNode(CreateStmt);
139
	Oid			seqoid;
140 141
	Relation	rel;
	Buffer		buf;
142
	Page		page;
143
	sequence_magic *sm;
144 145 146
	HeapTuple	tuple;
	TupleDesc	tupDesc;
	Datum		value[SEQ_COL_LASTCOL];
147
	bool		null[SEQ_COL_LASTCOL];
148
	int			i;
149
	NameData	name;
150
	OffsetNumber offnum;
151

152 153
	bool shouldDispatch =  Gp_role == GP_ROLE_DISPATCH && !IsBootstrapProcessingMode();

154
	/* Check and set all option values */
155
	init_params(seq->options, true, &new, &owned_by);
156 157

	/*
158
	 * Create relation (and fill value[] and null[] for the tuple)
159 160 161
	 */
	stmt->tableElts = NIL;
	for (i = SEQ_COL_FIRSTCOL; i <= SEQ_COL_LASTCOL; i++)
162
	{
163
		ColumnDef  *coldef = makeNode(ColumnDef);
164

165 166
		coldef->inhcount = 0;
		coldef->is_local = true;
167
		coldef->is_not_null = true;
168 169
		coldef->raw_default = NULL;
		coldef->cooked_default = NULL;
170 171
		coldef->constraints = NIL;

172
		null[i - 1] = false;
173 174 175

		switch (i)
		{
176
			case SEQ_COL_NAME:
177
				coldef->typeName = makeTypeNameFromOid(NAMEOID, -1);
178
				coldef->colname = "sequence_name";
179
				namestrcpy(&name, seq->sequence->relname);
180
				value[i - 1] = NameGetDatum(&name);
181 182
				break;
			case SEQ_COL_LASTVAL:
183
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
184
				coldef->colname = "last_value";
185
				value[i - 1] = Int64GetDatumFast(new.last_value);
186
				break;
187
			case SEQ_COL_STARTVAL:
188
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
189 190 191
				coldef->colname = "start_value";
				value[i - 1] = Int64GetDatumFast(new.start_value);
				break;
192
			case SEQ_COL_INCBY:
193
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
194
				coldef->colname = "increment_by";
195
				value[i - 1] = Int64GetDatumFast(new.increment_by);
196 197
				break;
			case SEQ_COL_MAXVALUE:
198
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
199
				coldef->colname = "max_value";
200
				value[i - 1] = Int64GetDatumFast(new.max_value);
201 202
				break;
			case SEQ_COL_MINVALUE:
203
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
204
				coldef->colname = "min_value";
205
				value[i - 1] = Int64GetDatumFast(new.min_value);
206 207
				break;
			case SEQ_COL_CACHE:
208
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
209
				coldef->colname = "cache_value";
210
				value[i - 1] = Int64GetDatumFast(new.cache_value);
211
				break;
V
Vadim B. Mikheev 已提交
212
			case SEQ_COL_LOG:
213
				coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
V
Vadim B. Mikheev 已提交
214
				coldef->colname = "log_cnt";
215
				value[i - 1] = Int64GetDatum((int64) 0);
V
Vadim B. Mikheev 已提交
216
				break;
217
			case SEQ_COL_CYCLE:
218
				coldef->typeName = makeTypeNameFromOid(BOOLOID, -1);
219
				coldef->colname = "is_cycled";
220
				value[i - 1] = BoolGetDatum(new.is_cycled);
221 222
				break;
			case SEQ_COL_CALLED:
223
				coldef->typeName = makeTypeNameFromOid(BOOLOID, -1);
224
				coldef->colname = "is_called";
225
				value[i - 1] = BoolGetDatum(false);
226
				break;
227 228 229 230
		}
		stmt->tableElts = lappend(stmt->tableElts, coldef);
	}

231 232
	stmt->relation = seq->sequence;
	stmt->inhRelations = NIL;
233
	stmt->constraints = NIL;
234 235
	stmt->inhOids = NIL;
	stmt->parentOidCount = 0;
B
Bruce Momjian 已提交
236
	stmt->options = list_make1(defWithOids(false));
237
	stmt->oncommit = ONCOMMIT_NOOP;
238
	stmt->tablespacename = NULL;
239 240
	stmt->relKind = RELKIND_SEQUENCE;
	stmt->ownerid = GetUserId();
241

242
	seqoid = DefineRelation(stmt, RELKIND_SEQUENCE, RELSTORAGE_HEAP, false);
243

244 245 246 247 248 249 250 251
	/*
	 * Open and lock the new sequence.  (This lock is redundant; an
	 * AccessExclusiveLock was acquired above by DefineRelation and
	 * won't be released until end of transaction.)
	 *
	 * CDB: Acquire lock on qDisp before dispatching to qExecs, so
	 * qDisp can detect and resolve any deadlocks.
	 */
252
	rel = heap_open(seqoid, AccessExclusiveLock);
253
	tupDesc = RelationGetDescr(rel);
254

255 256 257 258
	/* Now form sequence tuple */
	tuple = heap_form_tuple(tupDesc, value, null);

	/* Initialize first page of relation with special magic number */
259
	buf = ReadBuffer(rel, P_NEW);
260 261
	Assert(BufferGetBlockNumber(buf) == 0);

262
	page = BufferGetPage(buf);
263

264
	PageInit(page, BufferGetPageSize(buf), sizeof(sequence_magic));
265 266 267
	sm = (sequence_magic *) PageGetSpecialPointer(page);
	sm->magic = SEQ_MAGIC;

268 269
	/* Now insert sequence tuple */
	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
270

271
	/*
272
	 * Since VACUUM does not process sequences, we have to force the tuple
273
	 * to have xmin = FrozenTransactionId now.	Otherwise it would become
B
Bruce Momjian 已提交
274
	 * invisible to SELECTs after 2G transactions.	It is okay to do this
275 276 277
	 * because if the current transaction aborts, no other xact will ever
	 * examine the sequence tuple anyway.
	 *
278
	 */
279

280 281 282 283 284 285
	HeapTupleHeaderSetXmin(tuple->t_data, FrozenTransactionId);
	HeapTupleHeaderSetXminFrozen(tuple->t_data);
	HeapTupleHeaderSetCmin(tuple->t_data, FirstCommandId);
	HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId);
	tuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
	ItemPointerSet(&tuple->t_data->t_ctid, 0, FirstOffsetNumber);
286

287
	START_CRIT_SECTION();
288

289 290
	MarkBufferDirty(buf);

291 292 293 294 295
	offnum = PageAddItem(page, (Item) tuple->t_data, tuple->t_len,
						 InvalidOffsetNumber, false, false);
	if (offnum != FirstOffsetNumber)
		elog(ERROR, "failed to add sequence tuple to page");

296 297
	/* XLOG stuff */
	if (!rel->rd_istemp)
298
	{
299 300 301
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];
302 303

		xlrec.node = rel->rd_node;
304

305 306
		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
307
		rdata[0].buffer = InvalidBuffer;
308 309
		rdata[0].next = &(rdata[1]);

310
		rdata[1].data = (char *) tuple->t_data;
311
		rdata[1].len = tuple->t_len;
312
		rdata[1].buffer = InvalidBuffer;
313 314
		rdata[1].next = NULL;

315
		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);
316 317 318

		PageSetLSN(page, recptr);
	}
319

320
	END_CRIT_SECTION();
321

322 323
	UnlockReleaseBuffer(buf);

324 325 326 327
	/* process OWNED BY if given */
	if (owned_by)
		process_owned_by(rel, owned_by);

328
	heap_close(rel, NoLock);
329 330 331 332 333

	
	/* Dispatch to segments */
	if (shouldDispatch)
	{
334 335 336 337
		CdbDispatchUtilityStatement((Node *) seq,
									DF_CANCEL_ON_ERROR|
									DF_WITH_SNAPSHOT|
									DF_NEED_TWO_PHASE,
338
									GetAssignedOidsForDispatch(),
339
									NULL);
340
	}
341 342
}

B
Bruce Momjian 已提交
343 344 345
/*
 * AlterSequence
 *
346
 * Modify the definition of a sequence relation
B
Bruce Momjian 已提交
347 348
 */
void
349
AlterSequence(AlterSeqStmt *stmt)
B
Bruce Momjian 已提交
350
{
351
	Oid			relid;
352 353 354 355 356 357 358 359 360 361 362 363

	/* find sequence */
	relid = RangeVarGetRelid(stmt->sequence, false);

	/* allow ALTER to sequence owner only */
	/* if you change this, see also callers of AlterSequenceInternal! */
	if (!pg_class_ownercheck(relid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
					   stmt->sequence->relname);

	/* do the work */
	AlterSequenceInternal(relid, stmt->options);
364 365 366 367 368 369 370 371

	if (Gp_role == GP_ROLE_DISPATCH)
		CdbDispatchUtilityStatement((Node *) stmt,
									DF_CANCEL_ON_ERROR|
									DF_WITH_SNAPSHOT|
									DF_NEED_TWO_PHASE,
									NIL,
									NULL);
372 373 374 375 376 377 378 379 380 381
}

/*
 * AlterSequenceInternal
 *
 * Same as AlterSequence except that the sequence is specified by OID
 * and we assume the caller already checked permissions.
 *
 * The new parameter values are validated by init_params() on a private copy
 * of the on-disk tuple, then copied over the tuple inside a critical section
 * and WAL-logged (unless the sequence is temporary).  On the dispatcher, for
 * non-temp sequences, the change is also recorded via MetaTrackUpdObject().
 */
void
AlterSequenceInternal(Oid relid, List *options)
{
	SeqTable	elm;
	Relation	seqrel;
	Buffer		buf;
	Form_pg_sequence seq;
	FormData_pg_sequence new;
	List	   *owned_by;
	HeapTupleData seqtuple;
	int64		save_increment;
	bool		bSeqIsTemp = false;
	int			numopts	   = 0;
	char	   *alter_subtype = "";		/* subtype string for metadata tracking */

	/* open and AccessShareLock sequence */
	init_sequence(relid, &elm, &seqrel);

	/* hack to keep ALTER SEQUENCE OWNED BY from changing currval state */
	save_increment = elm->increment;

	/* lock page's buffer and read tuple into new sequence structure */
	seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
	elm->increment = seq->increment_by;

	/* Copy old values of options into workspace */
	memcpy(&new, seq, sizeof(FormData_pg_sequence));

	/* Check and set new values */
	init_params(options, false, &new, &owned_by);

	if (owned_by)
	{
		/* Restore previous state of elm (assume nothing else changes) */
		elm->increment = save_increment;
	}
	else
	{
		/* Clear local cache so that we don't think we have cached numbers */
		/* Note that we do not change the currval() state */
		elm->cached = elm->last;
	}

	/* Now okay to update the on-disk tuple */
	START_CRIT_SECTION();

	memcpy(seq, &new, sizeof(FormData_pg_sequence));

	MarkBufferDirty(buf);

	/*
	 * XLOG stuff.  Remember whether the sequence is temporary now, because
	 * the flag is needed again after the relation has been closed.
	 */
	bSeqIsTemp = seqrel->rd_istemp;

	if (!bSeqIsTemp)
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];
		Page		page = BufferGetPage(buf);

		xlrec.node = seqrel->rd_node;

		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
		rdata[0].buffer = InvalidBuffer;
		rdata[0].next = &(rdata[1]);

		rdata[1].data = (char *) seqtuple.t_data;
		rdata[1].len = seqtuple.t_len;
		rdata[1].buffer = InvalidBuffer;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);

		PageSetLSN(page, recptr);
	}

	END_CRIT_SECTION();

	UnlockReleaseBuffer(buf);

	/* process OWNED BY if given */
	if (owned_by)
		process_owned_by(seqrel, owned_by);

	relation_close(seqrel, NoLock);

	/*
	 * Build the subtype string for metadata tracking: "<n> OPTIONS" when
	 * zero or several options were given, otherwise the upper-cased name of
	 * the single option.
	 */
	numopts = list_length(options);

	if (numopts > 1)
	{
		char allopts[NAMEDATALEN];

		/* bounded formatting; cannot overrun allopts */
		snprintf(allopts, sizeof(allopts), "%d OPTIONS", numopts);

		alter_subtype = pstrdup(allopts);
	}
	else if (0 == numopts)
	{
		alter_subtype = "0 OPTIONS";
	}
	else if ((Gp_role == GP_ROLE_DISPATCH) && (!bSeqIsTemp))
	{
		ListCell		*option = list_head(options);
		DefElem			*defel	= (DefElem *) lfirst(option);

		alter_subtype = defel->defname;
		if (0 == strcmp(alter_subtype, "owned_by"))
			alter_subtype = "OWNED BY";

		alter_subtype = str_toupper(alter_subtype, strlen(alter_subtype));
	}

	if (Gp_role == GP_ROLE_DISPATCH && !bSeqIsTemp)
	{
		/* MPP-6929: metadata tracking */
		MetaTrackUpdObject(RelationRelationId,
						   relid,
						   GetUserId(),
						   "ALTER", alter_subtype);
	}
}

509

510 511 512 513 514
/*
 * Note: nextval with a text argument is no longer exported as a pg_proc
 * entry, but we keep it around to ease porting of C code that may have
 * called the function directly.
 */
515 516
Datum
nextval(PG_FUNCTION_ARGS)
517
{
518
	text	   *seqin = PG_GETARG_TEXT_P(0);
519
	RangeVar   *sequence;
520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538
	Oid			relid;

	sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin));
	relid = RangeVarGetRelid(sequence, false);

	PG_RETURN_INT64(nextval_internal(relid));
}

Datum
nextval_oid(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);

	PG_RETURN_INT64(nextval_internal(relid));
}

/*
 * nextval_internal
 *
 * Return the next value of the sequence identified by relid.
 *
 * If this session still holds cached values (elm->last != elm->cached), the
 * next one is handed out without touching the sequence relation.  Otherwise
 * UPDATE privilege is checked and a new batch is obtained: through
 * cdb_sequence_nextval_proxy() when running as an executor
 * (GP_ROLE_EXECUTE), or directly through cdb_sequence_nextval() otherwise.
 *
 * Raises an error if the user lacks UPDATE privilege or if the sequence
 * reported an overflow.
 */
static int64
nextval_internal(Oid relid)
{
	SeqTable	elm;
	Relation	seqrel;
	bool is_overflow = false;

	/* open and AccessShareLock sequence */
	init_sequence(relid, &elm, &seqrel);

	if (elm->last != elm->cached)		/* some numbers were cached */
	{
		Assert(elm->last_valid);
		Assert(elm->increment != 0);
		elm->last += elm->increment;
		relation_close(seqrel, NoLock);
		last_used_seq = elm;
		return elm->last;
	}

	if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied for sequence %s",
						RelationGetRelationName(seqrel))));

	/* Update the sequence object. */
	if (Gp_role == GP_ROLE_EXECUTE)
		cdb_sequence_nextval_proxy(seqrel,
								   &elm->last,
								   &elm->cached,
								   &elm->increment,
								   &is_overflow);
	else
		cdb_sequence_nextval(elm,
							 seqrel,
							 &elm->last,
							 &elm->cached,
							 &elm->increment,
							 &is_overflow);

	if (is_overflow)
	{
		/*
		 * Copy the name before giving up our reference to the relation, so
		 * that the error message does not read through seqrel after it has
		 * been closed.
		 */
		char	   *seqname = pstrdup(RelationGetRelationName(seqrel));

		relation_close(seqrel, NoLock);

		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("nextval: reached %s value of sequence \"%s\" (" INT64_FORMAT ")",
                        elm->increment>0 ? "maximum":"minimum",
                        seqname, elm->last)));
	}

	/*
	 * Success.  Only now publish this entry as the "last used" sequence:
	 * lastval() relies on last_used_seq->last_valid being true, which would
	 * not hold if a failed first nextval() had already been recorded.
	 */
	elm->last_valid = true;
	last_used_seq = elm;

	relation_close(seqrel, NoLock);
	return elm->last;
}


596 597 598
/*
 * cdb_sequence_nextval
 *
 * Fetch the next batch of values from the sequence relation and update its
 * tuple in the buffer.
 *
 * Outputs:
 *	*plast		first value of the batch, i.e. the value to return now
 *	*pcached	last value of the batch kept for the caller's local cache
 *	*pincrement	the sequence's increment_by
 *	*poverflow	true if the bound was reached on a non-cycling sequence
 *				before any value could be produced
 *
 * The buffer obtained from read_seq_tuple() is unlocked and released before
 * returning.
 */
static void
cdb_sequence_nextval(SeqTable elm,
					 Relation   seqrel,
					 int64     *plast,
					 int64     *pcached,
					 int64     *pincrement,
					 bool      *poverflow)
{
	Buffer		buf;
	Page		page;
	HeapTupleData seqtuple;
	Form_pg_sequence seq;
	int64		incby,
				maxv,
				minv,
				cache,
				log,
				fetch,
				last;
	int64		result,
				next,
				rescnt = 0;
	bool 		have_overflow = false;
	bool		logit = false;

	/* lock page's buffer and read tuple */
	seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
	page = BufferGetPage(buf);

	last = next = result = seq->last_value;
	incby = seq->increment_by;
	maxv = seq->max_value;
	minv = seq->min_value;
	fetch = cache = seq->cache_value;
	log = seq->log_cnt;

	if (!seq->is_called)
	{
		rescnt++;				/* return last_value if not is_called */
		fetch--;
	}

	/*
	 * Decide whether we should emit a WAL log record.  If so, force up the
	 * fetch count to grab SEQ_LOG_VALS more values than we actually need to
	 * cache.  (These will then be usable without logging.)
	 *
	 * If this is the first nextval after a checkpoint, we must force a new
	 * WAL record to be written anyway, else replay starting from the
	 * checkpoint would fail to advance the sequence past the logged values.
	 * In this case we may as well fetch extra values.
	 */
	if (log < fetch || !seq->is_called)
	{
		/* forced log to satisfy local demand for values */
		fetch = log = fetch + SEQ_LOG_VALS;
		logit = true;
	}
	else
	{
		XLogRecPtr	redoptr = GetRedoRecPtr();

		if (XLByteLE(PageGetLSN(page), redoptr))
		{
			/* last update of seq was before checkpoint */
			fetch = log = fetch + SEQ_LOG_VALS;
			logit = true;
		}
	}

	while (fetch)				/* try to fetch cache [+ log ] numbers */
	{
		/*
		 * Check MAXVALUE for ascending sequences and MINVALUE for descending
		 * sequences
		 */
		if (incby > 0)
		{
			/* ascending sequence */
			if ((maxv >= 0 && next > maxv - incby) ||
				(maxv < 0 && next + incby > maxv))
			{
				if (rescnt > 0)
					break;		/* stop fetching */
				if (!seq->is_cycled)
				{
					/* report to the caller instead of raising an error here */
					have_overflow = true;
				}
				else
				{
					next = minv;
				}
			}
			else
				next += incby;
		}
		else
		{
			/* descending sequence */
			if ((minv < 0 && next < minv - incby) ||
				(minv >= 0 && next + incby < minv))
			{
				if (rescnt > 0)
					break;		/* stop fetching */
				if (!seq->is_cycled)
				{
					/* report to the caller instead of raising an error here */
					have_overflow = true;
				}
				else
				{
					next = maxv;
				}
			}
			else
				next += incby;
		}
		fetch--;
		if (rescnt < cache)
		{
			log--;
			rescnt++;
			last = next;
			if (rescnt == 1)	/* if it's first result - */
				result = next;	/* it's what to return */
		}
	}

	log -= fetch;				/* adjust for any unfetched numbers */
	Assert(log >= 0);

	/* set results for caller */
	*poverflow = have_overflow; /* has the sequence overflowed */
	*plast = result;            /* last returned number */
	*pcached = last;            /* last fetched number */
	*pincrement = incby;

	/* ready to change the on-disk (or really, in-buffer) tuple */
	START_CRIT_SECTION();

	/*
	 * We must mark the buffer dirty before doing XLogInsert(); see notes in
	 * SyncOneBuffer().  However, we don't apply the desired changes just yet.
	 * This looks like a violation of the buffer update protocol, but it is
	 * in fact safe because we hold exclusive lock on the buffer.  Any other
	 * process, including a checkpoint, that tries to examine the buffer
	 * contents will block until we release the lock, and then will see the
	 * final state that we install below.
	 */
	MarkBufferDirty(buf);

	/* XLOG stuff */
	if (logit && !seqrel->rd_istemp)
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];

		/*
		 * We don't log the current state of the tuple, but rather the state
		 * as it would appear after "log" more fetches.  This lets us skip
		 * that many future WAL records, at the cost that we lose those
		 * sequence values if we crash.
		 */

		/* set values that will be saved in xlog */
		seq->last_value = next;
		seq->is_called = true;
		seq->log_cnt = 0;

		xlrec.node = seqrel->rd_node;
		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
		rdata[0].buffer = InvalidBuffer;
		rdata[0].next = &(rdata[1]);

		rdata[1].data = (char *) seqtuple.t_data;
		rdata[1].len = seqtuple.t_len;
		rdata[1].buffer = InvalidBuffer;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);

		PageSetLSN(page, recptr);

		/*
		 * Record in shared memory how far we have inserted into the xlog,
		 * so that the QD can flush it when necessary.  Only ever move the
		 * recorded position forward.
		 */
		LWLockAcquire(SeqServerControlLock, LW_EXCLUSIVE);

		if (XLByteLT(seqServerCtl->lastXlogEntry, recptr))
		{
			seqServerCtl->lastXlogEntry.xlogid = recptr.xlogid;
			seqServerCtl->lastXlogEntry.xrecoff = recptr.xrecoff;
		}

		LWLockRelease(SeqServerControlLock);
	}

	/* Now update sequence tuple to the intended final state */
	seq->last_value = last;		/* last fetched number */
	seq->is_called = true;
	seq->log_cnt = log;			/* how much is logged */

	END_CRIT_SECTION();

	UnlockReleaseBuffer(buf);
}                               /* cdb_sequence_nextval */
803

804

805
Datum
806
currval_oid(PG_FUNCTION_ARGS)
807
{
808 809
	Oid			relid = PG_GETARG_OID(0);
	int64		result;
810
	SeqTable	elm;
811
	Relation	seqrel;
812

813 814 815 816 817 818 819 820
	/* For now, strictly forbidden on MPP. */
	if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
				 errmsg("currval() not supported")));
	}

V
Vadim B. Mikheev 已提交
821
	/* open and AccessShareLock sequence */
822
	init_sequence(relid, &elm, &seqrel);
823

824 825
	if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_SELECT) != ACLCHECK_OK &&
		pg_class_aclcheck(elm->relid, GetUserId(), ACL_USAGE) != ACLCHECK_OK)
826 827
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
828
				 errmsg("permission denied for sequence %s",
829
						RelationGetRelationName(seqrel))));
830

831
	if (!elm->last_valid)
832 833
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
834
				 errmsg("currval of sequence \"%s\" is not yet defined in this session",
835
						RelationGetRelationName(seqrel))));
836 837 838

	result = elm->last;

839 840
	relation_close(seqrel, NoLock);

841
	PG_RETURN_INT64(result);
842 843
}

844 845 846 847 848 849
Datum
lastval(PG_FUNCTION_ARGS)
{
	Relation	seqrel;
	int64		result;

850 851 852 853 854 855 856 857
	/* For now, strictly forbidden on MPP. */
	if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
				 errmsg("lastval() not supported")));
	}

858 859 860 861 862 863
	if (last_used_seq == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("lastval is not yet defined in this session")));

	/* Someone may have dropped the sequence since the last nextval() */
864 865 866
	if (!SearchSysCacheExists(RELOID,
							  ObjectIdGetDatum(last_used_seq->relid),
							  0, 0, 0))
867 868 869 870
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("lastval is not yet defined in this session")));

871
	seqrel = open_share_lock(last_used_seq);
872 873

	/* nextval() must have already been called for this sequence */
874
	Assert(last_used_seq->last_valid);
875

876 877
	if (pg_class_aclcheck(last_used_seq->relid, GetUserId(), ACL_SELECT) != ACLCHECK_OK &&
		pg_class_aclcheck(last_used_seq->relid, GetUserId(), ACL_USAGE) != ACLCHECK_OK)
878 879 880 881 882 883 884
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied for sequence %s",
						RelationGetRelationName(seqrel))));

	result = last_used_seq->last;
	relation_close(seqrel, NoLock);
885

886 887 888
	PG_RETURN_INT64(result);
}

B
Bruce Momjian 已提交
889
/*
890 891 892 893
 * Main internal procedure that handles 2 & 3 arg forms of SETVAL.
 *
 * Note that the 3 arg version (which sets the is_called flag) is
 * only for use in pg_dump, and setting the is_called flag may not
B
Bruce Momjian 已提交
894
 * work if multiple users are attached to the database and referencing
895 896
 * the sequence (unlikely if pg_dump is restoring it).
 *
B
Bruce Momjian 已提交
897
 * It is necessary to have the 3 arg version so that pg_dump can
898 899 900 901
 * restore the state of a sequence exactly during data-only restores -
 * it is the only way to clear the is_called flag in an existing
 * sequence.
 */
B
Bruce Momjian 已提交
902
static void
903
do_setval(Oid relid, int64 next, bool iscalled)
M
 
Marc G. Fournier 已提交
904 905
{
	SeqTable	elm;
906
	Relation	seqrel;
907
	Buffer		buf;
908
	HeapTupleData seqtuple;
909
	Form_pg_sequence seq;
M
 
Marc G. Fournier 已提交
910

911 912 913 914 915 916 917
	if (Gp_role == GP_ROLE_EXECUTE)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
				 errmsg("setval() not supported in this context")));
	}

918
	/* open and AccessShareLock sequence */
919
	init_sequence(relid, &elm, &seqrel);
920 921

	if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
922 923
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
924
				 errmsg("permission denied for sequence %s",
925
						RelationGetRelationName(seqrel))));
M
 
Marc G. Fournier 已提交
926

927
	/* lock page' buffer and read tuple */
928
	seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
929
	elm->increment = seq->increment_by;
M
 
Marc G. Fournier 已提交
930

931
	if ((next < seq->min_value) || (next > seq->max_value))
932
	{
B
Bruce Momjian 已提交
933 934 935 936
		char		bufv[100],
					bufm[100],
					bufx[100];

937 938 939
		snprintf(bufv, sizeof(bufv), INT64_FORMAT, next);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, seq->min_value);
		snprintf(bufx, sizeof(bufx), INT64_FORMAT, seq->max_value);
940 941
		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
942
				 errmsg("setval: value %s is out of bounds for sequence \"%s\" (%s..%s)",
943 944
						bufv, RelationGetRelationName(seqrel),
						bufm, bufx)));
945
	}
M
 
Marc G. Fournier 已提交
946

947 948 949 950 951 952 953 954 955
	/* Set the currval() state only if iscalled = true */
	if (iscalled)
	{
		elm->last = next;		/* last returned number */
		elm->last_valid = true;
	}

	/* In any case, forget any future cached numbers */
	elm->cached = elm->last;
M
 
Marc G. Fournier 已提交
956

957
	/* ready to change the on-disk (or really, in-buffer) tuple */
958
	START_CRIT_SECTION();
959

960 961 962 963
	seq->last_value = next;		/* last fetched number */
	seq->is_called = iscalled;
	seq->log_cnt = 0;

964 965
	MarkBufferDirty(buf);

966 967
	/* XLOG stuff */
	if (!seqrel->rd_istemp)
V
Vadim B. Mikheev 已提交
968 969 970
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
B
Bruce Momjian 已提交
971
		XLogRecData rdata[2];
972
		Page		page = BufferGetPage(buf);
V
Vadim B. Mikheev 已提交
973

974
		xlrec.node = seqrel->rd_node;
975

B
Bruce Momjian 已提交
976
		rdata[0].data = (char *) &xlrec;
977
		rdata[0].len = sizeof(xl_seq_rec);
978
		rdata[0].buffer = InvalidBuffer;
979 980
		rdata[0].next = &(rdata[1]);

981 982
		rdata[1].data = (char *) seqtuple.t_data;
		rdata[1].len = seqtuple.t_len;
983
		rdata[1].buffer = InvalidBuffer;
984 985
		rdata[1].next = NULL;

986
		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);
987 988

		PageSetLSN(page, recptr);
V
Vadim B. Mikheev 已提交
989
	}
990

991
	END_CRIT_SECTION();
M
 
Marc G. Fournier 已提交
992

993
	UnlockReleaseBuffer(buf);
994
	relation_close(seqrel, NoLock);
995 996
}

997 998 999 1000
/*
 * Implement the 2 arg setval procedure.
 * See do_setval for discussion.
 */
1001
Datum
1002
setval_oid(PG_FUNCTION_ARGS)
1003
{
1004
	Oid			relid = PG_GETARG_OID(0);
1005
	int64		next = PG_GETARG_INT64(1);
1006

1007
	do_setval(relid, next, true);
1008

1009
	PG_RETURN_INT64(next);
1010 1011
}

1012 1013 1014 1015
/*
 * Implement the 3 arg setval procedure.
 * See do_setval for discussion.
 */
1016
Datum
1017
setval3_oid(PG_FUNCTION_ARGS)
1018
{
1019
	Oid			relid = PG_GETARG_OID(0);
1020
	int64		next = PG_GETARG_INT64(1);
1021 1022
	bool		iscalled = PG_GETARG_BOOL(2);

1023
	do_setval(relid, next, iscalled);
1024

1025
	PG_RETURN_INT64(next);
M
 
Marc G. Fournier 已提交
1026 1027
}

1028

1029
/*
1030 1031
 * Open the sequence and acquire AccessShareLock if needed
 *
1032
 * If we haven't touched the sequence already in this transaction,
B
Bruce Momjian 已提交
1033
 * we need to acquire AccessShareLock.	We arrange for the lock to
1034 1035 1036
 * be owned by the top transaction, so that we don't need to do it
 * more than once per xact.
 */
1037 1038
static Relation
open_share_lock(SeqTable seq)
1039
{
1040
	LocalTransactionId thislxid = MyProc->lxid;
1041

1042
	/* Get the lock if not already held in this xact */
1043
	if (seq->lxid != thislxid)
1044 1045 1046 1047 1048 1049 1050
	{
		ResourceOwner currentOwner;

		currentOwner = CurrentResourceOwner;
		PG_TRY();
		{
			CurrentResourceOwner = TopTransactionResourceOwner;
1051
			LockRelationOid(seq->relid, AccessShareLock);
1052 1053 1054 1055 1056 1057 1058 1059 1060 1061
		}
		PG_CATCH();
		{
			/* Ensure CurrentResourceOwner is restored on error */
			CurrentResourceOwner = currentOwner;
			PG_RE_THROW();
		}
		PG_END_TRY();
		CurrentResourceOwner = currentOwner;

1062
		/* Flag that we have a lock in the current xact */
1063
		seq->lxid = thislxid;
1064
	}
1065 1066 1067

	/* We now know we have AccessShareLock, and can safely open the rel */
	return relation_open(seq->relid, NoLock);
1068 1069
}

1070
/*
1071
 * Given a relation OID, open and lock the sequence.  p_elm and p_rel are
1072
 * output parameters.
1073 1074
 *
 * GPDB: If p_rel is NULL, the sequence relation is not opened or locked.
1075 1076
 */
static void
1077
init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel)
1078
{
B
Bruce Momjian 已提交
1079
	SeqTable	elm;
1080
	Relation	seqrel;
1081

1082 1083 1084 1085 1086 1087 1088
	/* Look to see if we already have a seqtable entry for relation */
	for (elm = seqtab; elm != NULL; elm = elm->next)
	{
		if (elm->relid == relid)
			break;
	}

1089
	/*
1090
	 * Allocate new seqtable entry if we didn't find one.
1091
	 *
B
Bruce Momjian 已提交
1092 1093 1094
	 * NOTE: seqtable entries remain in the list for the life of a backend. If
	 * the sequence itself is deleted then the entry becomes wasted memory,
	 * but it's small enough that this should not matter.
B
Bruce Momjian 已提交
1095
	 */
1096
	if (elm == NULL)
1097
	{
1098
		/*
B
Bruce Momjian 已提交
1099 1100
		 * Time to make a new seqtable entry.  These entries live as long as
		 * the backend does, so we use plain malloc for them.
1101 1102
		 */
		elm = (SeqTable) malloc(sizeof(SeqTableData));
T
Tom Lane 已提交
1103
		if (elm == NULL)
1104 1105 1106
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory")));
1107
		elm->relid = relid;
1108
		elm->lxid = InvalidLocalTransactionId;
1109
		elm->last_valid = false;
1110 1111 1112
		elm->last = elm->cached = elm->increment = 0;
		elm->next = seqtab;
		seqtab = elm;
1113 1114
	}

1115 1116 1117
	/*
	 * Open the sequence relation.
	 */
1118 1119 1120
	if (p_rel)
	{
		seqrel = open_share_lock(elm);
1121

1122 1123 1124 1125 1126
		if (seqrel->rd_rel->relkind != RELKIND_SEQUENCE)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("\"%s\" is not a sequence",
							RelationGetRelationName(seqrel))));
1127

1128 1129
		*p_rel = seqrel;
	}
1130
	*p_elm = elm;
1131 1132 1133
}


1134 1135 1136 1137 1138 1139 1140 1141 1142
/*
 * Given an opened sequence relation, lock the page buffer and find the tuple
 *
 * *buf receives the reference to the pinned-and-ex-locked buffer
 * *seqtuple receives the reference to the sequence tuple proper
 *		(this arg should point to a local variable of type HeapTupleData)
 *
 * Function's return value points to the data payload of the tuple
 */
1143
static Form_pg_sequence
1144
read_seq_tuple(SeqTable elm, Relation rel, Buffer *buf, HeapTuple seqtuple)
1145
{
1146
	Page		page;
1147 1148 1149
	ItemId		lp;
	sequence_magic *sm;
	Form_pg_sequence seq;
1150

1151 1152 1153
	*buf = ReadBuffer(rel, 0);
	LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);

1154
	page = BufferGetPage(*buf);
1155 1156 1157
	sm = (sequence_magic *) PageGetSpecialPointer(page);

	if (sm->magic != SEQ_MAGIC)
1158 1159
		elog(ERROR, "bad magic number in sequence \"%s\": %08X",
			 RelationGetRelationName(rel), sm->magic);
1160 1161

	lp = PageGetItemId(page, FirstOffsetNumber);
1162
	Assert(ItemIdIsNormal(lp));
1163 1164 1165 1166

	/* Note we currently only bother to set these two fields of *seqtuple */
	seqtuple->t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
	seqtuple->t_len = ItemIdGetLength(lp);
1167

1168 1169 1170 1171 1172 1173 1174 1175
	/*
	 * Previous releases of Postgres neglected to prevent SELECT FOR UPDATE
	 * on a sequence, which would leave a non-frozen XID in the sequence
	 * tuple's xmax, which eventually leads to clog access failures or worse.
	 * If we see this has happened, clean up after it.  We treat this like a
	 * hint bit update, ie, don't bother to WAL-log it, since we can certainly
	 * do this again if the update gets lost.
	 */
1176
	if (HeapTupleHeaderGetXmax(seqtuple->t_data) != InvalidTransactionId)
1177
	{
1178 1179 1180
		HeapTupleHeaderSetXmax(seqtuple->t_data, InvalidTransactionId);
		seqtuple->t_data->t_infomask &= ~HEAP_XMAX_COMMITTED;
		seqtuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
1181
		MarkBufferDirtyHint(*buf, rel);
1182
	}
1183

1184
	seq = (Form_pg_sequence) GETSTRUCT(seqtuple);
1185

1186
	/* this is a handy place to update our copy of the increment */
1187 1188 1189
	elm->increment = seq->increment_by;

	return seq;
1190 1191
}

1192 1193
/*
 * init_params: process the options list of CREATE or ALTER SEQUENCE,
1194 1195
 * and store the values into appropriate fields of *new.  Also set
 * *owned_by to any OWNED BY option, or to NIL if there is none.
1196 1197 1198 1199
 *
 * If isInit is true, fill any unspecified options with default values;
 * otherwise, do not change existing options that aren't explicitly overridden.
 */
1200
static void
1201 1202
init_params(List *options, bool isInit,
			Form_pg_sequence new, List **owned_by)
1203
{
1204 1205
	DefElem    *start_value = NULL;
	DefElem    *restart_value = NULL;
1206 1207 1208 1209
	DefElem    *increment_by = NULL;
	DefElem    *max_value = NULL;
	DefElem    *min_value = NULL;
	DefElem    *cache_value = NULL;
1210
	DefElem    *is_cycled = NULL;
1211
	ListCell   *option;
1212

1213 1214
	*owned_by = NIL;

B
Bruce Momjian 已提交
1215
	foreach(option, options)
1216
	{
1217
		DefElem    *defel = (DefElem *) lfirst(option);
1218

1219
		if (strcmp(defel->defname, "increment") == 0)
1220 1221
		{
			if (increment_by)
1222 1223 1224
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1225
			increment_by = defel;
1226
		}
1227 1228
		else if (strcmp(defel->defname, "start") == 0)
		{
1229
			if (start_value)
1230 1231 1232
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1233
			start_value = defel;
1234 1235
		}
		else if (strcmp(defel->defname, "restart") == 0)
1236
		{
1237
			if (restart_value)
1238 1239 1240
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1241
			restart_value = defel;
1242
		}
1243
		else if (strcmp(defel->defname, "maxvalue") == 0)
1244 1245
		{
			if (max_value)
1246 1247 1248
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1249
			max_value = defel;
1250
		}
1251
		else if (strcmp(defel->defname, "minvalue") == 0)
1252 1253
		{
			if (min_value)
1254 1255 1256
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1257
			min_value = defel;
1258
		}
1259
		else if (strcmp(defel->defname, "cache") == 0)
1260 1261
		{
			if (cache_value)
1262 1263 1264
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1265
			cache_value = defel;
1266
		}
1267
		else if (strcmp(defel->defname, "cycle") == 0)
1268
		{
1269
			if (is_cycled)
1270 1271 1272
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
1273
			is_cycled = defel;
1274
		}
1275 1276 1277 1278 1279 1280 1281 1282
		else if (strcmp(defel->defname, "owned_by") == 0)
		{
			if (*owned_by)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			*owned_by = defGetQualifiedName(defel);
		}
1283
		else
1284
			elog(ERROR, "option \"%s\" not recognized",
1285 1286 1287
				 defel->defname);
	}

1288 1289 1290 1291 1292 1293 1294
	/*
	 * We must reset log_cnt when isInit or when changing any parameters
	 * that would affect future nextval allocations.
	 */
	if (isInit)
		new->log_cnt = 0;

B
Bruce Momjian 已提交
1295
	/* INCREMENT BY */
1296
	if (increment_by != NULL)
B
Bruce Momjian 已提交
1297 1298
	{
		new->increment_by = defGetInt64(increment_by);
1299 1300 1301
		if (new->increment_by == 0)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1302
					 errmsg("INCREMENT must not be zero")));
1303
		new->log_cnt = 0;
B
Bruce Momjian 已提交
1304
	}
1305 1306 1307 1308
	else if (isInit)
		new->increment_by = 1;

	/* CYCLE */
1309
	if (is_cycled != NULL)
1310 1311 1312
	{
		new->is_cycled = intVal(is_cycled->arg);
		Assert(new->is_cycled == false || new->is_cycled == true);
1313
		new->log_cnt = 0;
1314 1315 1316
	}
	else if (isInit)
		new->is_cycled = false;
1317

1318
	/* MAXVALUE (null arg means NO MAXVALUE) */
1319
	if (max_value != NULL && max_value->arg)
1320
	{
1321
		new->max_value = defGetInt64(max_value);
1322 1323
		new->log_cnt = 0;
	}
1324
	else if (isInit || max_value != NULL)
1325
	{
1326
		if (new->increment_by > 0)
B
Bruce Momjian 已提交
1327
			new->max_value = SEQ_MAXVALUE;		/* ascending seq */
1328
		else
B
Bruce Momjian 已提交
1329
			new->max_value = -1;	/* descending seq */
1330
		new->log_cnt = 0;
1331
	}
1332

1333
	/* MINVALUE (null arg means NO MINVALUE) */
1334
	if (min_value != NULL && min_value->arg)
1335
	{
1336
		new->min_value = defGetInt64(min_value);
1337 1338
		new->log_cnt = 0;
	}
1339
	else if (isInit || min_value != NULL)
1340
	{
1341
		if (new->increment_by > 0)
B
Bruce Momjian 已提交
1342
			new->min_value = 1; /* ascending seq */
1343
		else
B
Bruce Momjian 已提交
1344
			new->min_value = SEQ_MINVALUE;		/* descending seq */
1345
		new->log_cnt = 0;
1346
	}
1347

1348
	/* crosscheck min/max */
1349
	if (new->min_value >= new->max_value)
1350
	{
B
Bruce Momjian 已提交
1351 1352 1353
		char		bufm[100],
					bufx[100];

1354 1355
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->min_value);
		snprintf(bufx, sizeof(bufx), INT64_FORMAT, new->max_value);
1356 1357 1358 1359
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("MINVALUE (%s) must be less than MAXVALUE (%s)",
						bufm, bufx)));
1360
	}
1361

B
Bruce Momjian 已提交
1362
	/* START WITH */
1363 1364
	if (start_value != NULL)
		new->start_value = defGetInt64(start_value);
1365
	else if (isInit)
1366
	{
1367
		if (new->increment_by > 0)
1368
			new->start_value = new->min_value;	/* ascending seq */
1369
		else
1370
			new->start_value = new->max_value;	/* descending seq */
1371
	}
1372

1373 1374
	/* crosscheck START */
	if (new->start_value < new->min_value)
1375
	{
1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398
		char		bufs[100],
					bufm[100];

		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->start_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->min_value);
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("START value (%s) cannot be less than MINVALUE (%s)",
						bufs, bufm)));
	}
	if (new->start_value > new->max_value)
	{
		char		bufs[100],
					bufm[100];

		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->start_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->max_value);
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
			  errmsg("START value (%s) cannot be greater than MAXVALUE (%s)",
					 bufs, bufm)));
	}

1399 1400 1401 1402 1403 1404 1405
	/* RESTART [WITH] */
	if (restart_value != NULL)
	{
		if (restart_value->arg != NULL)
			new->last_value = defGetInt64(restart_value);
		else
			new->last_value = new->start_value;
1406 1407 1408
		new->is_called = false;
		new->log_cnt = 1;
	}
1409
	else if (isInit)
1410
	{
1411
		new->last_value = new->start_value;
1412 1413
		new->is_called = false;
		new->log_cnt = 1;
1414
	}
1415

1416
	/* crosscheck RESTART (or current value, if changing MIN/MAX) */
1417
	if (new->last_value < new->min_value)
1418
	{
B
Bruce Momjian 已提交
1419 1420 1421
		char		bufs[100],
					bufm[100];

1422 1423
		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->last_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->min_value);
1424 1425
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1426 1427
			   errmsg("RESTART value (%s) cannot be less than MINVALUE (%s)",
					  bufs, bufm)));
1428
	}
1429
	if (new->last_value > new->max_value)
1430
	{
B
Bruce Momjian 已提交
1431 1432 1433
		char		bufs[100],
					bufm[100];

1434 1435
		snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->last_value);
		snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->max_value);
1436 1437
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1438 1439
			errmsg("RESTART value (%s) cannot be greater than MAXVALUE (%s)",
				   bufs, bufm)));
1440
	}
1441

B
Bruce Momjian 已提交
1442
	/* CACHE */
1443
	if (cache_value != NULL)
1444
	{
1445 1446 1447 1448
		new->cache_value = defGetInt64(cache_value);
		if (new->cache_value <= 0)
		{
			char		buf[100];
B
Bruce Momjian 已提交
1449

1450 1451 1452 1453 1454 1455
			snprintf(buf, sizeof(buf), INT64_FORMAT, new->cache_value);
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("CACHE (%s) must be greater than zero",
							buf)));
		}
1456
		new->log_cnt = 0;
1457
	}
1458 1459
	else if (isInit)
		new->cache_value = 1;
1460 1461
}

1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485
/*
 * Process an OWNED BY option for CREATE/ALTER SEQUENCE
 *
 * owned_by is the list of name components: either the single name "none"
 * (OWNED BY NONE) or a possibly-qualified table name followed by a column
 * name.
 *
 * Ownership permissions on the sequence are already checked,
 * but if we are establishing a new owned-by dependency, we must
 * enforce that the referenced table has the same owner and namespace
 * as the sequence.
 */
static void
process_owned_by(Relation seqrel, List *owned_by)
{
	int			nnames = list_length(owned_by);
	Relation	tablerel = NULL;
	AttrNumber	attnum = 0;

	Assert(nnames > 0);

	if (nnames != 1)
	{
		List	   *tblname;
		char	   *colname;
		RangeVar   *tblrv;

		/* Everything but the last component names the table */
		tblname = list_truncate(list_copy(owned_by), nnames - 1);
		colname = strVal(lfirst(list_tail(owned_by)));

		/* Open and lock rel to ensure it won't go away meanwhile */
		tblrv = makeRangeVarFromNameList(tblname);
		tablerel = relation_openrv(tblrv, AccessShareLock);

		/* Must be a regular table */
		if (tablerel->rd_rel->relkind != RELKIND_RELATION)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("referenced relation \"%s\" is not a table",
							RelationGetRelationName(tablerel))));

		/* We insist on same owner and schema */
		if (seqrel->rd_rel->relowner != tablerel->rd_rel->relowner)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("sequence must have same owner as table it is linked to")));
		if (RelationGetNamespace(seqrel) != RelationGetNamespace(tablerel))
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("sequence must be in same schema as table it is linked to")));

		/* Now, fetch the attribute number from the system cache */
		attnum = get_attnum(RelationGetRelid(tablerel), colname);
		if (attnum == InvalidAttrNumber)
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_COLUMN),
					 errmsg("column \"%s\" of relation \"%s\" does not exist",
							colname, RelationGetRelationName(tablerel))));
	}
	else
	{
		/* A single name must be OWNED BY NONE; tablerel stays NULL */
		if (strcmp(strVal(linitial(owned_by)), "none") != 0)
			ereport(ERROR,
					(errcode(ERRCODE_SYNTAX_ERROR),
					 errmsg("invalid OWNED BY option"),
				errhint("Specify OWNED BY table.column or OWNED BY NONE.")));
	}

	/*
	 * OK, we are ready to update pg_depend.  First remove any existing AUTO
	 * dependencies for the sequence, then optionally add a new one.
	 */
	markSequenceUnowned(RelationGetRelid(seqrel));

	if (tablerel != NULL)
	{
		ObjectAddress refobject;
		ObjectAddress depobject;

		/* The sequence (depobject) depends on the table column (refobject) */
		refobject.classId = RelationRelationId;
		refobject.objectId = RelationGetRelid(tablerel);
		refobject.objectSubId = attnum;
		depobject.classId = RelationRelationId;
		depobject.objectId = RelationGetRelid(seqrel);
		depobject.objectSubId = 0;
		recordDependencyOn(&depobject, &refobject, DEPENDENCY_AUTO);

		/* Done, but hold lock until commit */
		relation_close(tablerel, NoLock);
	}
}

V
Vadim B. Mikheev 已提交
1555

B
Bruce Momjian 已提交
1556
void
1557
seq_redo(XLogRecPtr beginLoc, XLogRecPtr lsn, XLogRecord *record)
V
Vadim B. Mikheev 已提交
1558
{
B
Bruce Momjian 已提交
1559 1560 1561 1562 1563 1564
	uint8		info = record->xl_info & ~XLR_INFO_MASK;
	Buffer		buffer;
	Page		page;
	char	   *item;
	Size		itemsz;
	xl_seq_rec *xlrec = (xl_seq_rec *) XLogRecGetData(record);
1565
	sequence_magic *sm;
V
Vadim B. Mikheev 已提交
1566

1567 1568 1569
	/* Backup blocks are not used in seq records */
	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));

1570
	if (info != XLOG_SEQ_LOG)
1571
		elog(PANIC, "seq_redo: unknown op code %u", info);
1572

1573
	buffer = XLogReadBuffer(xlrec->node, 0, true);
1574
	Assert(BufferIsValid(buffer));
V
Vadim B. Mikheev 已提交
1575 1576
	page = (Page) BufferGetPage(buffer);

1577 1578
	/* Always reinit the page and reinstall the magic number */
	/* See comments in DefineSequence */
1579 1580 1581
	PageInit((Page) page, BufferGetPageSize(buffer), sizeof(sequence_magic));
	sm = (sequence_magic *) PageGetSpecialPointer(page);
	sm->magic = SEQ_MAGIC;
V
Vadim B. Mikheev 已提交
1582

B
Bruce Momjian 已提交
1583
	item = (char *) xlrec + sizeof(xl_seq_rec);
1584
	itemsz = record->xl_len - sizeof(xl_seq_rec);
1585

B
Bruce Momjian 已提交
1586
	if (PageAddItem(page, (Item) item, itemsz,
1587
					FirstOffsetNumber, false, false) == InvalidOffsetNumber)
1588
		elog(PANIC, "seq_redo: failed to add item to page");
V
Vadim B. Mikheev 已提交
1589 1590

	PageSetLSN(page, lsn);
1591 1592
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);
V
Vadim B. Mikheev 已提交
1593 1594
}

B
Bruce Momjian 已提交
1595
void
1596
seq_desc(StringInfo buf, XLogRecPtr beginLoc, XLogRecord *record)
V
Vadim B. Mikheev 已提交
1597
{
1598 1599
	uint8		info = record->xl_info & ~XLR_INFO_MASK;
	char		*rec = XLogRecGetData(record);
B
Bruce Momjian 已提交
1600
	xl_seq_rec *xlrec = (xl_seq_rec *) rec;
V
Vadim B. Mikheev 已提交
1601 1602

	if (info == XLOG_SEQ_LOG)
1603
		appendStringInfo(buf, "log: ");
V
Vadim B. Mikheev 已提交
1604 1605
	else
	{
1606
		appendStringInfo(buf, "UNKNOWN");
V
Vadim B. Mikheev 已提交
1607 1608 1609
		return;
	}

1610
	appendStringInfo(buf, "rel %u/%u/%u",
B
Bruce Momjian 已提交
1611
			   xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode);
V
Vadim B. Mikheev 已提交
1612
}
1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696


/*
 * Initialize a pseudo relcache entry with just enough info to call bufmgr.
 *
 * seqrel		caller-provided RelationData to fill in; it is zeroed first
 * tablespaceid, dbid, relid
 *				become the physical address (rd_node) of the relation;
 *				relid is also used as the relation's OID
 * istemp		stored in rd_istemp
 *
 * rd_rel is palloc'd here, so the entry must later be released with
 * cdb_sequence_relation_term().  The relation name is a synthetic
 * "pg_class.oid=<relid>" string.
 */
static void
cdb_sequence_relation_init(Relation seqrel,
						   Oid		tablespaceid,
						   Oid		dbid,
						   Oid		relid,
						   bool		istemp)
{
	/* See RelationBuildDesc in relcache.c */
	memset(seqrel, 0, sizeof(*seqrel));

	seqrel->rd_smgr = NULL;
	/* NOTE(review): 99 looks like an arbitrary nonzero refcount -- confirm */
	seqrel->rd_refcnt = 99;

	seqrel->rd_id = relid;
	seqrel->rd_istemp = istemp;

	/* Must use shared buffer pool so seqserver & QDs can see the data. */
	seqrel->rd_isLocalBuf = false;

	seqrel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);

	/*
	 * Oid is unsigned, so format it with %u (as seq_desc does for relNode);
	 * %d would print large OIDs as negative numbers.  Bound the write by the
	 * size of the name buffer.
	 */
	snprintf(seqrel->rd_rel->relname.data,
			 sizeof(seqrel->rd_rel->relname.data),
			 "pg_class.oid=%u", relid);

	/* as in RelationInitPhysicalAddr... */
	seqrel->rd_node.spcNode = tablespaceid;
	seqrel->rd_node.dbNode = dbid;
	seqrel->rd_node.relNode = relid;
}								/* cdb_sequence_relation_init */

/*
 * Clean up a pseudo relcache entry: close its smgr file handle, if any,
 * and free the rd_rel storage if it was allocated.
 */
static void
cdb_sequence_relation_term(Relation seqrel)
{
	/* Close the file. */
	RelationCloseSmgr(seqrel);

	/* Release the pg_class image, if there is one. */
	if (seqrel->rd_rel != NULL)
		pfree(seqrel->rd_rel);
}								/* cdb_sequence_relation_term */



/*
 * CDB: forward a nextval request from qExec to the sequence server
 *
 * The request is sent on the descriptor returned by GetSeqServerFD() and
 * is tagged with the current gp_session_id.  The four output pointers are
 * handed straight to sendSequenceRequest(), which is expected to fill
 * them in.
 */
void
cdb_sequence_nextval_proxy(Relation	seqrel,
						   int64   *plast,
						   int64   *pcached,
						   int64   *pincrement,
						   bool	   *poverflow)
{
	sendSequenceRequest(GetSeqServerFD(),
						seqrel,
						gp_session_id,
						plast,
						pcached,
						pincrement,
						poverflow);
}								/* cdb_sequence_nextval_proxy */


/*
 * CDB: nextval entry point called by sequence server
 *
 * tablespaceid, dbid, relid and istemp identify the sequence relation; a
 * stack-allocated pseudo relcache entry is built from them rather than
 * opening the relation.  *plast, *pcached and *pincrement are zeroed up
 * front; all four output pointers are then passed to
 * cdb_sequence_nextval().
 */
void
cdb_sequence_nextval_server(Oid	   tablespaceid,
							Oid	   dbid,
							Oid	   relid,
							bool   istemp,
							int64 *plast,
							int64 *pcached,
							int64 *pincrement,
							bool  *poverflow)
{
	RelationData pseudorel;
	Relation	seqrel = &pseudorel;
	SeqTable	elm;

	*plast = 0;
	*pcached = 0;
	*pincrement = 0;

	/*
	 * In Postgres, this method is to find the SeqTable entry for the sequence.
	 * This is not required by sequence server. We only need to initialize
	 * the `elm` which is used later in `cdb_sequence_nextval()`, which
	 * is calling `read_seq_tuple()` method, and require `elm` parameter.
	 *
	 * In GPDB, a sequence server is used to generate unique values for all the
	 * sequences.  It doesn't have to lock on the sequence relation, because
	 * there will be only a single instance of sequence server to handle all
	 * the requests from segments to generate the sequence values.
	 * To prevent collision of generating sequence values between 'master'
	 * (e.g. `select nextval(seq)`) and 'segments' (e.g. `insert into table with
	 * serial column`), a BUFFER_LOCK_EXCLUSIVE lock is held on the shared
	 * buffer of the sequence relation.
	 */
	init_sequence(relid, &elm, NULL);

	/* Build a pseudo relcache entry with just enough info to call bufmgr. */
	cdb_sequence_relation_init(seqrel, tablespaceid, dbid, relid, istemp);

	/* CDB TODO: Catch errors. */

	/* Update the sequence object. */
	cdb_sequence_nextval(elm, seqrel, plast, pcached, pincrement, poverflow);

	/* Cleanup. */
	cdb_sequence_relation_term(seqrel);
}								/* cdb_sequence_nextval_server */
1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743

/*
 * Mask a Sequence page before performing consistency checks on it.
 *
 * page is the page image to mask; it is passed to both masking helpers.
 * blkno is accepted but not used by this function.
 */
void
seq_mask(char *page, BlockNumber blkno)
{
	/* presumably hides the page LSN and checksum, per the helper's name */
	mask_page_lsn_and_checksum(page);

	/* presumably hides the page's unused space, per the helper's name */
	mask_unused_space(page);
}