sequence.c 19.6 KB
Newer Older
1 2
/*-------------------------------------------------------------------------
 *
 * sequence.c
 *	  PostgreSQL sequences support code.
 *
 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.51 2001/03/07 21:20:26 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
15
#include "postgres.h"
16

17 18
#include <ctype.h>

19 20 21
#include "access/heapam.h"
#include "commands/creatinh.h"
#include "commands/sequence.h"
B
Bruce Momjian 已提交
22
#include "miscadmin.h"
23
#include "utils/acl.h"
B
Bruce Momjian 已提交
24
#include "utils/builtins.h"
25

26
/* Value stored in the special space of a sequence's page (see read_info). */
#define SEQ_MAGIC	  0x1717

/*
 * Default MAXVALUE of an ascending sequence and default MINVALUE of a
 * descending one (see init_params).
 */
#define SEQ_MAXVALUE	((int4) 0x7FFFFFFF)
#define SEQ_MINVALUE	(-(SEQ_MAXVALUE))

/*
 * We don't want to write a log record for every value fetched from a
 * sequence, so we pre-log a few fetches in advance.  In the event of a
 * crash we can lose as many values as we pre-logged.
 */
#define SEQ_LOG_VALS	32
37 38 39

typedef struct sequence_magic
{
40
	uint32		magic;
41
} sequence_magic;
42

43 44
typedef struct SeqTableData
{
45 46 47 48 49 50
	char	   *name;
	Oid			relid;
	Relation	rel;
	int4		cached;
	int4		last;
	int4		increment;
51
	struct SeqTableData *next;
52
} SeqTableData;
53 54 55 56 57

typedef SeqTableData *SeqTable;

static SeqTable seqtab = NULL;

58
static char *get_seq_name(text *seqin);
59
static SeqTable init_sequence(char *caller, char *name);
60 61
static Form_pg_sequence read_info(char *caller, SeqTable elm, Buffer *buf);
static void init_params(CreateSeqStmt *seq, Form_pg_sequence new);
62
static int	get_param(DefElem *def);
63
static void do_setval(char *seqname, int32 next, bool iscalled);
64 65

/*
B
Bruce Momjian 已提交
66
 * DefineSequence
67
 *				Creates a new sequence relation
68 69
 */
void
70
DefineSequence(CreateSeqStmt *seq)
71
{
72
	FormData_pg_sequence new;
73 74 75 76 77 78
	CreateStmt *stmt = makeNode(CreateStmt);
	ColumnDef  *coldef;
	TypeName   *typnam;
	Relation	rel;
	Buffer		buf;
	PageHeader	page;
79
	sequence_magic *sm;
80 81 82 83 84
	HeapTuple	tuple;
	TupleDesc	tupDesc;
	Datum		value[SEQ_COL_LASTCOL];
	char		null[SEQ_COL_LASTCOL];
	int			i;
85
	NameData	name;
86 87 88 89 90

	/* Check and set values */
	init_params(seq, &new);

	/*
91
	 * Create relation (and fill *null & *value)
92 93 94
	 */
	stmt->tableElts = NIL;
	for (i = SEQ_COL_FIRSTCOL; i <= SEQ_COL_LASTCOL; i++)
95
	{
96 97 98
		typnam = makeNode(TypeName);
		typnam->setof = FALSE;
		typnam->arrayBounds = NULL;
B
Bruce Momjian 已提交
99
		typnam->typmod = -1;
100 101
		coldef = makeNode(ColumnDef);
		coldef->typename = typnam;
102 103
		coldef->raw_default = NULL;
		coldef->cooked_default = NULL;
104 105 106 107 108
		coldef->is_not_null = false;
		null[i - 1] = ' ';

		switch (i)
		{
109 110 111
			case SEQ_COL_NAME:
				typnam->name = "name";
				coldef->colname = "sequence_name";
112 113
				namestrcpy(&name, seq->seqname);
				value[i - 1] = NameGetDatum(&name);
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
				break;
			case SEQ_COL_LASTVAL:
				typnam->name = "int4";
				coldef->colname = "last_value";
				value[i - 1] = Int32GetDatum(new.last_value);
				break;
			case SEQ_COL_INCBY:
				typnam->name = "int4";
				coldef->colname = "increment_by";
				value[i - 1] = Int32GetDatum(new.increment_by);
				break;
			case SEQ_COL_MAXVALUE:
				typnam->name = "int4";
				coldef->colname = "max_value";
				value[i - 1] = Int32GetDatum(new.max_value);
				break;
			case SEQ_COL_MINVALUE:
				typnam->name = "int4";
				coldef->colname = "min_value";
				value[i - 1] = Int32GetDatum(new.min_value);
				break;
			case SEQ_COL_CACHE:
				typnam->name = "int4";
				coldef->colname = "cache_value";
				value[i - 1] = Int32GetDatum(new.cache_value);
				break;
V
Vadim B. Mikheev 已提交
140 141 142 143 144
			case SEQ_COL_LOG:
				typnam->name = "int4";
				coldef->colname = "log_cnt";
				value[i - 1] = Int32GetDatum((int32)1);
				break;
145 146 147 148 149 150 151 152 153 154
			case SEQ_COL_CYCLE:
				typnam->name = "char";
				coldef->colname = "is_cycled";
				value[i - 1] = CharGetDatum(new.is_cycled);
				break;
			case SEQ_COL_CALLED:
				typnam->name = "char";
				coldef->colname = "is_called";
				value[i - 1] = CharGetDatum('f');
				break;
155 156 157 158 159 160 161 162
		}
		stmt->tableElts = lappend(stmt->tableElts, coldef);
	}

	stmt->relname = seq->seqname;
	stmt->inhRelnames = NIL;
	stmt->constraints = NIL;

163
	DefineRelation(stmt, RELKIND_SEQUENCE);
164

165
	rel = heap_openr(seq->seqname, AccessExclusiveLock);
166

167
	tupDesc = RelationGetDescr(rel);
168 169 170 171 172

	Assert(RelationGetNumberOfBlocks(rel) == 0);
	buf = ReadBuffer(rel, P_NEW);

	if (!BufferIsValid(buf))
173
		elog(ERROR, "DefineSequence: ReadBuffer failed");
174 175 176 177 178 179 180 181 182 183 184 185

	page = (PageHeader) BufferGetPage(buf);

	PageInit((Page) page, BufferGetPageSize(buf), sizeof(sequence_magic));
	sm = (sequence_magic *) PageGetSpecialPointer(page);
	sm->magic = SEQ_MAGIC;

	/* Now - form & insert sequence tuple */
	tuple = heap_formtuple(tupDesc, value, null);
	heap_insert(rel, tuple);

	if (WriteBuffer(buf) == STATUS_ERROR)
186
		elog(ERROR, "DefineSequence: WriteBuffer failed");
187

188
	heap_close(rel, AccessExclusiveLock);
189 190 191
}


192 193
Datum
nextval(PG_FUNCTION_ARGS)
194
{
195 196
	text	   *seqin = PG_GETARG_TEXT_P(0);
	char	   *seqname = get_seq_name(seqin);
197 198
	SeqTable	elm;
	Buffer		buf;
199
	Form_pg_sequence seq;
200
	int32		incby,
201 202
				maxv,
				minv,
V
Vadim B. Mikheev 已提交
203 204 205 206
				cache,
				log,
				fetch,
				last;
207
	int32		result,
208 209
				next,
				rescnt = 0;
V
Vadim B. Mikheev 已提交
210
	bool		logit = false;
211

212
	if (pg_aclcheck(seqname, GetUserId(), ACL_WR) != ACLCHECK_OK)
213 214 215
		elog(ERROR, "%s.nextval: you don't have permissions to set sequence %s",
			 seqname, seqname);

V
Vadim B. Mikheev 已提交
216
	/* open and AccessShareLock sequence */
217
	elm = init_sequence("nextval", seqname);
218

219 220 221 222 223
	pfree(seqname);

	if (elm->last != elm->cached)		/* some numbers were cached */
	{
		elm->last += elm->increment;
224
		PG_RETURN_INT32(elm->last);
225
	}
226

B
Bruce Momjian 已提交
227 228
	seq = read_info("nextval", elm, &buf);		/* lock page' buffer and
												 * read tuple */
229

V
Vadim B. Mikheev 已提交
230
	last = next = result = seq->last_value;
231 232 233
	incby = seq->increment_by;
	maxv = seq->max_value;
	minv = seq->min_value;
V
Vadim B. Mikheev 已提交
234 235
	fetch = cache = seq->cache_value;
	log = seq->log_cnt;
236 237

	if (seq->is_called != 't')
V
Vadim B. Mikheev 已提交
238
	{
239
		rescnt++;				/* last_value if not called */
V
Vadim B. Mikheev 已提交
240 241 242
		fetch--;
		log--;
	}
243

V
Vadim B. Mikheev 已提交
244 245 246 247 248 249 250
	if (log < fetch)
	{
		fetch = log = fetch - log + SEQ_LOG_VALS;
		logit = true;
	}

	while (fetch)		/* try to fetch cache [+ log ] numbers */
251
	{
252 253 254 255 256

		/*
		 * Check MAXVALUE for ascending sequences and MINVALUE for
		 * descending sequences
		 */
257
		if (incby > 0)
258
		{
259
			/* ascending sequence */
260 261 262 263
			if ((maxv >= 0 && next > maxv - incby) ||
				(maxv < 0 && next + incby > maxv))
			{
				if (rescnt > 0)
V
Vadim B. Mikheev 已提交
264
					break;		/* stop fetching */
265
				if (seq->is_cycled != 't')
266
					elog(ERROR, "%s.nextval: reached MAXVALUE (%d)",
267 268 269 270 271 272 273 274
						 elm->name, maxv);
				next = minv;
			}
			else
				next += incby;
		}
		else
		{
275
			/* descending sequence */
276 277 278 279
			if ((minv < 0 && next < minv - incby) ||
				(minv >= 0 && next + incby < minv))
			{
				if (rescnt > 0)
V
Vadim B. Mikheev 已提交
280
					break;		/* stop fetching */
281
				if (seq->is_cycled != 't')
282
					elog(ERROR, "%s.nextval: reached MINVALUE (%d)",
283 284 285 286 287 288
						 elm->name, minv);
				next = maxv;
			}
			else
				next += incby;
		}
V
Vadim B. Mikheev 已提交
289 290 291 292 293 294 295 296 297
		fetch--;
		if (rescnt < cache)
		{
			log--;
			rescnt++;
			last = next;
			if (rescnt == 1)		/* if it's first result - */
				result = next;		/* it's what to return */
		}
298 299 300 301
	}

	/* save info in local cache */
	elm->last = result;			/* last returned number */
V
Vadim B. Mikheev 已提交
302 303
	elm->cached = last;			/* last fetched number */

304
	START_CRIT_SECTION();
V
Vadim B. Mikheev 已提交
305 306 307 308
	if (logit)
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
309 310
		XLogRecData	rdata[2];
		Page		page = BufferGetPage(buf);
V
Vadim B. Mikheev 已提交
311 312

		xlrec.node = elm->rel->rd_node;
313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
		rdata[0].buffer = InvalidBuffer;
		rdata[0].data = (char*)&xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
		rdata[0].next = &(rdata[1]);

		seq->last_value = next;
		seq->is_called = 't';
		seq->log_cnt = 0;
		rdata[1].buffer = InvalidBuffer;
		rdata[1].data = (char*)page + ((PageHeader) page)->pd_upper;
		rdata[1].len = ((PageHeader)page)->pd_special - 
						((PageHeader)page)->pd_upper;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG|XLOG_NO_TRAN, rdata);
V
Vadim B. Mikheev 已提交
328

329 330
		PageSetLSN(page, recptr);
		PageSetSUI(page, ThisStartUpID);
V
Vadim B. Mikheev 已提交
331

332 333
		if (fetch)		/* not all numbers were fetched */
			log -= fetch;
V
Vadim B. Mikheev 已提交
334
	}
335

336
	/* update on-disk data */
V
Vadim B. Mikheev 已提交
337
	seq->last_value = last;		/* last fetched number */
338
	seq->is_called = 't';
V
Vadim B. Mikheev 已提交
339 340
	Assert(log >= 0);
	seq->log_cnt = log;			/* how much is logged */
341
	END_CRIT_SECTION();
342

V
Vadim B. Mikheev 已提交
343 344
	LockBuffer(buf, BUFFER_LOCK_UNLOCK);

345
	if (WriteBuffer(buf) == STATUS_ERROR)
346
		elog(ERROR, "%s.nextval: WriteBuffer failed", elm->name);
347

348
	PG_RETURN_INT32(result);
349 350
}

351 352
Datum
currval(PG_FUNCTION_ARGS)
353
{
354 355
	text	   *seqin = PG_GETARG_TEXT_P(0);
	char	   *seqname = get_seq_name(seqin);
356
	SeqTable	elm;
357 358
	int32		result;

359
	if (pg_aclcheck(seqname, GetUserId(), ACL_RD) != ACLCHECK_OK)
360 361
		elog(ERROR, "%s.currval: you don't have permissions to read sequence %s",
			 seqname, seqname);
362

V
Vadim B. Mikheev 已提交
363
	/* open and AccessShareLock sequence */
364 365 366
	elm = init_sequence("currval", seqname);

	if (elm->increment == 0)	/* nextval/read_info were not called */
367 368
		elog(ERROR, "%s.currval is not yet defined in this session",
			 seqname);
369 370 371

	result = elm->last;

372
	pfree(seqname);
373

374
	PG_RETURN_INT32(result);
375 376
}

377 378 379 380 381 382 383 384 385 386 387 388 389
/* 
 * Main internal procedure that handles 2 & 3 arg forms of SETVAL.
 *
 * Note that the 3 arg version (which sets the is_called flag) is
 * only for use in pg_dump, and setting the is_called flag may not
 * work if multiple users are attached to the database and referencing 
 * the sequence (unlikely if pg_dump is restoring it).
 *
 * It is necessary to have the 3 arg version so that pg_dump can 
 * restore the state of a sequence exactly during data-only restores -
 * it is the only way to clear the is_called flag in an existing
 * sequence.
 */
B
Bruce Momjian 已提交
390
static void
391
do_setval(char *seqname, int32 next, bool iscalled)
M
 
Marc G. Fournier 已提交
392 393
{
	SeqTable	elm;
394
	Buffer		buf;
395
	Form_pg_sequence seq;
M
 
Marc G. Fournier 已提交
396

397
	if (pg_aclcheck(seqname, GetUserId(), ACL_WR) != ACLCHECK_OK)
M
 
Marc G. Fournier 已提交
398 399 400
		elog(ERROR, "%s.setval: you don't have permissions to set sequence %s",
			 seqname, seqname);

V
Vadim B. Mikheev 已提交
401
	/* open and AccessShareLock sequence */
402
	elm = init_sequence("setval", seqname);
B
Bruce Momjian 已提交
403 404
	seq = read_info("setval", elm, &buf);		/* lock page' buffer and
												 * read tuple */
M
 
Marc G. Fournier 已提交
405

406
	if ((next < seq->min_value) || (next > seq->max_value))
407
		elog(ERROR, "%s.setval: value %d is out of bounds (%d,%d)",
408
			 seqname, next, seq->min_value, seq->max_value);
M
 
Marc G. Fournier 已提交
409 410 411

	/* save info in local cache */
	elm->last = next;			/* last returned number */
412
	elm->cached = next;			/* last cached number (forget cached values) */
M
 
Marc G. Fournier 已提交
413

414
	START_CRIT_SECTION();
V
Vadim B. Mikheev 已提交
415 416 417
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
418 419
		XLogRecData	rdata[2];
		Page		page = BufferGetPage(buf);
V
Vadim B. Mikheev 已提交
420 421

		xlrec.node = elm->rel->rd_node;
422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439
		rdata[0].buffer = InvalidBuffer;
		rdata[0].data = (char*)&xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
		rdata[0].next = &(rdata[1]);

		seq->last_value = next;
		seq->is_called = 't';
		seq->log_cnt = 0;
		rdata[1].buffer = InvalidBuffer;
		rdata[1].data = (char*)page + ((PageHeader) page)->pd_upper;
		rdata[1].len = ((PageHeader)page)->pd_special - 
						((PageHeader)page)->pd_upper;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG|XLOG_NO_TRAN, rdata);

		PageSetLSN(page, recptr);
		PageSetSUI(page, ThisStartUpID);
V
Vadim B. Mikheev 已提交
440
	}
441 442 443 444
	/* save info in sequence relation */
	seq->last_value = next;		/* last fetched number */
	seq->is_called = iscalled ? 't' : 'f';
	seq->log_cnt = (iscalled) ? 0 : 1;
445
	END_CRIT_SECTION();
M
 
Marc G. Fournier 已提交
446

V
Vadim B. Mikheev 已提交
447 448
	LockBuffer(buf, BUFFER_LOCK_UNLOCK);

449
	if (WriteBuffer(buf) == STATUS_ERROR)
450 451 452 453
		elog(ERROR, "%s.setval: WriteBuffer failed", seqname);

	pfree(seqname);

454 455
}

456 457 458 459
/*
 * Implement the 2 arg setval procedure.
 * See do_setval for discussion.
 */
460 461 462 463 464 465 466 467 468 469 470 471
Datum
setval(PG_FUNCTION_ARGS)
{
	int32		newval = PG_GETARG_INT32(1);

	/*
	 * The 2 arg form always marks the sequence as called.  do_setval
	 * pfree's the name string it is handed.
	 */
	do_setval(get_seq_name(PG_GETARG_TEXT_P(0)), newval, true);

	PG_RETURN_INT32(newval);
}

472 473 474 475
/*
 * Implement the 3 arg setval procedure.
 * See do_setval for discussion.
 */
476 477 478 479 480 481 482 483 484 485
Datum
setval_and_iscalled(PG_FUNCTION_ARGS)
{
	text	   *seqin = PG_GETARG_TEXT_P(0);
	int32		next = PG_GETARG_INT32(1);
	bool		iscalled = PG_GETARG_BOOL(2);
	char	   *seqname = get_seq_name(seqin);

	do_setval(seqname, next, iscalled);

486 487 488 489 490 491 492 493 494 495 496 497
	PG_RETURN_INT32(next);
}

/*
 * Given a 'text' parameter to a sequence function, extract the actual
 * sequence name.  We downcase the name if it's not double-quoted.
 *
 * This is a kluge, really --- should be able to write nextval(seqrel).
 *
 * Returns a palloc'd C string (either the textout result itself or a
 * pstrdup'd copy); the callers in this file pfree it.
 */
static char *
get_seq_name(text *seqin)
{
	char	   *rawname = DatumGetCString(DirectFunctionCall1(textout,
													PointerGetDatum(seqin)));
	int			rawlen = strlen(rawname);
	char	   *seqname;

	if (rawlen >= 2 &&
		rawname[0] == '\"' && rawname[rawlen - 1] == '\"')
	{
		/* strip off quotes, keep case */
		rawname[rawlen - 1] = '\0';
		seqname = pstrdup(rawname + 1);
		pfree(rawname);
	}
	else
	{
		/* downcase in place; seqname keeps pointing at the string start */
		seqname = rawname;
		/*
		 * It's important that this match the identifier downcasing code
		 * used by backend/parser/scan.l.
		 */
		for (; *rawname; rawname++)
		{
			if (isupper((unsigned char) *rawname))
				*rawname = tolower((unsigned char) *rawname);
		}
	}
	return seqname;
}

527
static Form_pg_sequence
B
Bruce Momjian 已提交
528
read_info(char *caller, SeqTable elm, Buffer *buf)
529
{
B
Bruce Momjian 已提交
530 531 532
	PageHeader	page;
	ItemId		lp;
	HeapTupleData tuple;
533
	sequence_magic *sm;
B
Bruce Momjian 已提交
534
	Form_pg_sequence seq;
535 536

	if (RelationGetNumberOfBlocks(elm->rel) != 1)
537
		elog(ERROR, "%s.%s: invalid number of blocks in sequence",
538 539 540 541
			 elm->name, caller);

	*buf = ReadBuffer(elm->rel, 0);
	if (!BufferIsValid(*buf))
542
		elog(ERROR, "%s.%s: ReadBuffer failed", elm->name, caller);
543

V
Vadim B. Mikheev 已提交
544 545
	LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);

546 547 548 549
	page = (PageHeader) BufferGetPage(*buf);
	sm = (sequence_magic *) PageGetSpecialPointer(page);

	if (sm->magic != SEQ_MAGIC)
550
		elog(ERROR, "%s.%s: bad magic (%08X)", elm->name, caller, sm->magic);
551 552 553

	lp = PageGetItemId(page, FirstOffsetNumber);
	Assert(ItemIdIsUsed(lp));
554
	tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
555

556
	seq = (Form_pg_sequence) GETSTRUCT(&tuple);
557 558 559

	elm->increment = seq->increment_by;

560
	return seq;
561 562 563 564

}


565
static SeqTable
566
init_sequence(char *caller, char *name)
567
{
568
	SeqTable	elm,
569 570
				prev = (SeqTable) NULL;
	Relation	seqrel;
571

572 573
	/* Look to see if we already have a seqtable entry for name */
	for (elm = seqtab; elm != (SeqTable) NULL; elm = elm->next)
574 575 576
	{
		if (strcmp(elm->name, name) == 0)
			break;
577
		prev = elm;
578 579
	}

580 581 582
	/* If so, and if it's already been opened in this xact, just return it */
	if (elm != (SeqTable) NULL && elm->rel != (Relation) NULL)
		return elm;
583

584 585 586
	/* Else open and check it */
	seqrel = heap_openr(name, AccessShareLock);
	if (seqrel->rd_rel->relkind != RELKIND_SEQUENCE)
587
		elog(ERROR, "%s.%s: %s is not a sequence", name, caller, name);
588

589 590
	if (elm != (SeqTable) NULL)
	{
591 592 593

		/*
		 * We are using a seqtable entry left over from a previous xact;
594 595 596 597
		 * must check for relid change.
		 */
		elm->rel = seqrel;
		if (RelationGetRelid(seqrel) != elm->relid)
598 599
		{
			elog(NOTICE, "%s.%s: sequence was re-created",
600
				 name, caller);
601
			elm->relid = RelationGetRelid(seqrel);
602 603 604 605 606
			elm->cached = elm->last = elm->increment = 0;
		}
	}
	else
	{
607 608 609

		/*
		 * Time to make a new seqtable entry.  These entries live as long
610 611 612 613 614 615 616 617 618 619
		 * as the backend does, so we use plain malloc for them.
		 */
		elm = (SeqTable) malloc(sizeof(SeqTableData));
		elm->name = malloc(strlen(name) + 1);
		strcpy(elm->name, name);
		elm->rel = seqrel;
		elm->relid = RelationGetRelid(seqrel);
		elm->cached = elm->last = elm->increment = 0;
		elm->next = (SeqTable) NULL;

620 621 622
		if (seqtab == (SeqTable) NULL)
			seqtab = elm;
		else
623
			prev->next = elm;
624 625
	}

626
	return elm;
627 628 629 630
}


/*
B
Bruce Momjian 已提交
631
 * CloseSequences
632
 *				is calling by xact mgr at commit/abort.
633 634
 */
void
635
CloseSequences(void)
636
{
637 638
	SeqTable	elm;
	Relation	rel;
639

640
	for (elm = seqtab; elm != (SeqTable) NULL; elm = elm->next)
641
	{
642 643 644 645
		if (elm->rel != (Relation) NULL)		/* opened in current xact */
		{
			rel = elm->rel;
			elm->rel = (Relation) NULL;
646
			heap_close(rel, AccessShareLock);
647 648
		}
	}
649 650 651
}


652
static void
653
init_params(CreateSeqStmt *seq, Form_pg_sequence new)
654
{
655 656 657 658 659 660
	DefElem    *last_value = NULL;
	DefElem    *increment_by = NULL;
	DefElem    *max_value = NULL;
	DefElem    *min_value = NULL;
	DefElem    *cache_value = NULL;
	List	   *option;
661 662 663 664

	new->is_cycled = 'f';
	foreach(option, seq->options)
	{
665
		DefElem    *defel = (DefElem *) lfirst(option);
666 667 668 669 670 671 672 673 674 675 676 677 678 679

		if (!strcasecmp(defel->defname, "increment"))
			increment_by = defel;
		else if (!strcasecmp(defel->defname, "start"))
			last_value = defel;
		else if (!strcasecmp(defel->defname, "maxvalue"))
			max_value = defel;
		else if (!strcasecmp(defel->defname, "minvalue"))
			min_value = defel;
		else if (!strcasecmp(defel->defname, "cache"))
			cache_value = defel;
		else if (!strcasecmp(defel->defname, "cycle"))
		{
			if (defel->arg != (Node *) NULL)
680
				elog(ERROR, "DefineSequence: CYCLE ??");
681 682 683
			new->is_cycled = 't';
		}
		else
684
			elog(ERROR, "DefineSequence: option \"%s\" not recognized",
685 686 687 688 689 690
				 defel->defname);
	}

	if (increment_by == (DefElem *) NULL)		/* INCREMENT BY */
		new->increment_by = 1;
	else if ((new->increment_by = get_param(increment_by)) == 0)
691
		elog(ERROR, "DefineSequence: can't INCREMENT by 0");
692 693

	if (max_value == (DefElem *) NULL)	/* MAXVALUE */
694
	{
695 696 697 698
		if (new->increment_by > 0)
			new->max_value = SEQ_MAXVALUE;		/* ascending seq */
		else
			new->max_value = -1;/* descending seq */
699
	}
700
	else
701
		new->max_value = get_param(max_value);
702

703
	if (min_value == (DefElem *) NULL)	/* MINVALUE */
704
	{
705 706 707 708
		if (new->increment_by > 0)
			new->min_value = 1; /* ascending seq */
		else
			new->min_value = SEQ_MINVALUE;		/* descending seq */
709
	}
710
	else
711 712 713
		new->min_value = get_param(min_value);

	if (new->min_value >= new->max_value)
714
		elog(ERROR, "DefineSequence: MINVALUE (%d) can't be >= MAXVALUE (%d)",
715 716 717
			 new->min_value, new->max_value);

	if (last_value == (DefElem *) NULL) /* START WITH */
718
	{
719 720 721 722
		if (new->increment_by > 0)
			new->last_value = new->min_value;	/* ascending seq */
		else
			new->last_value = new->max_value;	/* descending seq */
723
	}
724
	else
725 726 727
		new->last_value = get_param(last_value);

	if (new->last_value < new->min_value)
728
		elog(ERROR, "DefineSequence: START value (%d) can't be < MINVALUE (%d)",
729 730
			 new->last_value, new->min_value);
	if (new->last_value > new->max_value)
731
		elog(ERROR, "DefineSequence: START value (%d) can't be > MAXVALUE (%d)",
732 733 734 735 736
			 new->last_value, new->max_value);

	if (cache_value == (DefElem *) NULL)		/* CACHE */
		new->cache_value = 1;
	else if ((new->cache_value = get_param(cache_value)) <= 0)
737
		elog(ERROR, "DefineSequence: CACHE (%d) can't be <= 0",
738
			 new->cache_value);
739 740 741 742

}

static int
743
get_param(DefElem *def)
744
{
745
	if (def->arg == (Node *) NULL)
746
		elog(ERROR, "DefineSequence: \"%s\" value unspecified", def->defname);
747 748

	if (nodeTag(def->arg) == T_Integer)
749
		return intVal(def->arg);
750

751
	elog(ERROR, "DefineSequence: \"%s\" value must be integer", def->defname);
752
	return -1;
753
}
V
Vadim B. Mikheev 已提交
754 755 756

/*
 * seq_redo
 *		Replay an XLOG_SEQ_LOG record: re-initialize block 0 of the
 *		sequence relation, stamp the magic number, and add the item image
 *		that follows the xl_seq_rec header as the page's first item.
 *
 * Returns silently if the relation cannot be opened; any other failure is
 * reported with elog(STOP).
 */
void
seq_redo(XLogRecPtr lsn, XLogRecord *record)
{
	uint8			info = record->xl_info & ~XLR_INFO_MASK;
	Relation		reln;
	Buffer			buffer;
	Page			page;
	char		   *item;
	Size			itemsz;
	xl_seq_rec	   *xlrec = (xl_seq_rec*) XLogRecGetData(record);
	sequence_magic *sm;

	if (info != XLOG_SEQ_LOG)
		elog(STOP, "seq_redo: unknown op code %u", info);

	reln = XLogOpenRelation(true, RM_SEQ_ID, xlrec->node);
	if (!RelationIsValid(reln))
		return;

	buffer = XLogReadBuffer(true, reln, 0);
	if (!BufferIsValid(buffer))
		elog(STOP, "seq_redo: can't read block of %u/%u", 
			xlrec->node.tblNode, xlrec->node.relNode);

	page = (Page) BufferGetPage(buffer);

	/* rebuild the page from scratch, same layout as DefineSequence makes */
	PageInit((Page) page, BufferGetPageSize(buffer), sizeof(sequence_magic));
	sm = (sequence_magic *) PageGetSpecialPointer(page);
	sm->magic = SEQ_MAGIC;

	/* the item image is whatever follows the fixed-size record header */
	item = (char*)xlrec + sizeof(xl_seq_rec);
	itemsz = record->xl_len - sizeof(xl_seq_rec);
	itemsz = MAXALIGN(itemsz);
	if (PageAddItem(page, (Item)item, itemsz, 
					FirstOffsetNumber, LP_USED) == InvalidOffsetNumber)
		elog(STOP, "seq_redo: failed to add item to page");

	PageSetLSN(page, lsn);
	PageSetSUI(page, ThisStartUpID);
	UnlockAndWriteBuffer(buffer);

	return;
}

void
seq_undo(XLogRecPtr lsn, XLogRecord *record)
{
	/* no undo action is performed for sequence log records */
}

/*
 * seq_desc
 *		Append a human-readable description of a sequence log record to
 *		buf: "log: node <tblNode>/<relNode>" for XLOG_SEQ_LOG, otherwise
 *		"UNKNOWN".
 *
 * NOTE(review): no buffer size is passed in, so strcat/sprintf rely on the
 * caller supplying a NUL-terminated buffer with enough room left.
 */
void
seq_desc(char *buf, uint8 xl_info, char* rec)
{
	uint8				info = xl_info & ~XLR_INFO_MASK;
	xl_seq_rec		   *xlrec = (xl_seq_rec*) rec;

	if (info == XLOG_SEQ_LOG)
		strcat(buf, "log: ");
	else
	{
		strcat(buf, "UNKNOWN");
		return;
	}

	sprintf(buf + strlen(buf), "node %u/%u",
		xlrec->node.tblNode, xlrec->node.relNode);
}