/*-------------------------------------------------------------------------
 *
 * sequence.c
 *	  PostgreSQL sequences support code.
 *
 * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.47 2000/12/28 13:00:17 vadim Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <ctype.h>

#include "access/heapam.h"
#include "commands/creatinh.h"
#include "commands/sequence.h"
#include "miscadmin.h"
#include "utils/acl.h"
#include "utils/builtins.h"

/* Marker stored in the special space of a sequence's page */
#define SEQ_MAGIC	  0x1717

/*
 * Default bounds.  Both replacement lists are fully parenthesized so that
 * they behave as a single operand wherever they are expanded.
 */
#define SEQ_MAXVALUE	((int4)0x7FFFFFFF)
#define SEQ_MINVALUE	(-(SEQ_MAXVALUE))

/*
 * We don't want to write a log record for every value fetched from a
 * sequence, so we pre-log a few fetches in advance.  In the event of a
 * crash we can lose as many values as we pre-logged.
 */
#define	SEQ_LOG_VALS	32

typedef struct sequence_magic
{
40
	uint32		magic;
41
} sequence_magic;
42

43 44
typedef struct SeqTableData
{
45 46 47 48 49 50
	char	   *name;
	Oid			relid;
	Relation	rel;
	int4		cached;
	int4		last;
	int4		increment;
51
	struct SeqTableData *next;
52
} SeqTableData;
53 54 55 56 57

typedef SeqTableData *SeqTable;

static SeqTable seqtab = NULL;

58
static char *get_seq_name(text *seqin);
59
static SeqTable init_sequence(char *caller, char *name);
60 61
static Form_pg_sequence read_info(char *caller, SeqTable elm, Buffer *buf);
static void init_params(CreateSeqStmt *seq, Form_pg_sequence new);
62
static int	get_param(DefElem *def);
63
static void do_setval(char *seqname, int32 next, bool iscalled);
64 65

/*
B
Bruce Momjian 已提交
66
 * DefineSequence
67
 *				Creates a new sequence relation
68 69
 */
void
70
DefineSequence(CreateSeqStmt *seq)
71
{
72
	FormData_pg_sequence new;
73 74 75 76 77 78
	CreateStmt *stmt = makeNode(CreateStmt);
	ColumnDef  *coldef;
	TypeName   *typnam;
	Relation	rel;
	Buffer		buf;
	PageHeader	page;
79
	sequence_magic *sm;
80 81 82 83 84
	HeapTuple	tuple;
	TupleDesc	tupDesc;
	Datum		value[SEQ_COL_LASTCOL];
	char		null[SEQ_COL_LASTCOL];
	int			i;
85
	NameData	name;
86 87 88 89 90

	/* Check and set values */
	init_params(seq, &new);

	/*
91
	 * Create relation (and fill *null & *value)
92 93 94
	 */
	stmt->tableElts = NIL;
	for (i = SEQ_COL_FIRSTCOL; i <= SEQ_COL_LASTCOL; i++)
95
	{
96 97 98
		typnam = makeNode(TypeName);
		typnam->setof = FALSE;
		typnam->arrayBounds = NULL;
B
Bruce Momjian 已提交
99
		typnam->typmod = -1;
100 101
		coldef = makeNode(ColumnDef);
		coldef->typename = typnam;
102 103
		coldef->raw_default = NULL;
		coldef->cooked_default = NULL;
104 105 106 107 108
		coldef->is_not_null = false;
		null[i - 1] = ' ';

		switch (i)
		{
109 110 111
			case SEQ_COL_NAME:
				typnam->name = "name";
				coldef->colname = "sequence_name";
112 113
				namestrcpy(&name, seq->seqname);
				value[i - 1] = NameGetDatum(&name);
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
				break;
			case SEQ_COL_LASTVAL:
				typnam->name = "int4";
				coldef->colname = "last_value";
				value[i - 1] = Int32GetDatum(new.last_value);
				break;
			case SEQ_COL_INCBY:
				typnam->name = "int4";
				coldef->colname = "increment_by";
				value[i - 1] = Int32GetDatum(new.increment_by);
				break;
			case SEQ_COL_MAXVALUE:
				typnam->name = "int4";
				coldef->colname = "max_value";
				value[i - 1] = Int32GetDatum(new.max_value);
				break;
			case SEQ_COL_MINVALUE:
				typnam->name = "int4";
				coldef->colname = "min_value";
				value[i - 1] = Int32GetDatum(new.min_value);
				break;
			case SEQ_COL_CACHE:
				typnam->name = "int4";
				coldef->colname = "cache_value";
				value[i - 1] = Int32GetDatum(new.cache_value);
				break;
V
Vadim B. Mikheev 已提交
140 141 142 143 144
			case SEQ_COL_LOG:
				typnam->name = "int4";
				coldef->colname = "log_cnt";
				value[i - 1] = Int32GetDatum((int32)1);
				break;
145 146 147 148 149 150 151 152 153 154
			case SEQ_COL_CYCLE:
				typnam->name = "char";
				coldef->colname = "is_cycled";
				value[i - 1] = CharGetDatum(new.is_cycled);
				break;
			case SEQ_COL_CALLED:
				typnam->name = "char";
				coldef->colname = "is_called";
				value[i - 1] = CharGetDatum('f');
				break;
155 156 157 158 159 160 161 162
		}
		stmt->tableElts = lappend(stmt->tableElts, coldef);
	}

	stmt->relname = seq->seqname;
	stmt->inhRelnames = NIL;
	stmt->constraints = NIL;

163
	DefineRelation(stmt, RELKIND_SEQUENCE);
164

165
	rel = heap_openr(seq->seqname, AccessExclusiveLock);
166

167
	tupDesc = RelationGetDescr(rel);
168 169 170 171 172

	Assert(RelationGetNumberOfBlocks(rel) == 0);
	buf = ReadBuffer(rel, P_NEW);

	if (!BufferIsValid(buf))
173
		elog(ERROR, "DefineSequence: ReadBuffer failed");
174 175 176 177 178 179 180 181 182 183 184 185

	page = (PageHeader) BufferGetPage(buf);

	PageInit((Page) page, BufferGetPageSize(buf), sizeof(sequence_magic));
	sm = (sequence_magic *) PageGetSpecialPointer(page);
	sm->magic = SEQ_MAGIC;

	/* Now - form & insert sequence tuple */
	tuple = heap_formtuple(tupDesc, value, null);
	heap_insert(rel, tuple);

	if (WriteBuffer(buf) == STATUS_ERROR)
186
		elog(ERROR, "DefineSequence: WriteBuffer failed");
187

188
	heap_close(rel, AccessExclusiveLock);
189 190 191
}


192 193
Datum
nextval(PG_FUNCTION_ARGS)
194
{
195 196
	text	   *seqin = PG_GETARG_TEXT_P(0);
	char	   *seqname = get_seq_name(seqin);
197 198
	SeqTable	elm;
	Buffer		buf;
199
	Form_pg_sequence seq;
200
	int32		incby,
201 202
				maxv,
				minv,
V
Vadim B. Mikheev 已提交
203 204 205 206
				cache,
				log,
				fetch,
				last;
207
	int32		result,
208 209
				next,
				rescnt = 0;
V
Vadim B. Mikheev 已提交
210
	bool		logit = false;
211

212
	if (pg_aclcheck(seqname, GetUserId(), ACL_WR) != ACLCHECK_OK)
213 214 215
		elog(ERROR, "%s.nextval: you don't have permissions to set sequence %s",
			 seqname, seqname);

V
Vadim B. Mikheev 已提交
216
	/* open and AccessShareLock sequence */
217
	elm = init_sequence("nextval", seqname);
218

219 220 221 222 223
	pfree(seqname);

	if (elm->last != elm->cached)		/* some numbers were cached */
	{
		elm->last += elm->increment;
224
		PG_RETURN_INT32(elm->last);
225
	}
226

B
Bruce Momjian 已提交
227 228
	seq = read_info("nextval", elm, &buf);		/* lock page' buffer and
												 * read tuple */
229

V
Vadim B. Mikheev 已提交
230
	last = next = result = seq->last_value;
231 232 233
	incby = seq->increment_by;
	maxv = seq->max_value;
	minv = seq->min_value;
V
Vadim B. Mikheev 已提交
234 235
	fetch = cache = seq->cache_value;
	log = seq->log_cnt;
236 237

	if (seq->is_called != 't')
V
Vadim B. Mikheev 已提交
238
	{
239
		rescnt++;				/* last_value if not called */
V
Vadim B. Mikheev 已提交
240 241 242
		fetch--;
		log--;
	}
243

V
Vadim B. Mikheev 已提交
244 245 246 247 248 249 250
	if (log < fetch)
	{
		fetch = log = fetch - log + SEQ_LOG_VALS;
		logit = true;
	}

	while (fetch)		/* try to fetch cache [+ log ] numbers */
251
	{
252 253 254 255 256

		/*
		 * Check MAXVALUE for ascending sequences and MINVALUE for
		 * descending sequences
		 */
257
		if (incby > 0)
258
		{
259
			/* ascending sequence */
260 261 262 263
			if ((maxv >= 0 && next > maxv - incby) ||
				(maxv < 0 && next + incby > maxv))
			{
				if (rescnt > 0)
V
Vadim B. Mikheev 已提交
264
					break;		/* stop fetching */
265
				if (seq->is_cycled != 't')
266
					elog(ERROR, "%s.nextval: reached MAXVALUE (%d)",
267 268 269 270 271 272 273 274
						 elm->name, maxv);
				next = minv;
			}
			else
				next += incby;
		}
		else
		{
275
			/* descending sequence */
276 277 278 279
			if ((minv < 0 && next < minv - incby) ||
				(minv >= 0 && next + incby < minv))
			{
				if (rescnt > 0)
V
Vadim B. Mikheev 已提交
280
					break;		/* stop fetching */
281
				if (seq->is_cycled != 't')
282
					elog(ERROR, "%s.nextval: reached MINVALUE (%d)",
283 284 285 286 287 288
						 elm->name, minv);
				next = maxv;
			}
			else
				next += incby;
		}
V
Vadim B. Mikheev 已提交
289 290 291 292 293 294 295 296 297
		fetch--;
		if (rescnt < cache)
		{
			log--;
			rescnt++;
			last = next;
			if (rescnt == 1)		/* if it's first result - */
				result = next;		/* it's what to return */
		}
298 299 300 301
	}

	/* save info in local cache */
	elm->last = result;			/* last returned number */
V
Vadim B. Mikheev 已提交
302 303
	elm->cached = last;			/* last fetched number */

304
	START_CRIT_CODE;
V
Vadim B. Mikheev 已提交
305 306 307 308
	if (logit)
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
309 310
		XLogRecData	rdata[2];
		Page		page = BufferGetPage(buf);
V
Vadim B. Mikheev 已提交
311 312

		xlrec.node = elm->rel->rd_node;
313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
		rdata[0].buffer = InvalidBuffer;
		rdata[0].data = (char*)&xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
		rdata[0].next = &(rdata[1]);

		seq->last_value = next;
		seq->is_called = 't';
		seq->log_cnt = 0;
		rdata[1].buffer = InvalidBuffer;
		rdata[1].data = (char*)page + ((PageHeader) page)->pd_upper;
		rdata[1].len = ((PageHeader)page)->pd_special - 
						((PageHeader)page)->pd_upper;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG|XLOG_NO_TRAN, rdata);
V
Vadim B. Mikheev 已提交
328

329 330
		PageSetLSN(page, recptr);
		PageSetSUI(page, ThisStartUpID);
V
Vadim B. Mikheev 已提交
331

332 333
		if (fetch)		/* not all numbers were fetched */
			log -= fetch;
V
Vadim B. Mikheev 已提交
334
	}
335

336
	/* update on-disk data */
V
Vadim B. Mikheev 已提交
337
	seq->last_value = last;		/* last fetched number */
338
	seq->is_called = 't';
V
Vadim B. Mikheev 已提交
339 340
	Assert(log >= 0);
	seq->log_cnt = log;			/* how much is logged */
341
	END_CRIT_CODE;
342

V
Vadim B. Mikheev 已提交
343 344
	LockBuffer(buf, BUFFER_LOCK_UNLOCK);

345
	if (WriteBuffer(buf) == STATUS_ERROR)
346
		elog(ERROR, "%s.nextval: WriteBuffer failed", elm->name);
347

348
	PG_RETURN_INT32(result);
349 350
}

351 352
Datum
currval(PG_FUNCTION_ARGS)
353
{
354 355
	text	   *seqin = PG_GETARG_TEXT_P(0);
	char	   *seqname = get_seq_name(seqin);
356
	SeqTable	elm;
357 358
	int32		result;

359
	if (pg_aclcheck(seqname, GetUserId(), ACL_RD) != ACLCHECK_OK)
360 361
		elog(ERROR, "%s.currval: you don't have permissions to read sequence %s",
			 seqname, seqname);
362

V
Vadim B. Mikheev 已提交
363
	/* open and AccessShareLock sequence */
364 365 366
	elm = init_sequence("currval", seqname);

	if (elm->increment == 0)	/* nextval/read_info were not called */
367 368
		elog(ERROR, "%s.currval is not yet defined in this session",
			 seqname);
369 370 371

	result = elm->last;

372
	pfree(seqname);
373

374
	PG_RETURN_INT32(result);
375 376
}

B
Bruce Momjian 已提交
377
static void
378
do_setval(char *seqname, int32 next, bool iscalled)
M
 
Marc G. Fournier 已提交
379 380
{
	SeqTable	elm;
381
	Buffer		buf;
382
	Form_pg_sequence seq;
M
 
Marc G. Fournier 已提交
383

384
	if (pg_aclcheck(seqname, GetUserId(), ACL_WR) != ACLCHECK_OK)
M
 
Marc G. Fournier 已提交
385 386 387
		elog(ERROR, "%s.setval: you don't have permissions to set sequence %s",
			 seqname, seqname);

V
Vadim B. Mikheev 已提交
388
	/* open and AccessShareLock sequence */
389
	elm = init_sequence("setval", seqname);
B
Bruce Momjian 已提交
390 391
	seq = read_info("setval", elm, &buf);		/* lock page' buffer and
												 * read tuple */
M
 
Marc G. Fournier 已提交
392

393
	if ((next < seq->min_value) || (next > seq->max_value))
394
		elog(ERROR, "%s.setval: value %d is out of bounds (%d,%d)",
395
			 seqname, next, seq->min_value, seq->max_value);
M
 
Marc G. Fournier 已提交
396 397 398

	/* save info in local cache */
	elm->last = next;			/* last returned number */
399
	elm->cached = next;			/* last cached number (forget cached values) */
M
 
Marc G. Fournier 已提交
400

401
	START_CRIT_CODE;
V
Vadim B. Mikheev 已提交
402 403 404
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
405 406
		XLogRecData	rdata[2];
		Page		page = BufferGetPage(buf);
V
Vadim B. Mikheev 已提交
407 408

		xlrec.node = elm->rel->rd_node;
409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426
		rdata[0].buffer = InvalidBuffer;
		rdata[0].data = (char*)&xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
		rdata[0].next = &(rdata[1]);

		seq->last_value = next;
		seq->is_called = 't';
		seq->log_cnt = 0;
		rdata[1].buffer = InvalidBuffer;
		rdata[1].data = (char*)page + ((PageHeader) page)->pd_upper;
		rdata[1].len = ((PageHeader)page)->pd_special - 
						((PageHeader)page)->pd_upper;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG|XLOG_NO_TRAN, rdata);

		PageSetLSN(page, recptr);
		PageSetSUI(page, ThisStartUpID);
V
Vadim B. Mikheev 已提交
427
	}
428 429 430 431
	/* save info in sequence relation */
	seq->last_value = next;		/* last fetched number */
	seq->is_called = iscalled ? 't' : 'f';
	seq->log_cnt = (iscalled) ? 0 : 1;
432
	END_CRIT_CODE;
M
 
Marc G. Fournier 已提交
433

V
Vadim B. Mikheev 已提交
434 435
	LockBuffer(buf, BUFFER_LOCK_UNLOCK);

436
	if (WriteBuffer(buf) == STATUS_ERROR)
437 438 439 440
		elog(ERROR, "%s.setval: WriteBuffer failed", seqname);

	pfree(seqname);

441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464
}

/*
 * setval
 *		SQL-callable two-argument form: set the sequence named by argument 0
 *		to the int4 in argument 1, with is_called forced to true.
 *		Returns the value that was set.
 */
Datum
setval(PG_FUNCTION_ARGS)
{
	text	   *seqtext = PG_GETARG_TEXT_P(0);
	int32		newval = PG_GETARG_INT32(1);

	/* do_setval releases the name string itself */
	do_setval(get_seq_name(seqtext), newval, true);

	PG_RETURN_INT32(newval);
}

/*
 * setval_and_iscalled
 *		SQL-callable three-argument form of setval: argument 2 supplies the
 *		is_called flag to store.  Returns the value that was set.
 */
Datum
setval_and_iscalled(PG_FUNCTION_ARGS)
{
	text	   *seqtext = PG_GETARG_TEXT_P(0);
	int32		newval = PG_GETARG_INT32(1);
	bool		called = PG_GETARG_BOOL(2);

	/* do_setval releases the name string itself */
	do_setval(get_seq_name(seqtext), newval, called);

	PG_RETURN_INT32(newval);
}

/*
 * Turn the 'text' argument of a sequence function into the sequence name
 * as a palloc'd C string.  A name wrapped in double quotes has the quotes
 * removed and its case kept; any other name is downcased in place.
 *
 * This is a kluge, really --- should be able to write nextval(seqrel).
 */
static char *
get_seq_name(text *seqin)
{
	char	   *raw = DatumGetCString(DirectFunctionCall1(textout,
													PointerGetDatum(seqin)));
	int			len = strlen(raw);
	char	   *cp;

	if (len >= 2 && raw[0] == '\"' && raw[len - 1] == '\"')
	{
		char	   *unquoted;

		/* drop the closing quote, copy from just past the opening one */
		raw[len - 1] = '\0';
		unquoted = pstrdup(raw + 1);
		pfree(raw);
		return unquoted;
	}

	/*
	 * It's important that this match the identifier downcasing code
	 * used by backend/parser/scan.l.
	 */
	for (cp = raw; *cp != '\0'; cp++)
	{
		if (isupper((unsigned char) *cp))
			*cp = tolower((unsigned char) *cp);
	}

	return raw;
}

506
static Form_pg_sequence
B
Bruce Momjian 已提交
507
read_info(char *caller, SeqTable elm, Buffer *buf)
508
{
B
Bruce Momjian 已提交
509 510 511
	PageHeader	page;
	ItemId		lp;
	HeapTupleData tuple;
512
	sequence_magic *sm;
B
Bruce Momjian 已提交
513
	Form_pg_sequence seq;
514 515

	if (RelationGetNumberOfBlocks(elm->rel) != 1)
516
		elog(ERROR, "%s.%s: invalid number of blocks in sequence",
517 518 519 520
			 elm->name, caller);

	*buf = ReadBuffer(elm->rel, 0);
	if (!BufferIsValid(*buf))
521
		elog(ERROR, "%s.%s: ReadBuffer failed", elm->name, caller);
522

V
Vadim B. Mikheev 已提交
523 524
	LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);

525 526 527 528
	page = (PageHeader) BufferGetPage(*buf);
	sm = (sequence_magic *) PageGetSpecialPointer(page);

	if (sm->magic != SEQ_MAGIC)
529
		elog(ERROR, "%s.%s: bad magic (%08X)", elm->name, caller, sm->magic);
530 531 532

	lp = PageGetItemId(page, FirstOffsetNumber);
	Assert(ItemIdIsUsed(lp));
533
	tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
534

535
	seq = (Form_pg_sequence) GETSTRUCT(&tuple);
536 537 538

	elm->increment = seq->increment_by;

539
	return seq;
540 541 542 543

}


544
static SeqTable
545
init_sequence(char *caller, char *name)
546
{
547
	SeqTable	elm,
548 549
				prev = (SeqTable) NULL;
	Relation	seqrel;
550

551 552
	/* Look to see if we already have a seqtable entry for name */
	for (elm = seqtab; elm != (SeqTable) NULL; elm = elm->next)
553 554 555
	{
		if (strcmp(elm->name, name) == 0)
			break;
556
		prev = elm;
557 558
	}

559 560 561
	/* If so, and if it's already been opened in this xact, just return it */
	if (elm != (SeqTable) NULL && elm->rel != (Relation) NULL)
		return elm;
562

563 564 565
	/* Else open and check it */
	seqrel = heap_openr(name, AccessShareLock);
	if (seqrel->rd_rel->relkind != RELKIND_SEQUENCE)
566
		elog(ERROR, "%s.%s: %s is not a sequence", name, caller, name);
567

568 569
	if (elm != (SeqTable) NULL)
	{
570 571 572

		/*
		 * We are using a seqtable entry left over from a previous xact;
573 574 575 576
		 * must check for relid change.
		 */
		elm->rel = seqrel;
		if (RelationGetRelid(seqrel) != elm->relid)
577 578
		{
			elog(NOTICE, "%s.%s: sequence was re-created",
579
				 name, caller);
580
			elm->relid = RelationGetRelid(seqrel);
581 582 583 584 585
			elm->cached = elm->last = elm->increment = 0;
		}
	}
	else
	{
586 587 588

		/*
		 * Time to make a new seqtable entry.  These entries live as long
589 590 591 592 593 594 595 596 597 598
		 * as the backend does, so we use plain malloc for them.
		 */
		elm = (SeqTable) malloc(sizeof(SeqTableData));
		elm->name = malloc(strlen(name) + 1);
		strcpy(elm->name, name);
		elm->rel = seqrel;
		elm->relid = RelationGetRelid(seqrel);
		elm->cached = elm->last = elm->increment = 0;
		elm->next = (SeqTable) NULL;

599 600 601
		if (seqtab == (SeqTable) NULL)
			seqtab = elm;
		else
602
			prev->next = elm;
603 604
	}

605
	return elm;
606 607 608 609
}


/*
B
Bruce Momjian 已提交
610
 * CloseSequences
611
 *				is calling by xact mgr at commit/abort.
612 613
 */
void
614
CloseSequences(void)
615
{
616 617
	SeqTable	elm;
	Relation	rel;
618

619
	for (elm = seqtab; elm != (SeqTable) NULL; elm = elm->next)
620
	{
621 622 623 624
		if (elm->rel != (Relation) NULL)		/* opened in current xact */
		{
			rel = elm->rel;
			elm->rel = (Relation) NULL;
625
			heap_close(rel, AccessShareLock);
626 627
		}
	}
628 629 630
}


631
static void
632
init_params(CreateSeqStmt *seq, Form_pg_sequence new)
633
{
634 635 636 637 638 639
	DefElem    *last_value = NULL;
	DefElem    *increment_by = NULL;
	DefElem    *max_value = NULL;
	DefElem    *min_value = NULL;
	DefElem    *cache_value = NULL;
	List	   *option;
640 641 642 643

	new->is_cycled = 'f';
	foreach(option, seq->options)
	{
644
		DefElem    *defel = (DefElem *) lfirst(option);
645 646 647 648 649 650 651 652 653 654 655 656 657 658

		if (!strcasecmp(defel->defname, "increment"))
			increment_by = defel;
		else if (!strcasecmp(defel->defname, "start"))
			last_value = defel;
		else if (!strcasecmp(defel->defname, "maxvalue"))
			max_value = defel;
		else if (!strcasecmp(defel->defname, "minvalue"))
			min_value = defel;
		else if (!strcasecmp(defel->defname, "cache"))
			cache_value = defel;
		else if (!strcasecmp(defel->defname, "cycle"))
		{
			if (defel->arg != (Node *) NULL)
659
				elog(ERROR, "DefineSequence: CYCLE ??");
660 661 662
			new->is_cycled = 't';
		}
		else
663
			elog(ERROR, "DefineSequence: option \"%s\" not recognized",
664 665 666 667 668 669
				 defel->defname);
	}

	if (increment_by == (DefElem *) NULL)		/* INCREMENT BY */
		new->increment_by = 1;
	else if ((new->increment_by = get_param(increment_by)) == 0)
670
		elog(ERROR, "DefineSequence: can't INCREMENT by 0");
671 672

	if (max_value == (DefElem *) NULL)	/* MAXVALUE */
673
	{
674 675 676 677
		if (new->increment_by > 0)
			new->max_value = SEQ_MAXVALUE;		/* ascending seq */
		else
			new->max_value = -1;/* descending seq */
678
	}
679
	else
680
		new->max_value = get_param(max_value);
681

682
	if (min_value == (DefElem *) NULL)	/* MINVALUE */
683
	{
684 685 686 687
		if (new->increment_by > 0)
			new->min_value = 1; /* ascending seq */
		else
			new->min_value = SEQ_MINVALUE;		/* descending seq */
688
	}
689
	else
690 691 692
		new->min_value = get_param(min_value);

	if (new->min_value >= new->max_value)
693
		elog(ERROR, "DefineSequence: MINVALUE (%d) can't be >= MAXVALUE (%d)",
694 695 696
			 new->min_value, new->max_value);

	if (last_value == (DefElem *) NULL) /* START WITH */
697
	{
698 699 700 701
		if (new->increment_by > 0)
			new->last_value = new->min_value;	/* ascending seq */
		else
			new->last_value = new->max_value;	/* descending seq */
702
	}
703
	else
704 705 706
		new->last_value = get_param(last_value);

	if (new->last_value < new->min_value)
707
		elog(ERROR, "DefineSequence: START value (%d) can't be < MINVALUE (%d)",
708 709
			 new->last_value, new->min_value);
	if (new->last_value > new->max_value)
710
		elog(ERROR, "DefineSequence: START value (%d) can't be > MAXVALUE (%d)",
711 712 713 714 715
			 new->last_value, new->max_value);

	if (cache_value == (DefElem *) NULL)		/* CACHE */
		new->cache_value = 1;
	else if ((new->cache_value = get_param(cache_value)) <= 0)
716
		elog(ERROR, "DefineSequence: CACHE (%d) can't be <= 0",
717
			 new->cache_value);
718 719 720 721

}

static int
722
get_param(DefElem *def)
723
{
724
	if (def->arg == (Node *) NULL)
725
		elog(ERROR, "DefineSequence: \"%s\" value unspecified", def->defname);
726 727

	if (nodeTag(def->arg) == T_Integer)
728
		return intVal(def->arg);
729

730
	elog(ERROR, "DefineSequence: \"%s\" value must be integer", def->defname);
731
	return -1;
732
}
V
Vadim B. Mikheev 已提交
733 734 735

/*
 * seq_redo
 *		Replay an XLOG_SEQ_LOG record: rebuild block 0 of the sequence from
 *		scratch and reinstall the logged tuple image as its first item.
 *
 * lsn	  - position of the record; stored as the page's LSN
 * record - the WAL record; an xl_seq_rec header followed by the item data
 *
 * Returns silently if the relation cannot be opened.  An unknown op code,
 * an unreadable block, or a failure to add the item is reported at STOP
 * level.
 */
void
seq_redo(XLogRecPtr lsn, XLogRecord *record)
{
	uint8			info = record->xl_info & ~XLR_INFO_MASK;
	Relation		reln;
	Buffer			buffer;
	Page			page;
	char		   *item;
	Size			itemsz;
	xl_seq_rec	   *xlrec = (xl_seq_rec*) XLogRecGetData(record);
	sequence_magic *sm;

	if (info != XLOG_SEQ_LOG)
		elog(STOP, "seq_redo: unknown op code %u", info);

	reln = XLogOpenRelation(true, RM_SEQ_ID, xlrec->node);
	if (!RelationIsValid(reln))
		return;

	buffer = XLogReadBuffer(true, reln, 0);
	if (!BufferIsValid(buffer))
		elog(STOP, "seq_redo: can't read block of %u/%u", 
			xlrec->node.tblNode, xlrec->node.relNode);

	page = (Page) BufferGetPage(buffer);

	/* start from an empty sequence page carrying the magic marker */
	PageInit((Page) page, BufferGetPageSize(buffer), sizeof(sequence_magic));
	sm = (sequence_magic *) PageGetSpecialPointer(page);
	sm->magic = SEQ_MAGIC;

	/* the item image follows the xl_seq_rec header in the record */
	item = (char*)xlrec + sizeof(xl_seq_rec);
	itemsz = record->xl_len - sizeof(xl_seq_rec);
	itemsz = MAXALIGN(itemsz);

	/*
	 * A failed add must be reported here.  Without a statement of its own
	 * this test would swallow the PageSetLSN below, so the LSN would be
	 * set only on failure and the failure itself would go unnoticed.
	 */
	if (PageAddItem(page, (Item)item, itemsz, 
			FirstOffsetNumber, LP_USED) == InvalidOffsetNumber)
		elog(STOP, "seq_redo: failed to add item to page");

	PageSetLSN(page, lsn);
	PageSetSUI(page, ThisStartUpID);
	UnlockAndWriteBuffer(buffer);
}

/*
 * seq_undo
 *		Undo routine for sequence WAL records.  Deliberately empty: both
 *		arguments are ignored.
 */
void
seq_undo(XLogRecPtr lsn, XLogRecord *record)
{
	/* nothing to do */
}

/*
 * seq_desc
 *		Append a textual description of a sequence WAL record to buf.
 *
 * buf	   - NUL-terminated string to append to; no size is passed in, so
 *			 the caller must supply enough room
 * xl_info - the record's info byte
 * rec	   - the record data, read as an xl_seq_rec
 *
 * For XLOG_SEQ_LOG the output is "log: node <tblNode>/<relNode>"; for any
 * other op code it is "UNKNOWN".
 */
void
seq_desc(char *buf, uint8 xl_info, char *rec)
{
	xl_seq_rec *xlrec = (xl_seq_rec *) rec;
	uint8		info = xl_info & ~XLR_INFO_MASK;

	if (info != XLOG_SEQ_LOG)
	{
		strcat(buf, "UNKNOWN");
		return;
	}

	strcat(buf, "log: ");
	sprintf(buf + strlen(buf), "node %u/%u",
			xlrec->node.tblNode, xlrec->node.relNode);
}