indexcmds.c 22.7 KB
Newer Older
1 2
/*-------------------------------------------------------------------------
 *
3
 * indexcmds.c
4
 *	  POSTGRES define and remove index code.
5
 *
B
Bruce Momjian 已提交
6
 * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
B
Add:  
Bruce Momjian 已提交
7
 * Portions Copyright (c) 1994, Regents of the University of California
8 9 10
 *
 *
 * IDENTIFICATION
11
 *	  $Header: /cvsroot/pgsql/src/backend/commands/indexcmds.c,v 1.98 2003/05/02 20:54:33 tgl Exp $
12 13 14
 *
 *-------------------------------------------------------------------------
 */
15

16
#include "postgres.h"
17

18
#include "access/heapam.h"
19
#include "catalog/catalog.h"
H
Hiroshi Inoue 已提交
20
#include "catalog/catname.h"
21
#include "catalog/dependency.h"
22
#include "catalog/heap.h"
23
#include "catalog/index.h"
24
#include "catalog/namespace.h"
B
Bruce Momjian 已提交
25
#include "catalog/pg_opclass.h"
26
#include "catalog/pg_proc.h"
27
#include "commands/defrem.h"
28
#include "commands/tablecmds.h"
29
#include "executor/executor.h"
30
#include "miscadmin.h"
31
#include "optimizer/clauses.h"
B
Bruce Momjian 已提交
32 33
#include "optimizer/prep.h"
#include "parser/parsetree.h"
34
#include "parser/parse_coerce.h"
35
#include "parser/parse_func.h"
36
#include "utils/acl.h"
B
Bruce Momjian 已提交
37
#include "utils/builtins.h"
38
#include "utils/lsyscache.h"
B
Bruce Momjian 已提交
39
#include "utils/syscache.h"
40

41

42
#define IsFuncIndex(ATTR_LIST) (((IndexElem*)lfirst(ATTR_LIST))->funcname != NIL)
43 44

/* non-export function prototypes */
45
static void CheckPredicate(List *predList, List *rangeTable, Oid baseRelOid);
46
static void FuncIndexArgs(IndexInfo *indexInfo, Oid *classOidP,
B
Bruce Momjian 已提交
47 48 49
			  IndexElem *funcIndex,
			  Oid relId,
			  char *accessMethodName, Oid accessMethodId);
50
static void NormIndexAttrs(IndexInfo *indexInfo, Oid *classOidP,
B
Bruce Momjian 已提交
51 52 53 54 55
			   List *attList,
			   Oid relId,
			   char *accessMethodName, Oid accessMethodId);
static Oid GetAttrOpClass(IndexElem *attribute, Oid attrType,
			   char *accessMethodName, Oid accessMethodId);
56
static Oid	GetDefaultOpClass(Oid attrType, Oid accessMethodId);
57 58

/*
B
Bruce Momjian 已提交
59
 * DefineIndex
60
 *		Creates a new index.
61 62
 *
 * 'attributeList' is a list of IndexElem specifying either a functional
63
 *		index or a list of attributes to index on.
64
 * 'predicate' is the qual specified in the where clause.
65
 * 'rangetable' is needed to interpret the predicate.
66 67
 */
void
68
DefineIndex(RangeVar *heapRelation,
69 70
			char *indexRelationName,
			char *accessMethodName,
71
			List *attributeList,
72
			bool unique,
73
			bool primary,
74
			bool isconstraint,
75 76
			Expr *predicate,
			List *rangetable)
77
{
78 79 80
	Oid		   *classObjectId;
	Oid			accessMethodId;
	Oid			relationId;
81
	Oid			namespaceId;
82
	Relation	rel;
83 84
	HeapTuple	tuple;
	Form_pg_am	accessMethodForm;
85
	IndexInfo  *indexInfo;
86
	int			numberOfAttributes;
87
	List	   *cnfPred = NIL;
88 89

	/*
90
	 * count attributes in index
91 92 93
	 */
	numberOfAttributes = length(attributeList);
	if (numberOfAttributes <= 0)
94
		elog(ERROR, "DefineIndex: must specify at least one attribute");
95 96 97
	if (numberOfAttributes > INDEX_MAX_KEYS)
		elog(ERROR, "Cannot use more than %d attributes in an index",
			 INDEX_MAX_KEYS);
98 99

	/*
100
	 * Open heap relation, acquire a suitable lock on it, remember its OID
101
	 */
102
	rel = heap_openrv(heapRelation, ShareLock);
103 104 105 106 107

	/* Note: during bootstrap may see uncataloged relation */
	if (rel->rd_rel->relkind != RELKIND_RELATION &&
		rel->rd_rel->relkind != RELKIND_UNCATALOGED)
		elog(ERROR, "DefineIndex: relation \"%s\" is not a table",
108
			 heapRelation->relname);
109

110
	relationId = RelationGetRelid(rel);
111
	namespaceId = RelationGetNamespace(rel);
112 113

	if (!IsBootstrapProcessingMode() &&
114
		IsSystemRelation(rel) &&
115
		!IndexesAreActive(rel))
116 117
		elog(ERROR, "Existing indexes are inactive. REINDEX first");

118 119
	heap_close(rel, NoLock);

120 121
	/*
	 * Verify we (still) have CREATE rights in the rel's namespace.
B
Bruce Momjian 已提交
122 123 124
	 * (Presumably we did when the rel was created, but maybe not
	 * anymore.) Skip check if bootstrapping, since permissions machinery
	 * may not be working yet.
125
	 */
126
	if (!IsBootstrapProcessingMode())
127 128 129 130 131 132 133 134 135
	{
		AclResult	aclresult;

		aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
										  ACL_CREATE);
		if (aclresult != ACLCHECK_OK)
			aclcheck_error(aclresult, get_namespace_name(namespaceId));
	}

136
	/*
137 138
	 * look up the access method, verify it can handle the requested
	 * features
139
	 */
140 141 142 143
	tuple = SearchSysCache(AMNAME,
						   PointerGetDatum(accessMethodName),
						   0, 0, 0);
	if (!HeapTupleIsValid(tuple))
144
		elog(ERROR, "DefineIndex: access method \"%s\" not found",
145
			 accessMethodName);
146
	accessMethodId = HeapTupleGetOid(tuple);
147
	accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
148

149
	if (unique && !accessMethodForm->amcanunique)
150 151
		elog(ERROR, "DefineIndex: access method \"%s\" does not support UNIQUE indexes",
			 accessMethodName);
152
	if (numberOfAttributes > 1 && !accessMethodForm->amcanmulticol)
153 154
		elog(ERROR, "DefineIndex: access method \"%s\" does not support multi-column indexes",
			 accessMethodName);
155

156
	ReleaseSysCache(tuple);
157

158
	/*
159 160
	 * Convert the partial-index predicate from parsetree form to an
	 * implicit-AND qual expression, for easier evaluation at runtime.
161 162
	 * While we are at it, we reduce it to a canonical (CNF or DNF) form
	 * to simplify the task of proving implications.
163
	 */
164
	if (predicate)
165
	{
166
		cnfPred = canonicalize_qual((Expr *) copyObject(predicate), true);
167 168 169
		CheckPredicate(cnfPred, rangetable, relationId);
	}

170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213
	/*
	 * Check that all of the attributes in a primary key are marked
	 * as not null, otherwise attempt to ALTER TABLE .. SET NOT NULL
	 */
	if (primary && !IsFuncIndex(attributeList))
	{
		List   *keys;

		foreach(keys, attributeList)
		{
			IndexElem   *key = (IndexElem *) lfirst(keys);
			HeapTuple	atttuple;

			/* System attributes are never null, so no problem */
			if (SystemAttributeByName(key->name, rel->rd_rel->relhasoids))
				continue;

			atttuple = SearchSysCacheAttName(relationId, key->name);
			if (HeapTupleIsValid(atttuple))
			{
				if (! ((Form_pg_attribute) GETSTRUCT(atttuple))->attnotnull)
				{
					/*
					 * Try to make it NOT NULL.
					 *
					 * XXX: Shouldn't the ALTER TABLE .. SET NOT NULL cascade
					 * to child tables?  Currently, since the PRIMARY KEY
					 * itself doesn't cascade, we don't cascade the notnull
					 * constraint either; but this is pretty debatable.
					 */
					AlterTableAlterColumnSetNotNull(relationId, false,
													key->name);
				}
				ReleaseSysCache(atttuple);
			}
			else
			{
				/* This shouldn't happen if parser did its job ... */
				elog(ERROR, "DefineIndex: column \"%s\" named in key does not exist",
					 key->name);
			}
		}
	}

214
	/*
B
Bruce Momjian 已提交
215 216
	 * Prepare arguments for index_create, primarily an IndexInfo
	 * structure
217 218
	 */
	indexInfo = makeNode(IndexInfo);
219
	indexInfo->ii_Predicate = cnfPred;
220
	indexInfo->ii_PredicateState = NIL;
221 222 223
	indexInfo->ii_FuncOid = InvalidOid;
	indexInfo->ii_Unique = unique;

224 225
	if (IsFuncIndex(attributeList))
	{
226
		IndexElem  *funcIndex = (IndexElem *) lfirst(attributeList);
227
		int			nargs;
228

229 230 231 232
		/* Parser should have given us only one list item, but check */
		if (numberOfAttributes != 1)
			elog(ERROR, "Functional index can only have one attribute");

233 234
		nargs = length(funcIndex->args);
		if (nargs > INDEX_MAX_KEYS)
235 236
			elog(ERROR, "Index function can take at most %d arguments",
				 INDEX_MAX_KEYS);
237

238 239
		indexInfo->ii_NumIndexAttrs = 1;
		indexInfo->ii_NumKeyAttrs = nargs;
240

241
		classObjectId = (Oid *) palloc(sizeof(Oid));
242

243 244
		FuncIndexArgs(indexInfo, classObjectId, funcIndex,
					  relationId, accessMethodName, accessMethodId);
245 246 247
	}
	else
	{
248 249
		indexInfo->ii_NumIndexAttrs = numberOfAttributes;
		indexInfo->ii_NumKeyAttrs = numberOfAttributes;
250

251
		classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
252

253 254
		NormIndexAttrs(indexInfo, classObjectId, attributeList,
					   relationId, accessMethodName, accessMethodId);
255
	}
256

257
	index_create(relationId, indexRelationName,
258
				 indexInfo, accessMethodId, classObjectId,
259
				 primary, isconstraint, allowSystemTableMods);
260

261 262
	/*
	 * We update the relation's pg_class tuple even if it already has
B
Bruce Momjian 已提交
263
	 * relhasindex = true.	This is needed to cause a shared-cache-inval
264 265 266 267
	 * message to be sent for the pg_class tuple, which will cause other
	 * backends to flush their relcache entries and in particular their
	 * cached lists of the indexes for this relation.
	 */
268
	setRelhasindex(relationId, true, primary, InvalidOid);
269 270 271 272 273
}


/*
 * CheckPredicate
274
 *		Checks that the given list of partial-index predicates refer
275 276 277
 *		(via the given range table) only to the given base relation oid.
 *
 * This used to also constrain the form of the predicate to forms that
278
 * indxpath.c could do something with.	However, that seems overly
279 280 281 282
 * restrictive.  One useful application of partial indexes is to apply
 * a UNIQUE constraint across a subset of a table, and in that scenario
 * any evaluatable predicate will work.  So accept any predicate here
 * (except ones requiring a plan), and let indxpath.c fend for itself.
283 284 285
 */

static void
286
CheckPredicate(List *predList, List *rangeTable, Oid baseRelOid)
287
{
288
	if (length(rangeTable) != 1 || getrelid(1, rangeTable) != baseRelOid)
289
		elog(ERROR,
290
		 "Partial-index predicates may refer only to the base relation");
291

292 293
	/*
	 * We don't currently support generation of an actual query plan for a
294 295
	 * predicate, only simple scalar expressions; hence these
	 * restrictions.
296
	 */
297 298 299 300
	if (contain_subplans((Node *) predList))
		elog(ERROR, "Cannot use subselect in index predicate");
	if (contain_agg_clause((Node *) predList))
		elog(ERROR, "Cannot use aggregate in index predicate");
301 302

	/*
B
Bruce Momjian 已提交
303 304
	 * A predicate using mutable functions is probably wrong, for the same
	 * reasons that we don't allow a functional index to use one.
305
	 */
306
	if (contain_mutable_functions((Node *) predList))
307
		elog(ERROR, "Functions in index predicate must be marked IMMUTABLE");
308 309 310
}


311
static void
312 313 314
FuncIndexArgs(IndexInfo *indexInfo,
			  Oid *classOidP,
			  IndexElem *funcIndex,
315 316 317
			  Oid relId,
			  char *accessMethodName,
			  Oid accessMethodId)
318
{
319 320
	Oid			argTypes[FUNC_MAX_ARGS];
	List	   *arglist;
321 322
	int			nargs = 0;
	int			i;
323
	FuncDetailCode fdresult;
324 325 326 327
	Oid			funcid;
	Oid			rettype;
	bool		retset;
	Oid		   *true_typeids;
328 329

	/*
330 331
	 * process the function arguments, which are a list of T_String
	 * (someday ought to allow more general expressions?)
332 333
	 *
	 * Note caller already checked that list is not too long.
334
	 */
335
	MemSet(argTypes, 0, sizeof(argTypes));
336

337
	foreach(arglist, funcIndex->args)
338
	{
339
		char	   *arg = strVal(lfirst(arglist));
340
		HeapTuple	tuple;
341
		Form_pg_attribute att;
342

343
		tuple = SearchSysCacheAttName(relId, arg);
344
		if (!HeapTupleIsValid(tuple))
345
			elog(ERROR, "DefineIndex: column \"%s\" named in key does not exist", arg);
346
		att = (Form_pg_attribute) GETSTRUCT(tuple);
347 348
		indexInfo->ii_KeyAttrNumbers[nargs] = att->attnum;
		argTypes[nargs] = att->atttypid;
349
		ReleaseSysCache(tuple);
350
		nargs++;
351
	}
352

353
	/*
354
	 * Lookup the function procedure to get its OID and result type.
355
	 *
356 357 358 359 360 361
	 * We rely on parse_func.c to find the correct function in the possible
	 * presence of binary-compatible types.  However, parse_func may do
	 * too much: it will accept a function that requires run-time coercion
	 * of input types, and the executor is not currently set up to support
	 * that.  So, check to make sure that the selected function has
	 * exact-match or binary-compatible input types.
362
	 */
363
	fdresult = func_get_detail(funcIndex->funcname, funcIndex->args,
364 365 366 367 368
							   nargs, argTypes,
							   &funcid, &rettype, &retset,
							   &true_typeids);
	if (fdresult != FUNCDETAIL_NORMAL)
	{
369 370 371
		if (fdresult == FUNCDETAIL_AGGREGATE)
			elog(ERROR, "DefineIndex: functional index may not use an aggregate function");
		else if (fdresult == FUNCDETAIL_COERCION)
372 373 374
			elog(ERROR, "DefineIndex: functional index must use a real function, not a type coercion"
				 "\n\tTry specifying the index opclass you want to use, instead");
		else
375 376
			func_error("DefineIndex", funcIndex->funcname, nargs, argTypes,
					   NULL);
377
	}
378 379 380 381 382

	if (retset)
		elog(ERROR, "DefineIndex: cannot index on a function returning a set");

	for (i = 0; i < nargs; i++)
383
	{
384 385
		if (!IsBinaryCoercible(argTypes[i], true_typeids[i]))
			func_error("DefineIndex", funcIndex->funcname, nargs, true_typeids,
386
					   "Index function must be binary-compatible with table datatype");
387 388
	}

389
	/*
390
	 * Require that the function be marked immutable.  Using a mutable
391 392 393
	 * function for a functional index is highly questionable, since if
	 * you aren't going to get the same result for the same data every
	 * time, it's not clear what the index entries mean at all.
394
	 */
395
	if (func_volatile(funcid) != PROVOLATILE_IMMUTABLE)
396
		elog(ERROR, "DefineIndex: index function must be marked IMMUTABLE");
397

398 399
	/* Process opclass, using func return type as default type */

400
	classOidP[0] = GetAttrOpClass(funcIndex, rettype,
401
								  accessMethodName, accessMethodId);
402

403
	/* OK, return results */
404

405 406
	indexInfo->ii_FuncOid = funcid;
	/* Need to do the fmgr function lookup now, too */
B
Bruce Momjian 已提交
407
	fmgr_info(funcid, &indexInfo->ii_FuncInfo);
408 409
}

410
static void
411
NormIndexAttrs(IndexInfo *indexInfo,
412
			   Oid *classOidP,
413
			   List *attList,	/* list of IndexElem's */
414 415 416
			   Oid relId,
			   char *accessMethodName,
			   Oid accessMethodId)
417
{
418
	List	   *rest;
419
	int			attn = 0;
M
Marc G. Fournier 已提交
420

421 422 423
	/*
	 * process attributeList
	 */
424
	foreach(rest, attList)
425
	{
426
		IndexElem  *attribute = (IndexElem *) lfirst(rest);
427
		HeapTuple	atttuple;
428
		Form_pg_attribute attform;
429 430

		if (attribute->name == NULL)
431
			elog(ERROR, "missing attribute for define index");
432

433
		atttuple = SearchSysCacheAttName(relId, attribute->name);
434
		if (!HeapTupleIsValid(atttuple))
435
			elog(ERROR, "DefineIndex: attribute \"%s\" not found",
436
				 attribute->name);
437
		attform = (Form_pg_attribute) GETSTRUCT(atttuple);
438

439
		indexInfo->ii_KeyAttrNumbers[attn] = attform->attnum;
440

441
		classOidP[attn] = GetAttrOpClass(attribute, attform->atttypid,
B
Bruce Momjian 已提交
442
									   accessMethodName, accessMethodId);
443

444
		ReleaseSysCache(atttuple);
445
		attn++;
446 447 448 449
	}
}

static Oid
450 451
GetAttrOpClass(IndexElem *attribute, Oid attrType,
			   char *accessMethodName, Oid accessMethodId)
452
{
453 454
	char	   *schemaname;
	char	   *opcname;
455
	HeapTuple	tuple;
456
	Oid			opClassId,
457
				opInputType;
458

459
	if (attribute->opclass == NIL)
460 461
	{
		/* no operator class specified, so find the default */
462 463 464
		opClassId = GetDefaultOpClass(attrType, accessMethodId);
		if (!OidIsValid(opClassId))
			elog(ERROR, "data type %s has no default operator class for access method \"%s\""
465 466
				 "\n\tYou must specify an operator class for the index or define a"
				 "\n\tdefault operator class for the data type",
467 468
				 format_type_be(attrType), accessMethodName);
		return opClassId;
469 470
	}

471
	/*
472
	 * Specific opclass name given, so look up the opclass.
473
	 */
474 475

	/* deconstruct the name list */
476
	DeconstructQualifiedName(attribute->opclass, &schemaname, &opcname);
477 478 479 480

	if (schemaname)
	{
		/* Look in specific schema only */
B
Bruce Momjian 已提交
481
		Oid			namespaceId;
482

483
		namespaceId = LookupExplicitNamespace(schemaname);
484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501
		tuple = SearchSysCache(CLAAMNAMENSP,
							   ObjectIdGetDatum(accessMethodId),
							   PointerGetDatum(opcname),
							   ObjectIdGetDatum(namespaceId),
							   0);
	}
	else
	{
		/* Unqualified opclass name, so search the search path */
		opClassId = OpclassnameGetOpcid(accessMethodId, opcname);
		if (!OidIsValid(opClassId))
			elog(ERROR, "DefineIndex: operator class \"%s\" not supported by access method \"%s\"",
				 opcname, accessMethodName);
		tuple = SearchSysCache(CLAOID,
							   ObjectIdGetDatum(opClassId),
							   0, 0, 0);
	}

502 503
	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "DefineIndex: operator class \"%s\" not supported by access method \"%s\"",
504 505 506
			 NameListToString(attribute->opclass), accessMethodName);

	/*
B
Bruce Momjian 已提交
507 508
	 * Verify that the index operator class accepts this datatype.	Note
	 * we will accept binary compatibility.
509
	 */
510
	opClassId = HeapTupleGetOid(tuple);
511
	opInputType = ((Form_pg_opclass) GETSTRUCT(tuple))->opcintype;
512

513
	if (!IsBinaryCoercible(attrType, opInputType))
514
		elog(ERROR, "operator class \"%s\" does not accept data type %s",
B
Bruce Momjian 已提交
515
		 NameListToString(attribute->opclass), format_type_be(attrType));
516 517

	ReleaseSysCache(tuple);
518

519 520 521 522 523 524
	return opClassId;
}

static Oid
GetDefaultOpClass(Oid attrType, Oid accessMethodId)
{
525
	OpclassCandidateList opclass;
526 527 528 529
	int			nexact = 0;
	int			ncompatible = 0;
	Oid			exactOid = InvalidOid;
	Oid			compatibleOid = InvalidOid;
530

531 532 533
	/* If it's a domain, look at the base type instead */
	attrType = getBaseType(attrType);

534
	/*
535 536 537 538 539
	 * We scan through all the opclasses available for the access method,
	 * looking for one that is marked default and matches the target type
	 * (either exactly or binary-compatibly, but prefer an exact match).
	 *
	 * We could find more than one binary-compatible match, in which case we
540 541
	 * require the user to specify which one he wants.	If we find more
	 * than one exact match, then someone put bogus entries in pg_opclass.
542
	 *
543 544
	 * The initial search is done by namespace.c so that we only consider
	 * opclasses visible in the current namespace search path.
545
	 */
546 547 548
	for (opclass = OpclassGetCandidates(accessMethodId);
		 opclass != NULL;
		 opclass = opclass->next)
549
	{
550
		if (opclass->opcdefault)
551
		{
552 553 554
			if (opclass->opcintype == attrType)
			{
				nexact++;
555
				exactOid = opclass->oid;
556
			}
557
			else if (IsBinaryCoercible(attrType, opclass->opcintype))
558 559
			{
				ncompatible++;
560
				compatibleOid = opclass->oid;
561
			}
562 563 564
		}
	}

565 566 567
	if (nexact == 1)
		return exactOid;
	if (nexact != 0)
568
		elog(ERROR, "pg_opclass contains multiple default opclasses for data type %s",
569 570 571
			 format_type_be(attrType));
	if (ncompatible == 1)
		return compatibleOid;
572

573
	return InvalidOid;
M
Marc G. Fournier 已提交
574 575
}

576
/*
B
Bruce Momjian 已提交
577
 * RemoveIndex
578
 *		Deletes an index.
579 580
 */
void
581
RemoveIndex(RangeVar *relation, DropBehavior behavior)
582
{
583
	Oid			indOid;
584
	char		relkind;
585
	ObjectAddress object;
586

587
	indOid = RangeVarGetRelid(relation, false);
588 589
	relkind = get_rel_relkind(indOid);
	if (relkind != RELKIND_INDEX)
590
		elog(ERROR, "relation \"%s\" is of type \"%c\"",
591
			 relation->relname, relkind);
592

593 594 595 596 597
	object.classId = RelOid_pg_class;
	object.objectId = indOid;
	object.objectSubId = 0;

	performDeletion(&object, behavior);
598
}
H
Hiroshi Inoue 已提交
599 600

/*
601
 * ReindexIndex
H
Hiroshi Inoue 已提交
602 603 604
 *		Recreate an index.
 */
void
605
ReindexIndex(RangeVar *indexRelation, bool force /* currently unused */ )
H
Hiroshi Inoue 已提交
606
{
607
	Oid			indOid;
H
Hiroshi Inoue 已提交
608
	HeapTuple	tuple;
609
	bool		overwrite;
H
Hiroshi Inoue 已提交
610

611 612
	/* Choose in-place-or-not mode */
	overwrite = IsIgnoringSystemIndexes();
613

614 615 616
	indOid = RangeVarGetRelid(indexRelation, false);
	tuple = SearchSysCache(RELOID,
						   ObjectIdGetDatum(indOid),
617
						   0, 0, 0);
H
Hiroshi Inoue 已提交
618
	if (!HeapTupleIsValid(tuple))
619
		elog(ERROR, "index \"%s\" does not exist", indexRelation->relname);
H
Hiroshi Inoue 已提交
620 621 622

	if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX)
		elog(ERROR, "relation \"%s\" is of type \"%c\"",
623 624 625
			 indexRelation->relname,
			 ((Form_pg_class) GETSTRUCT(tuple))->relkind);

626 627
	if (IsSystemClass((Form_pg_class) GETSTRUCT(tuple)) &&
		!IsToastClass((Form_pg_class) GETSTRUCT(tuple)))
628 629 630 631 632 633 634 635 636
	{
		if (!allowSystemTableMods)
			elog(ERROR, "\"%s\" is a system index. call REINDEX under standalone postgres with -O -P options",
				 indexRelation->relname);
		if (!IsIgnoringSystemIndexes())
			elog(ERROR, "\"%s\" is a system index. call REINDEX under standalone postgres with -P -O options",
				 indexRelation->relname);
	}

637
	ReleaseSysCache(tuple);
H
Hiroshi Inoue 已提交
638

639 640 641 642 643 644 645 646
	/*
	 * In-place REINDEX within a transaction block is dangerous, because
	 * if the transaction is later rolled back we have no way to undo
	 * truncation of the index's physical file.  Disallow it.
	 */
	if (overwrite)
		PreventTransactionChain((void *) indexRelation, "REINDEX");

647 648
	if (!reindex_index(indOid, force, overwrite))
		elog(WARNING, "index \"%s\" wasn't reindexed", indexRelation->relname);
H
Hiroshi Inoue 已提交
649 650 651 652 653 654 655
}

/*
 * ReindexTable
 *		Recreate indexes of a table.
 */
void
656
ReindexTable(RangeVar *relation, bool force)
H
Hiroshi Inoue 已提交
657
{
658
	Oid			heapOid;
659
	char		relkind;
H
Hiroshi Inoue 已提交
660

661
	heapOid = RangeVarGetRelid(relation, false);
662
	relkind = get_rel_relkind(heapOid);
H
Hiroshi Inoue 已提交
663

664
	if (relkind != RELKIND_RELATION && relkind != RELKIND_TOASTVALUE)
H
Hiroshi Inoue 已提交
665
		elog(ERROR, "relation \"%s\" is of type \"%c\"",
666
			 relation->relname, relkind);
667

668 669 670 671 672 673 674 675 676 677 678
	/*
	 * In-place REINDEX within a transaction block is dangerous, because
	 * if the transaction is later rolled back we have no way to undo
	 * truncation of the index's physical file.  Disallow it.
	 *
	 * XXX we assume that in-place reindex will only be done if
	 * IsIgnoringSystemIndexes() is true.
	 */
	if (IsIgnoringSystemIndexes())
		PreventTransactionChain((void *) relation, "REINDEX");

679 680
	if (!reindex_relation(heapOid, force))
		elog(WARNING, "table \"%s\" wasn't reindexed", relation->relname);
H
Hiroshi Inoue 已提交
681 682 683 684 685 686 687 688 689
}

/*
 * ReindexDatabase
 *		Recreate indexes of a database.
 */
void
ReindexDatabase(const char *dbname, bool force, bool all)
{
690
	Relation	relationRelation;
691
	HeapScanDesc scan;
692
	HeapTuple	tuple;
693
	MemoryContext private_context;
694 695 696 697 698 699
	MemoryContext old;
	int			relcnt,
				relalc,
				i,
				oncealc = 200;
	Oid		   *relids = (Oid *) NULL;
H
Hiroshi Inoue 已提交
700 701 702

	AssertArg(dbname);

703 704
	if (strcmp(dbname, DatabaseName) != 0)
		elog(ERROR, "REINDEX DATABASE: Can be executed only on the currently open database.");
705

706
	if (!(superuser() || is_dbadmin(MyDatabaseId)))
H
Hiroshi Inoue 已提交
707 708
		elog(ERROR, "REINDEX DATABASE: Permission denied.");

709 710 711 712 713
	if (!allowSystemTableMods)
		elog(ERROR, "must be called under standalone postgres with -O -P options");
	if (!IsIgnoringSystemIndexes())
		elog(ERROR, "must be called under standalone postgres with -P -O options");

714
	/*
B
Bruce Momjian 已提交
715 716 717
	 * We cannot run inside a user transaction block; if we were inside a
	 * transaction, then our commit- and start-transaction-command calls
	 * would not have the intended effect!
718
	 */
719
	PreventTransactionChain((void *) dbname, "REINDEX");
720

721
	/*
B
Bruce Momjian 已提交
722
	 * Create a memory context that will survive forced transaction
723
	 * commits we do below.  Since it is a child of PortalContext, it will
B
Bruce Momjian 已提交
724 725
	 * go away eventually even if we suffer an error; there's no need for
	 * special abort cleanup logic.
726
	 */
727
	private_context = AllocSetContextCreate(PortalContext,
728 729 730 731
											"ReindexDatabase",
											ALLOCSET_DEFAULT_MINSIZE,
											ALLOCSET_DEFAULT_INITSIZE,
											ALLOCSET_DEFAULT_MAXSIZE);
H
Hiroshi Inoue 已提交
732

733 734 735
	/*
	 * Scan pg_class to build a list of the relations we need to reindex.
	 */
H
Hiroshi Inoue 已提交
736
	relationRelation = heap_openr(RelationRelationName, AccessShareLock);
737
	scan = heap_beginscan(relationRelation, SnapshotNow, 0, NULL);
H
Hiroshi Inoue 已提交
738
	relcnt = relalc = 0;
739
	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
H
Hiroshi Inoue 已提交
740
	{
B
Bruce Momjian 已提交
741
		char		relkind;
742

H
Hiroshi Inoue 已提交
743 744
		if (!all)
		{
745 746
			if (!(IsSystemClass((Form_pg_class) GETSTRUCT(tuple)) &&
				  !IsToastClass((Form_pg_class) GETSTRUCT(tuple))))
H
Hiroshi Inoue 已提交
747 748
				continue;
		}
749 750
		relkind = ((Form_pg_class) GETSTRUCT(tuple))->relkind;
		if (relkind == RELKIND_RELATION || relkind == RELKIND_TOASTVALUE)
H
Hiroshi Inoue 已提交
751
		{
752
			old = MemoryContextSwitchTo(private_context);
H
Hiroshi Inoue 已提交
753 754 755 756 757 758 759 760 761 762 763
			if (relcnt == 0)
			{
				relalc = oncealc;
				relids = palloc(sizeof(Oid) * relalc);
			}
			else if (relcnt >= relalc)
			{
				relalc *= 2;
				relids = repalloc(relids, sizeof(Oid) * relalc);
			}
			MemoryContextSwitchTo(old);
764
			relids[relcnt] = HeapTupleGetOid(tuple);
H
Hiroshi Inoue 已提交
765 766 767 768 769 770
			relcnt++;
		}
	}
	heap_endscan(scan);
	heap_close(relationRelation, AccessShareLock);

771
	/* Now reindex each rel in a separate transaction */
772
	CommitTransactionCommand(true);
H
Hiroshi Inoue 已提交
773 774
	for (i = 0; i < relcnt; i++)
	{
775
		StartTransactionCommand(true);
776
		SetQuerySnapshot();		/* might be needed for functional index */
777
		if (reindex_relation(relids[i], force))
778
			elog(NOTICE, "relation %u was reindexed", relids[i]);
779
		CommitTransactionCommand(true);
H
Hiroshi Inoue 已提交
780
	}
781 782
	/* Tell xact.c not to chain the upcoming commit */
	StartTransactionCommand(true);
783 784

	MemoryContextDelete(private_context);
H
Hiroshi Inoue 已提交
785
}