sequence.c 15.4 KB
Newer Older
1 2
/*-------------------------------------------------------------------------
 *
 * sequence.c
 *	  PostgreSQL sequences support code.
 *
 *-------------------------------------------------------------------------
 */

9 10
#include <ctype.h>

11
#include "postgres.h"
12

13 14 15
#include "access/heapam.h"
#include "commands/creatinh.h"
#include "commands/sequence.h"
B
Bruce Momjian 已提交
16
#include "miscadmin.h"
17
#include "utils/acl.h"
B
Bruce Momjian 已提交
18
#include "utils/builtins.h"
19

20
/* Magic number DefineSequence stores in the special space of a sequence page */
#define SEQ_MAGIC	  0x1717

/*
 * Default bounds for sequence values.  SEQ_MINVALUE carries its own outer
 * parentheses so that it always expands as a single operand.
 */
#define SEQ_MAXVALUE	((int4)0x7FFFFFFF)
#define SEQ_MINVALUE	(-(SEQ_MAXVALUE))

25 26
typedef struct FormData_pg_sequence
{
27 28 29 30 31 32 33 34
	NameData	sequence_name;
	int4		last_value;
	int4		increment_by;
	int4		max_value;
	int4		min_value;
	int4		cache_value;
	char		is_cycled;
	char		is_called;
35
} FormData_pg_sequence;
36

37
typedef FormData_pg_sequence *Form_pg_sequence;
38 39 40

typedef struct sequence_magic
{
41
	uint32		magic;
42
} sequence_magic;
43

44 45
typedef struct SeqTableData
{
46 47 48 49 50 51
	char	   *name;
	Oid			relid;
	Relation	rel;
	int4		cached;
	int4		last;
	int4		increment;
52
	struct SeqTableData *next;
53
} SeqTableData;
54 55 56 57 58

typedef SeqTableData *SeqTable;

static SeqTable seqtab = NULL;

59
static char *get_seq_name(text *seqin);
60
static SeqTable init_sequence(char *caller, char *name);
61 62
static Form_pg_sequence read_info(char *caller, SeqTable elm, Buffer *buf);
static void init_params(CreateSeqStmt *seq, Form_pg_sequence new);
63
static int	get_param(DefElem *def);
64
static void do_setval(char *seqname, int32 next, bool iscalled);
65 66

/*
B
Bruce Momjian 已提交
67
 * DefineSequence
68
 *				Creates a new sequence relation
69 70
 */
void
71
DefineSequence(CreateSeqStmt *seq)
72
{
73
	FormData_pg_sequence new;
74 75 76 77 78 79
	CreateStmt *stmt = makeNode(CreateStmt);
	ColumnDef  *coldef;
	TypeName   *typnam;
	Relation	rel;
	Buffer		buf;
	PageHeader	page;
80
	sequence_magic *sm;
81 82 83 84 85
	HeapTuple	tuple;
	TupleDesc	tupDesc;
	Datum		value[SEQ_COL_LASTCOL];
	char		null[SEQ_COL_LASTCOL];
	int			i;
86
	NameData	name;
87 88 89 90 91

	/* Check and set values */
	init_params(seq, &new);

	/*
92
	 * Create relation (and fill *null & *value)
93 94 95
	 */
	stmt->tableElts = NIL;
	for (i = SEQ_COL_FIRSTCOL; i <= SEQ_COL_LASTCOL; i++)
96
	{
97 98 99
		typnam = makeNode(TypeName);
		typnam->setof = FALSE;
		typnam->arrayBounds = NULL;
B
Bruce Momjian 已提交
100
		typnam->typmod = -1;
101 102
		coldef = makeNode(ColumnDef);
		coldef->typename = typnam;
103 104
		coldef->raw_default = NULL;
		coldef->cooked_default = NULL;
105 106 107 108 109
		coldef->is_not_null = false;
		null[i - 1] = ' ';

		switch (i)
		{
110 111 112
			case SEQ_COL_NAME:
				typnam->name = "name";
				coldef->colname = "sequence_name";
113 114
				namestrcpy(&name, seq->seqname);
				value[i - 1] = NameGetDatum(&name);
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
				break;
			case SEQ_COL_LASTVAL:
				typnam->name = "int4";
				coldef->colname = "last_value";
				value[i - 1] = Int32GetDatum(new.last_value);
				break;
			case SEQ_COL_INCBY:
				typnam->name = "int4";
				coldef->colname = "increment_by";
				value[i - 1] = Int32GetDatum(new.increment_by);
				break;
			case SEQ_COL_MAXVALUE:
				typnam->name = "int4";
				coldef->colname = "max_value";
				value[i - 1] = Int32GetDatum(new.max_value);
				break;
			case SEQ_COL_MINVALUE:
				typnam->name = "int4";
				coldef->colname = "min_value";
				value[i - 1] = Int32GetDatum(new.min_value);
				break;
			case SEQ_COL_CACHE:
				typnam->name = "int4";
				coldef->colname = "cache_value";
				value[i - 1] = Int32GetDatum(new.cache_value);
				break;
			case SEQ_COL_CYCLE:
				typnam->name = "char";
				coldef->colname = "is_cycled";
				value[i - 1] = CharGetDatum(new.is_cycled);
				break;
			case SEQ_COL_CALLED:
				typnam->name = "char";
				coldef->colname = "is_called";
				value[i - 1] = CharGetDatum('f');
				break;
151 152 153 154 155 156 157 158
		}
		stmt->tableElts = lappend(stmt->tableElts, coldef);
	}

	stmt->relname = seq->seqname;
	stmt->inhRelnames = NIL;
	stmt->constraints = NIL;

159
	DefineRelation(stmt, RELKIND_SEQUENCE);
160

161
	rel = heap_openr(seq->seqname, AccessExclusiveLock);
162

163
	tupDesc = RelationGetDescr(rel);
164 165 166 167 168

	Assert(RelationGetNumberOfBlocks(rel) == 0);
	buf = ReadBuffer(rel, P_NEW);

	if (!BufferIsValid(buf))
169
		elog(ERROR, "DefineSequence: ReadBuffer failed");
170 171 172 173 174 175 176 177 178 179 180 181

	page = (PageHeader) BufferGetPage(buf);

	PageInit((Page) page, BufferGetPageSize(buf), sizeof(sequence_magic));
	sm = (sequence_magic *) PageGetSpecialPointer(page);
	sm->magic = SEQ_MAGIC;

	/* Now - form & insert sequence tuple */
	tuple = heap_formtuple(tupDesc, value, null);
	heap_insert(rel, tuple);

	if (WriteBuffer(buf) == STATUS_ERROR)
182
		elog(ERROR, "DefineSequence: WriteBuffer failed");
183

184
	heap_close(rel, AccessExclusiveLock);
185 186 187
}


188 189
Datum
nextval(PG_FUNCTION_ARGS)
190
{
191 192
	text	   *seqin = PG_GETARG_TEXT_P(0);
	char	   *seqname = get_seq_name(seqin);
193 194
	SeqTable	elm;
	Buffer		buf;
195
	Form_pg_sequence seq;
196
	int32		incby,
197 198 199
				maxv,
				minv,
				cache;
200
	int32		result,
201 202
				next,
				rescnt = 0;
203

204
#ifndef NO_SECURITY
205
	if (pg_aclcheck(seqname, GetUserId(), ACL_WR) != ACLCHECK_OK)
206 207 208 209
		elog(ERROR, "%s.nextval: you don't have permissions to set sequence %s",
			 seqname, seqname);
#endif

V
Vadim B. Mikheev 已提交
210
	/* open and AccessShareLock sequence */
211
	elm = init_sequence("nextval", seqname);
212

213 214 215 216 217
	pfree(seqname);

	if (elm->last != elm->cached)		/* some numbers were cached */
	{
		elm->last += elm->increment;
218
		PG_RETURN_INT32(elm->last);
219
	}
220

B
Bruce Momjian 已提交
221 222
	seq = read_info("nextval", elm, &buf);		/* lock page' buffer and
												 * read tuple */
223 224 225 226 227 228 229 230 231 232 233

	next = result = seq->last_value;
	incby = seq->increment_by;
	maxv = seq->max_value;
	minv = seq->min_value;
	cache = seq->cache_value;

	if (seq->is_called != 't')
		rescnt++;				/* last_value if not called */

	while (rescnt < cache)		/* try to fetch cache numbers */
234
	{
235 236 237 238 239

		/*
		 * Check MAXVALUE for ascending sequences and MINVALUE for
		 * descending sequences
		 */
240
		if (incby > 0)
241
		{
242
			/* ascending sequence */
243 244 245 246 247 248
			if ((maxv >= 0 && next > maxv - incby) ||
				(maxv < 0 && next + incby > maxv))
			{
				if (rescnt > 0)
					break;		/* stop caching */
				if (seq->is_cycled != 't')
249
					elog(ERROR, "%s.nextval: got MAXVALUE (%d)",
250 251 252 253 254 255 256 257
						 elm->name, maxv);
				next = minv;
			}
			else
				next += incby;
		}
		else
		{
258
			/* descending sequence */
259 260 261 262 263 264
			if ((minv < 0 && next < minv - incby) ||
				(minv >= 0 && next + incby < minv))
			{
				if (rescnt > 0)
					break;		/* stop caching */
				if (seq->is_cycled != 't')
265
					elog(ERROR, "%s.nextval: got MINVALUE (%d)",
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
						 elm->name, minv);
				next = maxv;
			}
			else
				next += incby;
		}
		rescnt++;				/* got result */
		if (rescnt == 1)		/* if it's first one - */
			result = next;		/* it's what to return */
	}

	/* save info in local cache */
	elm->last = result;			/* last returned number */
	elm->cached = next;			/* last cached number */

	/* save info in sequence relation */
	seq->last_value = next;		/* last fetched number */
	seq->is_called = 't';

V
Vadim B. Mikheev 已提交
285 286
	LockBuffer(buf, BUFFER_LOCK_UNLOCK);

287
	if (WriteBuffer(buf) == STATUS_ERROR)
288
		elog(ERROR, "%s.nextval: WriteBuffer failed", elm->name);
289

290
	PG_RETURN_INT32(result);
291 292
}

293 294
Datum
currval(PG_FUNCTION_ARGS)
295
{
296 297
	text	   *seqin = PG_GETARG_TEXT_P(0);
	char	   *seqname = get_seq_name(seqin);
298
	SeqTable	elm;
299 300 301
	int32		result;

#ifndef NO_SECURITY
302
	if (pg_aclcheck(seqname, GetUserId(), ACL_RD) != ACLCHECK_OK)
303 304 305
		elog(ERROR, "%s.currval: you don't have permissions to read sequence %s",
			 seqname, seqname);
#endif
306

V
Vadim B. Mikheev 已提交
307
	/* open and AccessShareLock sequence */
308 309 310
	elm = init_sequence("currval", seqname);

	if (elm->increment == 0)	/* nextval/read_info were not called */
311 312
		elog(ERROR, "%s.currval is not yet defined in this session",
			 seqname);
313 314 315

	result = elm->last;

316
	pfree(seqname);
317

318
	PG_RETURN_INT32(result);
319 320
}

321 322
/*
 * do_setval
 *		Common code for setval() and setval_and_iscalled().
 *
 * Sets the sequence's last_value to 'next' and its is_called flag according
 * to 'iscalled', updating both the sequence tuple and this backend's
 * seqtable entry.
 *
 * seqname:  sequence name as produced by get_seq_name(); it is pfree'd
 *			 here before returning.
 * next:	 new value; must lie within [min_value, max_value].
 * iscalled: stored as 't' or 'f' in is_called.
 *
 * Errors are raised if the caller lacks ACL_WR (unless NO_SECURITY is
 * defined), if the sequence's cache_value is not 1, or if 'next' is out
 * of bounds.
 */
static void
do_setval(char *seqname, int32 next, bool iscalled)
{
	SeqTable	elm;
	Buffer		buf;
	Form_pg_sequence seq;

#ifndef NO_SECURITY
	if (pg_aclcheck(seqname, GetUserId(), ACL_WR) != ACLCHECK_OK)
		elog(ERROR, "%s.setval: you don't have permissions to set sequence %s",
			 seqname, seqname);
#endif

	/* open and AccessShareLock sequence */
	elm = init_sequence("setval", seqname);

	/* lock the page's buffer and read the sequence tuple */
	seq = read_info("setval", elm, &buf);

	if (seq->cache_value != 1)
	{
		elog(ERROR, "%s.setval: can't set value of sequence %s, cache != 1",
			 seqname, seqname);
	}

	if ((next < seq->min_value) || (next > seq->max_value))
	{
		elog(ERROR, "%s.setval: value %d is out of bounds (%d,%d)",
			 seqname, next, seq->min_value, seq->max_value);
	}

	/* save info in local cache */
	elm->last = next;			/* last returned number */
	elm->cached = next;			/* last cached number */

	/* save info in sequence relation */
	seq->last_value = next;		/* last fetched number */
	seq->is_called = iscalled ? 't' : 'f';

	LockBuffer(buf, BUFFER_LOCK_UNLOCK);

	if (WriteBuffer(buf) == STATUS_ERROR)
		elog(ERROR, "%s.setval: WriteBuffer failed", seqname);

	pfree(seqname);
}

/*
 * setval
 *		Two-argument form: set the named sequence (argument 0) to the value
 *		in argument 1 and mark it as called.  Returns the value that was set.
 */
Datum
setval(PG_FUNCTION_ARGS)
{
	text	   *seqin = PG_GETARG_TEXT_P(0);
	int32		newval = PG_GETARG_INT32(1);

	/* do_setval() takes care of freeing the extracted name */
	do_setval(get_seq_name(seqin), newval, true);

	PG_RETURN_INT32(newval);
}

/*
 * setval_and_iscalled
 *		Three-argument form: like setval(), but argument 2 supplies the
 *		is_called flag explicitly.  Returns the value that was set.
 */
Datum
setval_and_iscalled(PG_FUNCTION_ARGS)
{
	text	   *seqin = PG_GETARG_TEXT_P(0);
	int32		newval = PG_GETARG_INT32(1);
	bool		called = PG_GETARG_BOOL(2);

	/* do_setval() takes care of freeing the extracted name */
	do_setval(get_seq_name(seqin), newval, called);

	PG_RETURN_INT32(newval);
}

/*
 * Given a 'text' parameter to a sequence function, extract the actual
 * sequence name.  We downcase the name if it's not double-quoted.
 *
 * The result is a palloc'd C string that the caller is expected to pfree.
 *
 * This is a kluge, really --- should be able to write nextval(seqrel).
 */
static char *
get_seq_name(text *seqin)
{
	char	   *rawname = DatumGetCString(DirectFunctionCall1(textout,
													PointerGetDatum(seqin)));
	int			rawlen = strlen(rawname);
	bool		quoted;
	char	   *stripped;
	char	   *cp;

	quoted = (rawlen >= 2 &&
			  rawname[0] == '\"' &&
			  rawname[rawlen - 1] == '\"');

	if (quoted)
	{
		/* strip off quotes, keep case */
		rawname[rawlen - 1] = '\0';
		stripped = pstrdup(rawname + 1);
		pfree(rawname);
		return stripped;
	}

	/*
	 * Unquoted: downcase in place.  It's important that this match the
	 * identifier downcasing code used by backend/parser/scan.l.
	 */
	for (cp = rawname; *cp; cp++)
	{
		if (isascii((int) *cp) &&
			isupper((int) *cp))
			*cp = tolower(*cp);
	}
	return rawname;
}

432
static Form_pg_sequence
B
Bruce Momjian 已提交
433
read_info(char *caller, SeqTable elm, Buffer *buf)
434
{
B
Bruce Momjian 已提交
435 436 437
	PageHeader	page;
	ItemId		lp;
	HeapTupleData tuple;
438
	sequence_magic *sm;
B
Bruce Momjian 已提交
439
	Form_pg_sequence seq;
440 441

	if (RelationGetNumberOfBlocks(elm->rel) != 1)
442
		elog(ERROR, "%s.%s: invalid number of blocks in sequence",
443 444 445 446
			 elm->name, caller);

	*buf = ReadBuffer(elm->rel, 0);
	if (!BufferIsValid(*buf))
447
		elog(ERROR, "%s.%s: ReadBuffer failed", elm->name, caller);
448

V
Vadim B. Mikheev 已提交
449 450
	LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);

451 452 453 454
	page = (PageHeader) BufferGetPage(*buf);
	sm = (sequence_magic *) PageGetSpecialPointer(page);

	if (sm->magic != SEQ_MAGIC)
455
		elog(ERROR, "%s.%s: bad magic (%08X)", elm->name, caller, sm->magic);
456 457 458

	lp = PageGetItemId(page, FirstOffsetNumber);
	Assert(ItemIdIsUsed(lp));
459
	tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
460

461
	seq = (Form_pg_sequence) GETSTRUCT(&tuple);
462 463 464

	elm->increment = seq->increment_by;

465
	return seq;
466 467 468 469

}


470
static SeqTable
471
init_sequence(char *caller, char *name)
472
{
473
	SeqTable	elm,
474 475
				prev = (SeqTable) NULL;
	Relation	seqrel;
476

477 478
	/* Look to see if we already have a seqtable entry for name */
	for (elm = seqtab; elm != (SeqTable) NULL; elm = elm->next)
479 480 481
	{
		if (strcmp(elm->name, name) == 0)
			break;
482
		prev = elm;
483 484
	}

485 486 487
	/* If so, and if it's already been opened in this xact, just return it */
	if (elm != (SeqTable) NULL && elm->rel != (Relation) NULL)
		return elm;
488

489 490 491
	/* Else open and check it */
	seqrel = heap_openr(name, AccessShareLock);
	if (seqrel->rd_rel->relkind != RELKIND_SEQUENCE)
492
		elog(ERROR, "%s.%s: %s is not sequence !", name, caller, name);
493

494 495
	if (elm != (SeqTable) NULL)
	{
496 497 498

		/*
		 * We are using a seqtable entry left over from a previous xact;
499 500 501 502
		 * must check for relid change.
		 */
		elm->rel = seqrel;
		if (RelationGetRelid(seqrel) != elm->relid)
503 504
		{
			elog(NOTICE, "%s.%s: sequence was re-created",
505
				 name, caller);
506
			elm->relid = RelationGetRelid(seqrel);
507 508 509 510 511
			elm->cached = elm->last = elm->increment = 0;
		}
	}
	else
	{
512 513 514

		/*
		 * Time to make a new seqtable entry.  These entries live as long
515 516 517 518 519 520 521 522 523 524
		 * as the backend does, so we use plain malloc for them.
		 */
		elm = (SeqTable) malloc(sizeof(SeqTableData));
		elm->name = malloc(strlen(name) + 1);
		strcpy(elm->name, name);
		elm->rel = seqrel;
		elm->relid = RelationGetRelid(seqrel);
		elm->cached = elm->last = elm->increment = 0;
		elm->next = (SeqTable) NULL;

525 526 527
		if (seqtab == (SeqTable) NULL)
			seqtab = elm;
		else
528
			prev->next = elm;
529 530
	}

531
	return elm;
532 533 534 535
}


/*
B
Bruce Momjian 已提交
536
 * CloseSequences
537
 *				is calling by xact mgr at commit/abort.
538 539
 */
void
540
CloseSequences(void)
541
{
542 543
	SeqTable	elm;
	Relation	rel;
544

545
	for (elm = seqtab; elm != (SeqTable) NULL; elm = elm->next)
546
	{
547 548 549 550
		if (elm->rel != (Relation) NULL)		/* opened in current xact */
		{
			rel = elm->rel;
			elm->rel = (Relation) NULL;
551
			heap_close(rel, AccessShareLock);
552 553
		}
	}
554 555 556
}


557
static void
558
init_params(CreateSeqStmt *seq, Form_pg_sequence new)
559
{
560 561 562 563 564 565
	DefElem    *last_value = NULL;
	DefElem    *increment_by = NULL;
	DefElem    *max_value = NULL;
	DefElem    *min_value = NULL;
	DefElem    *cache_value = NULL;
	List	   *option;
566 567 568 569

	new->is_cycled = 'f';
	foreach(option, seq->options)
	{
570
		DefElem    *defel = (DefElem *) lfirst(option);
571 572 573 574 575 576 577 578 579 580 581 582 583 584

		if (!strcasecmp(defel->defname, "increment"))
			increment_by = defel;
		else if (!strcasecmp(defel->defname, "start"))
			last_value = defel;
		else if (!strcasecmp(defel->defname, "maxvalue"))
			max_value = defel;
		else if (!strcasecmp(defel->defname, "minvalue"))
			min_value = defel;
		else if (!strcasecmp(defel->defname, "cache"))
			cache_value = defel;
		else if (!strcasecmp(defel->defname, "cycle"))
		{
			if (defel->arg != (Node *) NULL)
585
				elog(ERROR, "DefineSequence: CYCLE ??");
586 587 588
			new->is_cycled = 't';
		}
		else
589
			elog(ERROR, "DefineSequence: option \"%s\" not recognized",
590 591 592 593 594 595
				 defel->defname);
	}

	if (increment_by == (DefElem *) NULL)		/* INCREMENT BY */
		new->increment_by = 1;
	else if ((new->increment_by = get_param(increment_by)) == 0)
596
		elog(ERROR, "DefineSequence: can't INCREMENT by 0");
597 598

	if (max_value == (DefElem *) NULL)	/* MAXVALUE */
599
	{
600 601 602 603
		if (new->increment_by > 0)
			new->max_value = SEQ_MAXVALUE;		/* ascending seq */
		else
			new->max_value = -1;/* descending seq */
604
	}
605
	else
606
		new->max_value = get_param(max_value);
607

608
	if (min_value == (DefElem *) NULL)	/* MINVALUE */
609
	{
610 611 612 613
		if (new->increment_by > 0)
			new->min_value = 1; /* ascending seq */
		else
			new->min_value = SEQ_MINVALUE;		/* descending seq */
614
	}
615
	else
616 617 618
		new->min_value = get_param(min_value);

	if (new->min_value >= new->max_value)
619
		elog(ERROR, "DefineSequence: MINVALUE (%d) can't be >= MAXVALUE (%d)",
620 621 622
			 new->min_value, new->max_value);

	if (last_value == (DefElem *) NULL) /* START WITH */
623
	{
624 625 626 627
		if (new->increment_by > 0)
			new->last_value = new->min_value;	/* ascending seq */
		else
			new->last_value = new->max_value;	/* descending seq */
628
	}
629
	else
630 631 632
		new->last_value = get_param(last_value);

	if (new->last_value < new->min_value)
633
		elog(ERROR, "DefineSequence: START value (%d) can't be < MINVALUE (%d)",
634 635
			 new->last_value, new->min_value);
	if (new->last_value > new->max_value)
636
		elog(ERROR, "DefineSequence: START value (%d) can't be > MAXVALUE (%d)",
637 638 639 640 641
			 new->last_value, new->max_value);

	if (cache_value == (DefElem *) NULL)		/* CACHE */
		new->cache_value = 1;
	else if ((new->cache_value = get_param(cache_value)) <= 0)
642
		elog(ERROR, "DefineSequence: CACHE (%d) can't be <= 0",
643
			 new->cache_value);
644 645 646 647 648

}


static int
649
get_param(DefElem *def)
650
{
651
	if (def->arg == (Node *) NULL)
652
		elog(ERROR, "DefineSequence: \"%s\" value unspecified", def->defname);
653 654

	if (nodeTag(def->arg) == T_Integer)
655
		return intVal(def->arg);
656

657
	elog(ERROR, "DefineSequence: \"%s\" is to be integer", def->defname);
658
	return -1;
659
}