/*-------------------------------------------------------------------------
 *
 * creatinh.c
 *	  POSTGRES create/destroy relation with inheritance utility code.
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/commands/Attic/creatinh.c,v 1.49 1999/10/15 01:49:39 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/heapam.h"
#include "catalog/catname.h"
#include "catalog/indexing.h"
#include "catalog/heap.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_ipl.h"
#include "catalog/pg_type.h"
#include "catalog/pg_description.h"
#include "commands/creatinh.h"
#include "utils/syscache.h"

/* ----------------
 *		local stuff
 * ----------------
 */

/* forward declarations of the file-local helpers defined below */
static int checkAttrExists(char *attributeName,
				char *attributeType, List *schema);
static List *MergeAttributes(List *schema, List *supers, List **supconstr);
static void StoreCatalogInheritance(Oid relationId, List *supers);

/* ----------------------------------------------------------------
B
Bruce Momjian 已提交
39
 *		DefineRelation
40
 *				Creates a new relation.
41 42 43
 * ----------------------------------------------------------------
 */
void
44
DefineRelation(CreateStmt *stmt, char relkind)
45
{
46 47 48 49
	char	   *relname = palloc(NAMEDATALEN);
	List	   *schema = stmt->tableElts;
	int			numberOfAttributes;
	Oid			relationId;
50 51
	Relation	rel;
	List	   *inheritList;
52
	TupleDesc	descriptor;
53 54 55 56 57
	List	   *old_constraints;
	List	   *rawDefaults;
	List	   *listptr;
	int			i;
	AttrNumber	attnum;
58 59

	if (strlen(stmt->relname) >= NAMEDATALEN)
60 61 62
		elog(ERROR, "the relation name %s is >= %d characters long",
			 stmt->relname, NAMEDATALEN);
	StrNCpy(relname, stmt->relname, NAMEDATALEN);
63 64 65 66 67 68 69 70 71 72 73 74

	/* ----------------
	 *	Handle parameters
	 *	XXX parameter handling missing below.
	 * ----------------
	 */
	inheritList = stmt->inhRelnames;

	/* ----------------
	 *	generate relation schema, including inherited attributes.
	 * ----------------
	 */
75
	schema = MergeAttributes(schema, inheritList, &old_constraints);
76 77 78 79

	numberOfAttributes = length(schema);
	if (numberOfAttributes <= 0)
	{
80
		elog(ERROR, "DefineRelation: %s",
81
			 "please inherit from a relation or define an attribute");
82
	}
83 84 85

	/* ----------------
	 *	create a relation descriptor from the relation schema
86 87 88 89 90
	 *	and create the relation.  Note that in this stage only
	 *	inherited (pre-cooked) defaults and constraints will be
	 *	included into the new relation.  (BuildDescForRelation
	 *	takes care of the inherited defaults, but we have to copy
	 *	inherited constraints here.)
91 92 93 94
	 * ----------------
	 */
	descriptor = BuildDescForRelation(schema, relname);

95
	if (old_constraints != NIL)
96
	{
97 98 99
		ConstrCheck *check = (ConstrCheck *) palloc(length(old_constraints) *
													sizeof(ConstrCheck));
		int			ncheck = 0;
100

101
		foreach(listptr, old_constraints)
102
		{
103
			Constraint *cdef = (Constraint *) lfirst(listptr);
104

105 106 107 108
			if (cdef->contype != CONSTR_CHECK)
				continue;

			if (cdef->name != NULL)
109
			{
110
				for (i = 0; i < ncheck; i++)
111
				{
112 113 114
					if (strcmp(check[i].ccname, cdef->name) == 0)
						elog(ERROR, "Duplicate CHECK constraint name: '%s'",
							 cdef->name);
115
				}
116 117 118 119 120 121
				check[ncheck].ccname = cdef->name;
			}
			else
			{
				check[ncheck].ccname = (char *) palloc(NAMEDATALEN);
				snprintf(check[ncheck].ccname, NAMEDATALEN, "$%d", ncheck + 1);
122
			}
123 124 125
			Assert(cdef->raw_expr == NULL && cdef->cooked_expr != NULL);
			check[ncheck].ccbin = pstrdup(cdef->cooked_expr);
			ncheck++;
126 127 128 129 130 131
		}
		if (ncheck > 0)
		{
			if (descriptor->constr == NULL)
			{
				descriptor->constr = (TupleConstr *) palloc(sizeof(TupleConstr));
132
				descriptor->constr->defval = NULL;
133 134 135 136 137 138 139 140
				descriptor->constr->num_defval = 0;
				descriptor->constr->has_not_null = false;
			}
			descriptor->constr->num_check = ncheck;
			descriptor->constr->check = check;
		}
	}

141 142
	relationId = heap_create_with_catalog(relname, descriptor,
										  relkind, stmt->istemp);
143 144

	StoreCatalogInheritance(relationId, inheritList);
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199

	/*
	 * Now add any newly specified column default values
	 * and CHECK constraints to the new relation.  These are passed
	 * to us in the form of raw parsetrees; we need to transform
	 * them to executable expression trees before they can be added.
	 * The most convenient way to do that is to apply the parser's
	 * transformExpr routine, but transformExpr doesn't work unless
	 * we have a pre-existing relation.  So, the transformation has
	 * to be postponed to this final step of CREATE TABLE.
	 *
	 * First, scan schema to find new column defaults.
	 */
	rawDefaults = NIL;
	attnum = 0;

	foreach(listptr, schema)
	{
		ColumnDef  *colDef = lfirst(listptr);
		RawColumnDefault *rawEnt;

		attnum++;

		if (colDef->raw_default == NULL)
			continue;
		Assert(colDef->cooked_default == NULL);

		rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
		rawEnt->attnum = attnum;
		rawEnt->raw_default = colDef->raw_default;
		rawDefaults = lappend(rawDefaults, rawEnt);
	}

	/* If no raw defaults and no constraints, nothing to do. */
	if (rawDefaults == NIL && stmt->constraints == NIL)
		return;

	/*
	 * We must bump the command counter to make the newly-created
	 * relation tuple visible for opening.
	 */
	CommandCounterIncrement();
	/*
	 * Open the new relation.
	 */
	rel = heap_openr(relname, AccessExclusiveLock);
	/*
	 * Parse and add the defaults/constraints.
	 */
	AddRelationRawConstraints(rel, rawDefaults, stmt->constraints);
	/*
	 * Clean up.  We keep lock on new relation (although it shouldn't
	 * be visible to anyone else anyway, until commit).
	 */
	heap_close(rel, NoLock);
200 201 202
}

/*
 * RemoveRelation
 *		Deletes a relation.
 *
 * Exceptions:
 *		BadArg if name is invalid.
 *
 * Note:
 *		If the relation has indices defined on it, then the index relations
 *		themselves will be destroyed, too.
 */
void
RemoveRelation(char *name)
{
	AssertArg(name);
	heap_destroy_with_catalog(name);
}

/*
 * TruncateRelation --
 *		Removes all the rows from a relation.
 *
 * Exceptions:
 *		BadArg if name is invalid.
 *
 * Note:
 *		Rows are removed, indices are truncated and reconstructed.
 */
void
TruncateRelation(char *name)
{
	AssertArg(name);
	heap_truncate(name);
}

/*------------------------------------------------------------------
 * CommentRelation --
 *		Adds a comment to pg_description for the associated
 *		relation or relation attribute.
 *
 * 'relname' names the relation; it is opened with AccessShareLock.
 * 'attrname' is the attribute to comment on, or NULL to comment on
 *		the relation itself.
 * 'comments' is the comment text handed to CreateComments.
 *
 * Note:
 *		The comment is dropped on the relation or attribute if
 *		the comment is an empty string.
 *------------------------------------------------------------------
 */
void
CommentRelation(char *relname, char *attrname, char *comments)
{
	Relation	relation;
	HeapTuple	attrtuple;
	Oid			oid;

	/*** First ensure relname is valid ***/

	relation = heap_openr(relname, AccessShareLock);

	/*** Now, if an attribute was specified, fetch its oid, else use relation's oid ***/

	if (attrname != NULL)
	{
		attrtuple = SearchSysCacheTuple(ATTNAME,
								ObjectIdGetDatum(RelationGetRelid(relation)),
										PointerGetDatum(attrname), 0, 0);
		if (!HeapTupleIsValid(attrtuple))
		{
			elog(ERROR, "CommentRelation: attribute \"%s\" is not an attribute of relation \"%s\"",
				 attrname, relname);
		}
		oid = attrtuple->t_data->t_oid;
	}
	else
		oid = RelationGetRelid(relation);

	/*** Call CreateComments() to create/drop the comments ***/

	CreateComments(oid, comments);

	/*** Now, close the heap relation ***/

	heap_close(relation, AccessShareLock);
}

/*
B
Bruce Momjian 已提交
284
 * MergeAttributes
285
 *		Returns new schema given initial schema and supers.
286 287 288
 *
 *
 * 'schema' is the column/attribute definition for the table. (It's a list
289
 *		of ColumnDef's.) It is destructively changed.
290 291 292
 * 'inheritList' is the list of inherited relations (a list of Value(str)'s).
 *
 * Notes:
293 294 295 296
 *	  The order in which the attributes are inherited is very important.
 *	  Intuitively, the inherited attributes should come first. If a table
 *	  inherits from multiple parents, the order of those attributes are
 *	  according to the order of the parents specified in CREATE TABLE.
297
 *
298
 *	  Here's an example:
299
 *
300
 *		create table person (name text, age int4, location point);
301
 *		create table emp (salary int4, manager text) inherits(person);
302 303
 *		create table student (gpa float8) inherits (person);
 *		create table stud_emp (percent int4) inherits (emp, student);
304
 *
305
 *	  the order of the attributes of stud_emp is as follow:
306 307
 *
 *
308 309 310 311 312
 *							person {1:name, 2:age, 3:location}
 *							/	 \
 *			   {6:gpa}	student   emp {4:salary, 5:manager}
 *							\	 /
 *						   stud_emp {7:percent}
313
 */
314
static List *
315
MergeAttributes(List *schema, List *supers, List **supconstr)
316
{
317 318 319
	List	   *entry;
	List	   *inhSchema = NIL;
	List	   *constraints = NIL;
320 321 322 323 324 325

	/*
	 * Validates that there are no duplications. Validity checking of
	 * types occurs later.
	 */
	foreach(entry, schema)
326
	{
327
		ColumnDef  *coldef = lfirst(entry);
328
		List	   *rest;
329 330 331 332

		foreach(rest, lnext(entry))
		{
			/*
333
			 * check for duplicated names within the new relation
334
			 */
335
			ColumnDef  *restdef = lfirst(rest);
336 337 338

			if (!strcmp(coldef->colname, restdef->colname))
			{
339
				elog(ERROR, "attribute '%s' duplicated",
340 341 342
					 coldef->colname);
			}
		}
343
	}
344
	foreach(entry, supers)
345
	{
346
		List	   *rest;
347 348 349 350 351

		foreach(rest, lnext(entry))
		{
			if (!strcmp(strVal(lfirst(entry)), strVal(lfirst(rest))))
			{
352
				elog(ERROR, "relation '%s' duplicated",
353 354 355
					 strVal(lfirst(entry)));
			}
		}
356
	}
357

358
	/*
359
	 * merge the inherited attributes into the schema
360
	 */
361 362
	foreach(entry, supers)
	{
363 364 365 366 367 368
		char	   *name = strVal(lfirst(entry));
		Relation	relation;
		List	   *partialResult = NIL;
		AttrNumber	attrno;
		TupleDesc	tupleDesc;
		TupleConstr *constr;
369

370
		relation = heap_openr(name, AccessShareLock);
371
		tupleDesc = RelationGetDescr(relation);
372 373
		constr = tupleDesc->constr;

374 375 376 377
		/* XXX shouldn't this test be stricter?  No indexes, for example? */
		if (relation->rd_rel->relkind == 'S')
			elog(ERROR, "MergeAttr: Can't inherit from sequence superclass '%s'", name);

378 379
		for (attrno = relation->rd_rel->relnatts - 1; attrno >= 0; attrno--)
		{
380
			Form_pg_attribute attribute = tupleDesc->attrs[attrno];
381 382 383 384 385
			char	   *attributeName;
			char	   *attributeType;
			HeapTuple	tuple;
			ColumnDef  *def;
			TypeName   *typename;
386 387 388 389 390

			/*
			 * form name, type and constraints
			 */
			attributeName = (attribute->attname).data;
391
			tuple = SearchSysCacheTuple(TYPOID,
B
Bruce Momjian 已提交
392 393
								   ObjectIdGetDatum(attribute->atttypid),
										0, 0, 0);
394
			Assert(HeapTupleIsValid(tuple));
395
			attributeType = (((Form_pg_type) GETSTRUCT(tuple))->typname).data;
396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417

			/*
			 * check validity
			 *
			 */
			if (checkAttrExists(attributeName, attributeType, inhSchema) ||
				checkAttrExists(attributeName, attributeType, schema))
			{

				/*
				 * this entry already exists
				 */
				continue;
			}

			/*
			 * add an entry to the schema
			 */
			def = makeNode(ColumnDef);
			typename = makeNode(TypeName);
			def->colname = pstrdup(attributeName);
			typename->name = pstrdup(attributeType);
418
			typename->typmod = attribute->atttypmod;
419 420
			def->typename = typename;
			def->is_not_null = attribute->attnotnull;
421 422
			def->raw_default = NULL;
			def->cooked_default = NULL;
423 424
			if (attribute->atthasdef)
			{
425
				AttrDefault *attrdef;
426
				int			i;
427

428
				Assert(constr != NULL);
429

430
				attrdef = constr->defval;
431 432
				for (i = 0; i < constr->num_defval; i++)
				{
433 434 435 436 437
					if (attrdef[i].adnum == attrno + 1)
					{
						def->cooked_default = pstrdup(attrdef[i].adbin);
						break;
					}
438
				}
439
				Assert(def->cooked_default != NULL);
440 441 442 443 444 445
			}
			partialResult = lcons(def, partialResult);
		}

		if (constr && constr->num_check > 0)
		{
446 447
			ConstrCheck *check = constr->check;
			int			i;
448 449 450

			for (i = 0; i < constr->num_check; i++)
			{
451
				Constraint *cdef = makeNode(Constraint);
452

453
				cdef->contype = CONSTR_CHECK;
454 455 456 457
				if (check[i].ccname[0] == '$')
					cdef->name = NULL;
				else
					cdef->name = pstrdup(check[i].ccname);
458 459
				cdef->raw_expr = NULL;
				cdef->cooked_expr = pstrdup(check[i].ccbin);
460 461 462 463 464
				constraints = lappend(constraints, cdef);
			}
		}

		/*
465 466 467
		 * Close the parent rel, but keep our AccessShareLock on it until
		 * xact commit.  That will prevent someone else from deleting or
		 * ALTERing the parent before the child is committed.
468
		 */
469
		heap_close(relation, NoLock);
470 471 472 473 474 475 476

		/*
		 * wants the inherited schema to appear in the order they are
		 * specified in CREATE TABLE
		 */
		inhSchema = nconc(inhSchema, partialResult);
	}
477 478

	/*
479
	 * put the inherited schema before our the schema for this table
480
	 */
481 482
	schema = nconc(inhSchema, schema);
	*supconstr = constraints;
483
	return schema;
484 485 486
}

/*
B
Bruce Momjian 已提交
487
 * StoreCatalogInheritance
488
 *		Updates the system catalogs with proper inheritance information.
489 490
 */
static void
491
StoreCatalogInheritance(Oid relationId, List *supers)
492
{
493 494 495 496 497 498
	Relation	relation;
	TupleDesc	desc;
	int16		seqNumber;
	List	   *entry;
	List	   *idList;
	HeapTuple	tuple;
499 500 501 502 503 504 505 506 507 508 509 510 511 512

	/* ----------------
	 *	sanity checks
	 * ----------------
	 */
	AssertArg(OidIsValid(relationId));

	if (supers == NIL)
		return;

	/* ----------------
	 * Catalog INHERITS information.
	 * ----------------
	 */
513
	relation = heap_openr(InheritsRelationName, RowExclusiveLock);
514
	desc = RelationGetDescr(relation);
515 516 517 518 519

	seqNumber = 1;
	idList = NIL;
	foreach(entry, supers)
	{
520 521
		Datum		datum[Natts_pg_inherits];
		char		nullarr[Natts_pg_inherits];
522 523

		tuple = SearchSysCacheTuple(RELNAME,
524
								  PointerGetDatum(strVal(lfirst(entry))),
525 526 527 528 529 530
									0, 0, 0);
		AssertArg(HeapTupleIsValid(tuple));

		/*
		 * build idList for use below
		 */
531
		idList = lappendi(idList, tuple->t_data->t_oid);
532

B
Bruce Momjian 已提交
533 534 535
		datum[0] = ObjectIdGetDatum(relationId);		/* inhrel */
		datum[1] = ObjectIdGetDatum(tuple->t_data->t_oid);		/* inhparent */
		datum[2] = Int16GetDatum(seqNumber);	/* inhseqno */
536 537 538 539 540 541 542 543 544 545 546

		nullarr[0] = ' ';
		nullarr[1] = ' ';
		nullarr[2] = ' ';

		tuple = heap_formtuple(desc, datum, nullarr);

		heap_insert(relation, tuple);
		pfree(tuple);

		seqNumber += 1;
547
	}
548

549
	heap_close(relation, RowExclusiveLock);
550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567

	/* ----------------
	 * Catalog IPL information.
	 *
	 * Algorithm:
	 *	0. list superclasses (by Oid) in order given (see idList).
	 *	1. append after each relationId, its superclasses, recursively.
	 *	3. remove all but last of duplicates.
	 *	4. store result.
	 * ----------------
	 */

	/* ----------------
	 *	1.
	 * ----------------
	 */
	foreach(entry, idList)
	{
568 569 570 571 572
		HeapTuple	tuple;
		Oid			id;
		int16		number;
		List	   *next;
		List	   *current;
573 574 575 576 577 578 579 580 581 582 583 584 585 586 587

		id = (Oid) lfirsti(entry);
		current = entry;
		next = lnext(entry);

		for (number = 1;; number += 1)
		{
			tuple = SearchSysCacheTuple(INHRELID,
										ObjectIdGetDatum(id),
										Int16GetDatum(number),
										0, 0);

			if (!HeapTupleIsValid(tuple))
				break;

588
			lnext(current) = lconsi(((Form_pg_inherits)
B
Bruce Momjian 已提交
589 590
									 GETSTRUCT(tuple))->inhparent,
									NIL);
591 592 593 594

			current = lnext(current);
		}
		lnext(current) = next;
595
	}
596 597 598 599 600 601 602

	/* ----------------
	 *	2.
	 * ----------------
	 */
	foreach(entry, idList)
	{
603 604 605
		Oid			name;
		List	   *rest;
		bool		found = false;
606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630

again:
		name = lfirsti(entry);
		foreach(rest, lnext(entry))
		{
			if (name == lfirsti(rest))
			{
				found = true;
				break;
			}
		}
		if (found)
		{

			/*
			 * entry list must be of length >= 2 or else no match
			 *
			 * so, remove this entry.
			 */
			lfirst(entry) = lfirst(lnext(entry));
			lnext(entry) = lnext(lnext(entry));

			found = false;
			goto again;
		}
631
	}
632 633 634 635 636

	/* ----------------
	 *	3.
	 * ----------------
	 */
637
	relation = heap_openr(InheritancePrecidenceListRelationName, RowExclusiveLock);
638
	desc = RelationGetDescr(relation);
639 640 641 642 643

	seqNumber = 1;

	foreach(entry, idList)
	{
644 645
		Datum		datum[Natts_pg_ipl];
		char		nullarr[Natts_pg_ipl];
646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663

		datum[0] = ObjectIdGetDatum(relationId);		/* iplrel */
		datum[1] = ObjectIdGetDatum(lfirsti(entry));
		/* iplinherits */
		datum[2] = Int16GetDatum(seqNumber);	/* iplseqno */

		nullarr[0] = ' ';
		nullarr[1] = ' ';
		nullarr[2] = ' ';

		tuple = heap_formtuple(desc, datum, nullarr);

		heap_insert(relation, tuple);
		pfree(tuple);

		seqNumber += 1;
	}

664
	heap_close(relation, RowExclusiveLock);
665 666 667 668 669 670
}

/*
 * returns 1 if attribute already exists in schema, 0 otherwise.
 */
static int
671
checkAttrExists(char *attributeName, char *attributeType, List *schema)
672
{
673
	List	   *s;
674 675 676

	foreach(s, schema)
	{
677
		ColumnDef  *def = lfirst(s);
678 679 680 681 682 683 684 685 686

		if (!strcmp(attributeName, def->colname))
		{

			/*
			 * attribute exists. Make sure the types are the same.
			 */
			if (strcmp(attributeType, def->typename->name) != 0)
			{
687
				elog(ERROR, "%s and %s conflict for %s",
688 689 690 691
					 attributeType, def->typename->name, attributeName);
			}
			return 1;
		}
692
	}
693
	return 0;
694
}