sequence.c 18.4 KB
Newer Older
1 2
/*-------------------------------------------------------------------------
 *
 * sequence.c
 *	  PostgreSQL sequences support code.
 *
 * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.46 2000/12/08 20:10:19 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
15
#include "postgres.h"
16

17 18
#include <ctype.h>

19 20 21
#include "access/heapam.h"
#include "commands/creatinh.h"
#include "commands/sequence.h"
B
Bruce Momjian 已提交
22
#include "miscadmin.h"
23
#include "utils/acl.h"
B
Bruce Momjian 已提交
24
#include "utils/builtins.h"
25

26
/*
 * Magic number kept in the special space of a sequence's page.  It is
 * written by DefineSequence and verified by read_info and seq_redo.
 */
#define SEQ_MAGIC	  0x1717

/*
 * Limits used when MAXVALUE / MINVALUE are not given explicitly.
 * The replacement lists are fully parenthesized so that the macros behave
 * as single operands wherever they are expanded.
 */
#define SEQ_MAXVALUE	((int4)0x7FFFFFFF)
#define SEQ_MINVALUE	(-(SEQ_MAXVALUE))

/*
 * We don't want to write a log record for every value fetched from a
 * sequence, so we pre-log a few fetches in advance.  In the event of a
 * crash we can lose as many values as we pre-logged.
 */
#define	SEQ_LOG_VALS	32
37 38 39

typedef struct sequence_magic
{
40
	uint32		magic;
41
} sequence_magic;
42

43 44
typedef struct SeqTableData
{
45 46 47 48 49 50
	char	   *name;
	Oid			relid;
	Relation	rel;
	int4		cached;
	int4		last;
	int4		increment;
51
	struct SeqTableData *next;
52
} SeqTableData;
53 54 55 56 57

typedef SeqTableData *SeqTable;

static SeqTable seqtab = NULL;

58
static char *get_seq_name(text *seqin);
59
static SeqTable init_sequence(char *caller, char *name);
60 61
static Form_pg_sequence read_info(char *caller, SeqTable elm, Buffer *buf);
static void init_params(CreateSeqStmt *seq, Form_pg_sequence new);
62
static int	get_param(DefElem *def);
63
static void do_setval(char *seqname, int32 next, bool iscalled);
64 65

/*
B
Bruce Momjian 已提交
66
 * DefineSequence
67
 *				Creates a new sequence relation
68 69
 */
void
70
DefineSequence(CreateSeqStmt *seq)
71
{
72
	FormData_pg_sequence new;
73 74 75 76 77 78
	CreateStmt *stmt = makeNode(CreateStmt);
	ColumnDef  *coldef;
	TypeName   *typnam;
	Relation	rel;
	Buffer		buf;
	PageHeader	page;
79
	sequence_magic *sm;
80 81 82 83 84
	HeapTuple	tuple;
	TupleDesc	tupDesc;
	Datum		value[SEQ_COL_LASTCOL];
	char		null[SEQ_COL_LASTCOL];
	int			i;
85
	NameData	name;
86 87 88 89 90

	/* Check and set values */
	init_params(seq, &new);

	/*
91
	 * Create relation (and fill *null & *value)
92 93 94
	 */
	stmt->tableElts = NIL;
	for (i = SEQ_COL_FIRSTCOL; i <= SEQ_COL_LASTCOL; i++)
95
	{
96 97 98
		typnam = makeNode(TypeName);
		typnam->setof = FALSE;
		typnam->arrayBounds = NULL;
B
Bruce Momjian 已提交
99
		typnam->typmod = -1;
100 101
		coldef = makeNode(ColumnDef);
		coldef->typename = typnam;
102 103
		coldef->raw_default = NULL;
		coldef->cooked_default = NULL;
104 105 106 107 108
		coldef->is_not_null = false;
		null[i - 1] = ' ';

		switch (i)
		{
109 110 111
			case SEQ_COL_NAME:
				typnam->name = "name";
				coldef->colname = "sequence_name";
112 113
				namestrcpy(&name, seq->seqname);
				value[i - 1] = NameGetDatum(&name);
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
				break;
			case SEQ_COL_LASTVAL:
				typnam->name = "int4";
				coldef->colname = "last_value";
				value[i - 1] = Int32GetDatum(new.last_value);
				break;
			case SEQ_COL_INCBY:
				typnam->name = "int4";
				coldef->colname = "increment_by";
				value[i - 1] = Int32GetDatum(new.increment_by);
				break;
			case SEQ_COL_MAXVALUE:
				typnam->name = "int4";
				coldef->colname = "max_value";
				value[i - 1] = Int32GetDatum(new.max_value);
				break;
			case SEQ_COL_MINVALUE:
				typnam->name = "int4";
				coldef->colname = "min_value";
				value[i - 1] = Int32GetDatum(new.min_value);
				break;
			case SEQ_COL_CACHE:
				typnam->name = "int4";
				coldef->colname = "cache_value";
				value[i - 1] = Int32GetDatum(new.cache_value);
				break;
V
Vadim B. Mikheev 已提交
140 141 142 143 144
			case SEQ_COL_LOG:
				typnam->name = "int4";
				coldef->colname = "log_cnt";
				value[i - 1] = Int32GetDatum((int32)1);
				break;
145 146 147 148 149 150 151 152 153 154
			case SEQ_COL_CYCLE:
				typnam->name = "char";
				coldef->colname = "is_cycled";
				value[i - 1] = CharGetDatum(new.is_cycled);
				break;
			case SEQ_COL_CALLED:
				typnam->name = "char";
				coldef->colname = "is_called";
				value[i - 1] = CharGetDatum('f');
				break;
155 156 157 158 159 160 161 162
		}
		stmt->tableElts = lappend(stmt->tableElts, coldef);
	}

	stmt->relname = seq->seqname;
	stmt->inhRelnames = NIL;
	stmt->constraints = NIL;

163
	DefineRelation(stmt, RELKIND_SEQUENCE);
164

165
	rel = heap_openr(seq->seqname, AccessExclusiveLock);
166

167
	tupDesc = RelationGetDescr(rel);
168 169 170 171 172

	Assert(RelationGetNumberOfBlocks(rel) == 0);
	buf = ReadBuffer(rel, P_NEW);

	if (!BufferIsValid(buf))
173
		elog(ERROR, "DefineSequence: ReadBuffer failed");
174 175 176 177 178 179 180 181 182 183 184 185

	page = (PageHeader) BufferGetPage(buf);

	PageInit((Page) page, BufferGetPageSize(buf), sizeof(sequence_magic));
	sm = (sequence_magic *) PageGetSpecialPointer(page);
	sm->magic = SEQ_MAGIC;

	/* Now - form & insert sequence tuple */
	tuple = heap_formtuple(tupDesc, value, null);
	heap_insert(rel, tuple);

	if (WriteBuffer(buf) == STATUS_ERROR)
186
		elog(ERROR, "DefineSequence: WriteBuffer failed");
187

188
	heap_close(rel, AccessExclusiveLock);
189 190 191
}


192 193
Datum
nextval(PG_FUNCTION_ARGS)
194
{
195 196
	text	   *seqin = PG_GETARG_TEXT_P(0);
	char	   *seqname = get_seq_name(seqin);
197 198
	SeqTable	elm;
	Buffer		buf;
199
	Form_pg_sequence seq;
200
	int32		incby,
201 202
				maxv,
				minv,
V
Vadim B. Mikheev 已提交
203 204 205 206
				cache,
				log,
				fetch,
				last;
207
	int32		result,
208 209
				next,
				rescnt = 0;
V
Vadim B. Mikheev 已提交
210
	bool		logit = false;
211

212
	if (pg_aclcheck(seqname, GetUserId(), ACL_WR) != ACLCHECK_OK)
213 214 215
		elog(ERROR, "%s.nextval: you don't have permissions to set sequence %s",
			 seqname, seqname);

V
Vadim B. Mikheev 已提交
216
	/* open and AccessShareLock sequence */
217
	elm = init_sequence("nextval", seqname);
218

219 220 221 222 223
	pfree(seqname);

	if (elm->last != elm->cached)		/* some numbers were cached */
	{
		elm->last += elm->increment;
224
		PG_RETURN_INT32(elm->last);
225
	}
226

B
Bruce Momjian 已提交
227 228
	seq = read_info("nextval", elm, &buf);		/* lock page' buffer and
												 * read tuple */
229

V
Vadim B. Mikheev 已提交
230
	last = next = result = seq->last_value;
231 232 233
	incby = seq->increment_by;
	maxv = seq->max_value;
	minv = seq->min_value;
V
Vadim B. Mikheev 已提交
234 235
	fetch = cache = seq->cache_value;
	log = seq->log_cnt;
236 237

	if (seq->is_called != 't')
V
Vadim B. Mikheev 已提交
238
	{
239
		rescnt++;				/* last_value if not called */
V
Vadim B. Mikheev 已提交
240 241 242
		fetch--;
		log--;
	}
243

V
Vadim B. Mikheev 已提交
244 245 246 247 248 249 250
	if (log < fetch)
	{
		fetch = log = fetch - log + SEQ_LOG_VALS;
		logit = true;
	}

	while (fetch)		/* try to fetch cache [+ log ] numbers */
251
	{
252 253 254 255 256

		/*
		 * Check MAXVALUE for ascending sequences and MINVALUE for
		 * descending sequences
		 */
257
		if (incby > 0)
258
		{
259
			/* ascending sequence */
260 261 262 263
			if ((maxv >= 0 && next > maxv - incby) ||
				(maxv < 0 && next + incby > maxv))
			{
				if (rescnt > 0)
V
Vadim B. Mikheev 已提交
264
					break;		/* stop fetching */
265
				if (seq->is_cycled != 't')
266
					elog(ERROR, "%s.nextval: reached MAXVALUE (%d)",
267 268 269 270 271 272 273 274
						 elm->name, maxv);
				next = minv;
			}
			else
				next += incby;
		}
		else
		{
275
			/* descending sequence */
276 277 278 279
			if ((minv < 0 && next < minv - incby) ||
				(minv >= 0 && next + incby < minv))
			{
				if (rescnt > 0)
V
Vadim B. Mikheev 已提交
280
					break;		/* stop fetching */
281
				if (seq->is_cycled != 't')
282
					elog(ERROR, "%s.nextval: reached MINVALUE (%d)",
283 284 285 286 287 288
						 elm->name, minv);
				next = maxv;
			}
			else
				next += incby;
		}
V
Vadim B. Mikheev 已提交
289 290 291 292 293 294 295 296 297
		fetch--;
		if (rescnt < cache)
		{
			log--;
			rescnt++;
			last = next;
			if (rescnt == 1)		/* if it's first result - */
				result = next;		/* it's what to return */
		}
298 299 300 301
	}

	/* save info in local cache */
	elm->last = result;			/* last returned number */
V
Vadim B. Mikheev 已提交
302 303
	elm->cached = last;			/* last fetched number */

304
	START_CRIT_CODE;
V
Vadim B. Mikheev 已提交
305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321
	if (logit)
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;

		if (fetch)		/* not all numbers were fetched */
			log -= fetch;

		xlrec.node = elm->rel->rd_node;
		xlrec.value = next;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG|XLOG_NO_TRAN,
					(char*) &xlrec, sizeof(xlrec), NULL, 0);

		PageSetLSN(BufferGetPage(buf), recptr);
		PageSetSUI(BufferGetPage(buf), ThisStartUpID);
	}
322 323

	/* save info in sequence relation */
V
Vadim B. Mikheev 已提交
324 325 326
	seq->last_value = last;		/* last fetched number */
	Assert(log >= 0);
	seq->log_cnt = log;			/* how much is logged */
327
	seq->is_called = 't';
328
	END_CRIT_CODE;
329

V
Vadim B. Mikheev 已提交
330 331
	LockBuffer(buf, BUFFER_LOCK_UNLOCK);

332
	if (WriteBuffer(buf) == STATUS_ERROR)
333
		elog(ERROR, "%s.nextval: WriteBuffer failed", elm->name);
334

335
	PG_RETURN_INT32(result);
336 337
}

338 339
Datum
currval(PG_FUNCTION_ARGS)
340
{
341 342
	text	   *seqin = PG_GETARG_TEXT_P(0);
	char	   *seqname = get_seq_name(seqin);
343
	SeqTable	elm;
344 345
	int32		result;

346
	if (pg_aclcheck(seqname, GetUserId(), ACL_RD) != ACLCHECK_OK)
347 348
		elog(ERROR, "%s.currval: you don't have permissions to read sequence %s",
			 seqname, seqname);
349

V
Vadim B. Mikheev 已提交
350
	/* open and AccessShareLock sequence */
351 352 353
	elm = init_sequence("currval", seqname);

	if (elm->increment == 0)	/* nextval/read_info were not called */
354 355
		elog(ERROR, "%s.currval is not yet defined in this session",
			 seqname);
356 357 358

	result = elm->last;

359
	pfree(seqname);
360

361
	PG_RETURN_INT32(result);
362 363
}

B
Bruce Momjian 已提交
364
static void
365
do_setval(char *seqname, int32 next, bool iscalled)
M
 
Marc G. Fournier 已提交
366 367
{
	SeqTable	elm;
368
	Buffer		buf;
369
	Form_pg_sequence seq;
M
 
Marc G. Fournier 已提交
370

371
	if (pg_aclcheck(seqname, GetUserId(), ACL_WR) != ACLCHECK_OK)
M
 
Marc G. Fournier 已提交
372 373 374
		elog(ERROR, "%s.setval: you don't have permissions to set sequence %s",
			 seqname, seqname);

V
Vadim B. Mikheev 已提交
375
	/* open and AccessShareLock sequence */
376
	elm = init_sequence("setval", seqname);
B
Bruce Momjian 已提交
377 378
	seq = read_info("setval", elm, &buf);		/* lock page' buffer and
												 * read tuple */
M
 
Marc G. Fournier 已提交
379

380
	if ((next < seq->min_value) || (next > seq->max_value))
381
		elog(ERROR, "%s.setval: value %d is out of bounds (%d,%d)",
382
			 seqname, next, seq->min_value, seq->max_value);
M
 
Marc G. Fournier 已提交
383 384 385

	/* save info in local cache */
	elm->last = next;			/* last returned number */
386
	elm->cached = next;			/* last cached number (forget cached values) */
M
 
Marc G. Fournier 已提交
387 388

	/* save info in sequence relation */
389
	START_CRIT_CODE;
M
 
Marc G. Fournier 已提交
390
	seq->last_value = next;		/* last fetched number */
391
	seq->is_called = iscalled ? 't' : 'f';
V
Vadim B. Mikheev 已提交
392 393 394 395 396 397 398 399 400 401 402 403 404 405 406
	seq->log_cnt = (iscalled) ? 0 : 1;

	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;

		xlrec.node = elm->rel->rd_node;
		xlrec.value = next;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_SET|XLOG_NO_TRAN,
					(char*) &xlrec, sizeof(xlrec), NULL, 0);

		PageSetLSN(BufferGetPage(buf), recptr);
		PageSetSUI(BufferGetPage(buf), ThisStartUpID);
	}
407
	END_CRIT_CODE;
M
 
Marc G. Fournier 已提交
408

V
Vadim B. Mikheev 已提交
409 410
	LockBuffer(buf, BUFFER_LOCK_UNLOCK);

411
	if (WriteBuffer(buf) == STATUS_ERROR)
412 413 414 415
		elog(ERROR, "%s.setval: WriteBuffer failed", seqname);

	pfree(seqname);

416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439
}

/*
 * setval
 *		Two-argument form: set the sequence (argument 0, text) to the value
 *		in argument 1 and mark it as called.  Returns the value that was set.
 */
Datum
setval(PG_FUNCTION_ARGS)
{
	text	   *seqin;
	int32		next;
	char	   *seqname;

	seqin = PG_GETARG_TEXT_P(0);
	next = PG_GETARG_INT32(1);
	seqname = get_seq_name(seqin);

	/* do_setval releases seqname */
	do_setval(seqname, next, true);

	PG_RETURN_INT32(next);
}

/*
 * setval_and_iscalled
 *		Three-argument form of setval: argument 2 supplies the is_called
 *		flag to store along with the new value.  Returns the value that was
 *		set.
 */
Datum
setval_and_iscalled(PG_FUNCTION_ARGS)
{
	text	   *seqin;
	int32		next;
	bool		iscalled;
	char	   *seqname;

	seqin = PG_GETARG_TEXT_P(0);
	next = PG_GETARG_INT32(1);
	iscalled = PG_GETARG_BOOL(2);
	seqname = get_seq_name(seqin);

	/* do_setval releases seqname */
	do_setval(seqname, next, iscalled);

	PG_RETURN_INT32(next);
}

/*
 * Given a 'text' parameter to a sequence function, extract the actual
 * sequence name.  A name wrapped in double quotes has the quotes removed
 * and its case preserved; any other name is folded to lower case.
 *
 * The result is a palloc'd string that the caller may pfree.
 *
 * This is a kluge, really --- should be able to write nextval(seqrel).
 */
static char *
get_seq_name(text *seqin)
{
	char	   *rawname = DatumGetCString(DirectFunctionCall1(textout,
													PointerGetDatum(seqin)));
	int			rawlen = strlen(rawname);
	bool		quoted;
	char	   *cp;

	quoted = (rawlen >= 2 &&
			  rawname[0] == '\"' &&
			  rawname[rawlen - 1] == '\"');

	if (!quoted)
	{
		/*
		 * Downcase in place.  It's important that this match the
		 * identifier downcasing code used by backend/parser/scan.l.
		 */
		for (cp = rawname; *cp != '\0'; cp++)
		{
			if (isupper((unsigned char) *cp))
				*cp = tolower((unsigned char) *cp);
		}
		return rawname;
	}

	/* strip off quotes, keep case */
	rawname[rawlen - 1] = '\0';
	cp = pstrdup(rawname + 1);
	pfree(rawname);
	return cp;
}

481
static Form_pg_sequence
B
Bruce Momjian 已提交
482
read_info(char *caller, SeqTable elm, Buffer *buf)
483
{
B
Bruce Momjian 已提交
484 485 486
	PageHeader	page;
	ItemId		lp;
	HeapTupleData tuple;
487
	sequence_magic *sm;
B
Bruce Momjian 已提交
488
	Form_pg_sequence seq;
489 490

	if (RelationGetNumberOfBlocks(elm->rel) != 1)
491
		elog(ERROR, "%s.%s: invalid number of blocks in sequence",
492 493 494 495
			 elm->name, caller);

	*buf = ReadBuffer(elm->rel, 0);
	if (!BufferIsValid(*buf))
496
		elog(ERROR, "%s.%s: ReadBuffer failed", elm->name, caller);
497

V
Vadim B. Mikheev 已提交
498 499
	LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);

500 501 502 503
	page = (PageHeader) BufferGetPage(*buf);
	sm = (sequence_magic *) PageGetSpecialPointer(page);

	if (sm->magic != SEQ_MAGIC)
504
		elog(ERROR, "%s.%s: bad magic (%08X)", elm->name, caller, sm->magic);
505 506 507

	lp = PageGetItemId(page, FirstOffsetNumber);
	Assert(ItemIdIsUsed(lp));
508
	tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
509

510
	seq = (Form_pg_sequence) GETSTRUCT(&tuple);
511 512 513

	elm->increment = seq->increment_by;

514
	return seq;
515 516 517 518

}


519
static SeqTable
520
init_sequence(char *caller, char *name)
521
{
522
	SeqTable	elm,
523 524
				prev = (SeqTable) NULL;
	Relation	seqrel;
525

526 527
	/* Look to see if we already have a seqtable entry for name */
	for (elm = seqtab; elm != (SeqTable) NULL; elm = elm->next)
528 529 530
	{
		if (strcmp(elm->name, name) == 0)
			break;
531
		prev = elm;
532 533
	}

534 535 536
	/* If so, and if it's already been opened in this xact, just return it */
	if (elm != (SeqTable) NULL && elm->rel != (Relation) NULL)
		return elm;
537

538 539 540
	/* Else open and check it */
	seqrel = heap_openr(name, AccessShareLock);
	if (seqrel->rd_rel->relkind != RELKIND_SEQUENCE)
541
		elog(ERROR, "%s.%s: %s is not a sequence", name, caller, name);
542

543 544
	if (elm != (SeqTable) NULL)
	{
545 546 547

		/*
		 * We are using a seqtable entry left over from a previous xact;
548 549 550 551
		 * must check for relid change.
		 */
		elm->rel = seqrel;
		if (RelationGetRelid(seqrel) != elm->relid)
552 553
		{
			elog(NOTICE, "%s.%s: sequence was re-created",
554
				 name, caller);
555
			elm->relid = RelationGetRelid(seqrel);
556 557 558 559 560
			elm->cached = elm->last = elm->increment = 0;
		}
	}
	else
	{
561 562 563

		/*
		 * Time to make a new seqtable entry.  These entries live as long
564 565 566 567 568 569 570 571 572 573
		 * as the backend does, so we use plain malloc for them.
		 */
		elm = (SeqTable) malloc(sizeof(SeqTableData));
		elm->name = malloc(strlen(name) + 1);
		strcpy(elm->name, name);
		elm->rel = seqrel;
		elm->relid = RelationGetRelid(seqrel);
		elm->cached = elm->last = elm->increment = 0;
		elm->next = (SeqTable) NULL;

574 575 576
		if (seqtab == (SeqTable) NULL)
			seqtab = elm;
		else
577
			prev->next = elm;
578 579
	}

580
	return elm;
581 582 583 584
}


/*
B
Bruce Momjian 已提交
585
 * CloseSequences
586
 *				is calling by xact mgr at commit/abort.
587 588
 */
void
589
CloseSequences(void)
590
{
591 592
	SeqTable	elm;
	Relation	rel;
593

594
	for (elm = seqtab; elm != (SeqTable) NULL; elm = elm->next)
595
	{
596 597 598 599
		if (elm->rel != (Relation) NULL)		/* opened in current xact */
		{
			rel = elm->rel;
			elm->rel = (Relation) NULL;
600
			heap_close(rel, AccessShareLock);
601 602
		}
	}
603 604 605
}


606
static void
607
init_params(CreateSeqStmt *seq, Form_pg_sequence new)
608
{
609 610 611 612 613 614
	DefElem    *last_value = NULL;
	DefElem    *increment_by = NULL;
	DefElem    *max_value = NULL;
	DefElem    *min_value = NULL;
	DefElem    *cache_value = NULL;
	List	   *option;
615 616 617 618

	new->is_cycled = 'f';
	foreach(option, seq->options)
	{
619
		DefElem    *defel = (DefElem *) lfirst(option);
620 621 622 623 624 625 626 627 628 629 630 631 632 633

		if (!strcasecmp(defel->defname, "increment"))
			increment_by = defel;
		else if (!strcasecmp(defel->defname, "start"))
			last_value = defel;
		else if (!strcasecmp(defel->defname, "maxvalue"))
			max_value = defel;
		else if (!strcasecmp(defel->defname, "minvalue"))
			min_value = defel;
		else if (!strcasecmp(defel->defname, "cache"))
			cache_value = defel;
		else if (!strcasecmp(defel->defname, "cycle"))
		{
			if (defel->arg != (Node *) NULL)
634
				elog(ERROR, "DefineSequence: CYCLE ??");
635 636 637
			new->is_cycled = 't';
		}
		else
638
			elog(ERROR, "DefineSequence: option \"%s\" not recognized",
639 640 641 642 643 644
				 defel->defname);
	}

	if (increment_by == (DefElem *) NULL)		/* INCREMENT BY */
		new->increment_by = 1;
	else if ((new->increment_by = get_param(increment_by)) == 0)
645
		elog(ERROR, "DefineSequence: can't INCREMENT by 0");
646 647

	if (max_value == (DefElem *) NULL)	/* MAXVALUE */
648
	{
649 650 651 652
		if (new->increment_by > 0)
			new->max_value = SEQ_MAXVALUE;		/* ascending seq */
		else
			new->max_value = -1;/* descending seq */
653
	}
654
	else
655
		new->max_value = get_param(max_value);
656

657
	if (min_value == (DefElem *) NULL)	/* MINVALUE */
658
	{
659 660 661 662
		if (new->increment_by > 0)
			new->min_value = 1; /* ascending seq */
		else
			new->min_value = SEQ_MINVALUE;		/* descending seq */
663
	}
664
	else
665 666 667
		new->min_value = get_param(min_value);

	if (new->min_value >= new->max_value)
668
		elog(ERROR, "DefineSequence: MINVALUE (%d) can't be >= MAXVALUE (%d)",
669 670 671
			 new->min_value, new->max_value);

	if (last_value == (DefElem *) NULL) /* START WITH */
672
	{
673 674 675 676
		if (new->increment_by > 0)
			new->last_value = new->min_value;	/* ascending seq */
		else
			new->last_value = new->max_value;	/* descending seq */
677
	}
678
	else
679 680 681
		new->last_value = get_param(last_value);

	if (new->last_value < new->min_value)
682
		elog(ERROR, "DefineSequence: START value (%d) can't be < MINVALUE (%d)",
683 684
			 new->last_value, new->min_value);
	if (new->last_value > new->max_value)
685
		elog(ERROR, "DefineSequence: START value (%d) can't be > MAXVALUE (%d)",
686 687 688 689 690
			 new->last_value, new->max_value);

	if (cache_value == (DefElem *) NULL)		/* CACHE */
		new->cache_value = 1;
	else if ((new->cache_value = get_param(cache_value)) <= 0)
691
		elog(ERROR, "DefineSequence: CACHE (%d) can't be <= 0",
692
			 new->cache_value);
693 694 695 696

}

static int
697
get_param(DefElem *def)
698
{
699
	if (def->arg == (Node *) NULL)
700
		elog(ERROR, "DefineSequence: \"%s\" value unspecified", def->defname);
701 702

	if (nodeTag(def->arg) == T_Integer)
703
		return intVal(def->arg);
704

705
	elog(ERROR, "DefineSequence: \"%s\" value must be integer", def->defname);
706
	return -1;
707
}
V
Vadim B. Mikheev 已提交
708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784

/*
 * seq_redo
 *		Redo routine for sequence log records (XLOG_SEQ_LOG and
 *		XLOG_SEQ_SET, as written by nextval and do_setval).
 *
 * lsn is the position of the record being replayed; record is the record
 * itself, whose data is an xl_seq_rec.  The sequence tuple on block 0 of
 * the relation named by xlrec->node gets last_value = xlrec->value,
 * is_called = 't' and log_cnt = 0, and the page is stamped with lsn.
 *
 * NOTE(review): the same values of is_called and log_cnt are applied for
 * both record types, while do_setval can store is_called = 'f' and
 * log_cnt = 1 on the live page.  Only node and value of the record are
 * used here, so that state is not reproduced by replay --- confirm whether
 * this is intended.
 */
void seq_redo(XLogRecPtr lsn, XLogRecord *record)
{
	uint8				info = record->xl_info & ~XLR_INFO_MASK;
	Relation			reln;
	Buffer				buffer;
	Page				page;
	ItemId				lp;
	HeapTupleData		tuple;
	Form_pg_sequence	seq;
	xl_seq_rec		   *xlrec;

	/* only the two sequence record types are understood */
	if (info != XLOG_SEQ_LOG && info != XLOG_SEQ_SET)
		elog(STOP, "seq_redo: unknown op code %u", info);

	xlrec = (xl_seq_rec*) XLogRecGetData(record);

	/* silently skip the record when the relation cannot be opened */
	reln = XLogOpenRelation(true, RM_SEQ_ID, xlrec->node);
	if (!RelationIsValid(reln))
		return;

	buffer = XLogReadBuffer(false, reln, 0);
	if (!BufferIsValid(buffer))
		elog(STOP, "seq_redo: can't read block of %u/%u", 
			xlrec->node.tblNode, xlrec->node.relNode);

	/* the page must already be an initialized sequence page */
	page = (Page) BufferGetPage(buffer);
	if (PageIsNew((PageHeader) page) ||
		((sequence_magic *) PageGetSpecialPointer(page))->magic != SEQ_MAGIC)
		elog(STOP, "seq_redo: uninitialized page of %u/%u",
			xlrec->node.tblNode, xlrec->node.relNode);

	/*
	 * Nothing to do when the page's LSN shows this record (or a later one)
	 * has already been applied -- presumably XLByteLE tests lsn <= page LSN.
	 */
	if (XLByteLE(lsn, PageGetLSN(page)))
	{
		UnlockAndReleaseBuffer(buffer);
		return;
	}

	/* locate the sequence tuple: the first item on the page */
	lp = PageGetItemId(page, FirstOffsetNumber);
	Assert(ItemIdIsUsed(lp));
	tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);

	seq = (Form_pg_sequence) GETSTRUCT(&tuple);

	seq->last_value = xlrec->value;		/* last logged value */
	seq->is_called = 't';
	seq->log_cnt = 0;

	/* stamp the page with this record's position and write it out */
	PageSetLSN(page, lsn);
	PageSetSUI(page, ThisStartUpID);
	UnlockAndWriteBuffer(buffer);

	return;
}

/*
 * seq_undo
 *		Undo routine for sequence log records.  The body is empty: both
 *		arguments are ignored and nothing is done.
 */
void seq_undo(XLogRecPtr lsn, XLogRecord *record)
{
}

/*
 * seq_desc
 *		Append a textual description of a sequence log record to buf.
 *
 * xl_info carries the record type and rec points to the xl_seq_rec data.
 * The text is appended to the string already in buf.  No length is passed
 * in, so the caller must supply a buffer with enough room.
 */
void seq_desc(char *buf, uint8 xl_info, char* rec)
{
	uint8		info = xl_info & ~XLR_INFO_MASK;
	xl_seq_rec *xlrec = (xl_seq_rec*) rec;
	char	   *prefix;

	if (info == XLOG_SEQ_LOG)
		prefix = "log: ";
	else if (info == XLOG_SEQ_SET)
		prefix = "set: ";
	else
	{
		/* unrecognized record type: say so and stop */
		strcat(buf, "UNKNOWN");
		return;
	}

	strcat(buf, prefix);
	sprintf(buf + strlen(buf), "node %u/%u; value %d",
		xlrec->node.tblNode, xlrec->node.relNode, xlrec->value);
}