/*-------------------------------------------------------------------------
 *
 * relcache.c
 *	  POSTGRES relation descriptor cache code
 *
 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/utils/cache/relcache.c,v 1.266.2.10 2010/09/02 03:17:06 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
/*
 * INTERFACE ROUTINES
 *		RelationCacheInitialize			- initialize relcache (to empty)
 *		RelationCacheInitializePhase2	- finish initializing relcache
 *		RelationIdGetRelation			- get a reldesc by relation id
 *		RelationClose					- close an open relation
 *
 * NOTES
 *		The following code contains many undocumented hacks.  Please be
 *		careful....
 */
26 27
#include "postgres.h"

28
#include <sys/file.h>
29
#include <fcntl.h>
30
#include <unistd.h>
31

32 33
#include "access/genam.h"
#include "access/heapam.h"
34
#include "access/reloptions.h"
35
#include "access/xact.h"
36
#include "catalog/catalog.h"
37
#include "catalog/index.h"
B
Bruce Momjian 已提交
38
#include "catalog/indexing.h"
39
#include "catalog/namespace.h"
40 41
#include "catalog/pg_amop.h"
#include "catalog/pg_amproc.h"
B
Bruce Momjian 已提交
42
#include "catalog/pg_attrdef.h"
43
#include "catalog/pg_authid.h"
44
#include "catalog/pg_constraint.h"
45
#include "catalog/pg_namespace.h"
46
#include "catalog/pg_opclass.h"
47
#include "catalog/pg_operator.h"
B
Bruce Momjian 已提交
48
#include "catalog/pg_proc.h"
49
#include "catalog/pg_rewrite.h"
50
#include "catalog/pg_trigger.h"
51
#include "catalog/pg_type.h"
52
#include "commands/trigger.h"
B
Bruce Momjian 已提交
53
#include "miscadmin.h"
54 55
#include "optimizer/clauses.h"
#include "optimizer/planmain.h"
56
#include "optimizer/prep.h"
57
#include "optimizer/var.h"
58
#include "rewrite/rewriteDefine.h"
59
#include "storage/fd.h"
B
Bruce Momjian 已提交
60
#include "storage/smgr.h"
61
#include "utils/builtins.h"
62
#include "utils/fmgroids.h"
63
#include "utils/inval.h"
64
#include "utils/memutils.h"
B
Bruce Momjian 已提交
65
#include "utils/relcache.h"
66
#include "utils/resowner.h"
67
#include "utils/syscache.h"
B
Bruce Momjian 已提交
68

69

70 71 72 73 74
/*
 * Name of the relcache init file, used to speed up backend startup.
 */
#define RELCACHE_INIT_FILENAME	"pg_internal.init"

/* Version ID value identifying the init file format. */
#define RELCACHE_INIT_FILEMAGIC		0x573264
76

77
/*
78
 *		hardcoded tuple descriptors.  see include/catalog/pg_attribute.h
79
 */
80 81 82 83
static FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
static FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
static FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
static FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
84
static FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};
85

86
/*
 *		Hash tables that index the relation cache
 *
 *		We used to index the cache by both name and OID, but now there
 *		is only an index by OID.
 */
92 93 94 95 96 97
/* One entry of RelationIdCache: maps a relation OID to its descriptor. */
typedef struct relidcacheent
{
	Oid			reloid;			/* lookup key: the relation's OID (rd_id) */
	Relation	reldesc;		/* the cached relation descriptor */
} RelIdCacheEnt;

98
static HTAB *RelationIdCache;
99

100 101 102 103
/*
 * This flag is false until we have prepared the critical relcache entries
 * that are needed to do indexscans on the tables read by relcache building.
 */
B
Bruce Momjian 已提交
104
bool		criticalRelcachesBuilt = false;
105 106 107

/*
 * This counter counts relcache inval events received since backend startup
 * (but only for rels that are actually in cache).  Presently, we use it only
 * to detect whether data about to be written by write_relcache_init_file()
 * might already be obsolete.
 */
static long relcacheInvalsReceived = 0L;
113

114 115 116 117 118
/*
 * This list remembers the OIDs of the relations cached in the relcache
 * init file.
 */
static List *initFileRelationIds = NIL;
119

120
/*
 * This flag lets us optimize away work in AtEO(Sub)Xact_RelationCache().
 */
123
static bool need_eoxact_work = false;
124

125

126
/*
 *		macros to manipulate the lookup hashtables
 */

/*
 * RelationCacheInsert
 *		Enter RELATION into RelationIdCache, keyed by its rd_id.  If an
 *		entry for that OID is already present, its reldesc pointer is
 *		silently overwritten.
 *
 * RELATION is expanded more than once, so pass an expression without
 * side effects.
 */
#define RelationCacheInsert(RELATION)	\
do { \
	RelIdCacheEnt *idhentry; bool found; \
	idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
											 (void *) &((RELATION)->rd_id), \
											 HASH_ENTER, &found); \
	/* used to give notice if found -- now just keep quiet */ \
	idhentry->reldesc = (RELATION); \
} while(0)

139
/*
 * RelationIdCacheLookup
 *		Look up the relation OID held in the lvalue ID in RelationIdCache.
 *		Sets RELATION to the cached descriptor, or to NULL if there is no
 *		entry for that OID.
 *
 * ID must be an lvalue (its address is taken); RELATION must be an
 * assignable lvalue.
 */
#define RelationIdCacheLookup(ID, RELATION) \
do { \
	RelIdCacheEnt *hentry; \
	hentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
										   (void *) &(ID), \
										   HASH_FIND, NULL); \
	if (hentry) \
		(RELATION) = hentry->reldesc; \
	else \
		(RELATION) = NULL; \
} while(0)

/*
 * RelationCacheDelete
 *		Remove the RelationIdCache entry keyed by RELATION's rd_id.
 *		Emits a WARNING (and otherwise does nothing) if no such entry
 *		exists.
 */
#define RelationCacheDelete(RELATION) \
do { \
	RelIdCacheEnt *idhentry; \
	idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
											 (void *) &((RELATION)->rd_id), \
											 HASH_REMOVE, NULL); \
	if (idhentry == NULL) \
		elog(WARNING, "trying to delete a rd_id reldesc that does not exist"); \
} while(0)
160

161 162 163

/*
 * Special cache for opclass-related information
164
 *
165 166
 * Note: only default operators and support procs get cached, ie, those with
 * lefttype = righttype = opcintype.
167 168 169 170 171 172 173
 */
typedef struct opclasscacheent
{
	Oid			opclassoid;		/* lookup key: OID of opclass */
	bool		valid;			/* set TRUE after successful fill-in */
	StrategyNumber numStrats;	/* max # of strategies (from pg_am) */
	StrategyNumber numSupport;	/* max # of support procs (from pg_am) */
174 175
	Oid			opcfamily;		/* OID of opclass's family */
	Oid			opcintype;		/* OID of opclass's declared input type */
176
	Oid		   *operatorOids;	/* strategy operators' OIDs */
B
Bruce Momjian 已提交
177
	RegProcedure *supportProcs; /* support procs */
178 179 180 181 182
} OpClassCacheEnt;

static HTAB *OpClassCache = NULL;


183
/* non-export function prototypes */
184

185
static void RelationDestroyRelation(Relation relation);
186
static void RelationClearRelation(Relation relation, bool rebuild);
B
Bruce Momjian 已提交
187

188
static void RelationReloadIndexInfo(Relation relation);
189
static void RelationFlushRelation(Relation relation);
190 191
static bool load_relcache_init_file(void);
static void write_relcache_init_file(void);
B
Bruce Momjian 已提交
192
static void write_item(const void *data, Size len, FILE *fp);
193

194
static void formrdesc(const char *relationName, Oid relationReltype,
B
Bruce Momjian 已提交
195
		  bool hasoids, int natts, FormData_pg_attribute *att);
196

197
static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK);
198
static Relation AllocateRelationDesc(Form_pg_class relp);
199
static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
200
static void RelationBuildTupleDesc(Relation relation);
201
static Relation RelationBuildDesc(Oid targetRelId, bool insertIt);
202
static void RelationInitPhysicalAddr(Relation relation);
203
static void load_critical_index(Oid indexoid, Oid heapoid);
204
static TupleDesc GetPgClassDescriptor(void);
205
static TupleDesc GetPgIndexDescriptor(void);
206
static void AttrDefaultFetch(Relation relation);
207
static void CheckConstraintFetch(Relation relation);
208
static List *insert_ordered_oid(List *list, Oid datum);
209
static void IndexSupportInitialize(oidvector *indclass,
B
Bruce Momjian 已提交
210 211
					   Oid *indexOperator,
					   RegProcedure *indexSupport,
212 213
					   Oid *opFamily,
					   Oid *opcInType,
B
Bruce Momjian 已提交
214 215 216
					   StrategyNumber maxStrategyNumber,
					   StrategyNumber maxSupportNumber,
					   AttrNumber maxAttributeNumber);
217
static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
B
Bruce Momjian 已提交
218 219
				  StrategyNumber numStrats,
				  StrategyNumber numSupport);
220

221

222
/*
223
 *		ScanPgRelation
224
 *
225 226 227 228 229
 *		This is used by RelationBuildDesc to find a pg_class
 *		tuple matching targetRelId.  The caller must hold at least
 *		AccessShareLock on the target relid to prevent concurrent-update
 *		scenarios --- else our SnapshotNow scan might fail to find any
 *		version that it thinks is live.
230 231 232
 *
 *		NB: the returned tuple has been copied into palloc'd storage
 *		and must eventually be freed with heap_freetuple.
233
 */
234
static HeapTuple
235
ScanPgRelation(Oid targetRelId, bool indexOK)
236
{
237 238
	HeapTuple	pg_class_tuple;
	Relation	pg_class_desc;
239
	SysScanDesc pg_class_scan;
240
	ScanKeyData key[1];
241

242
	/*
B
Bruce Momjian 已提交
243
	 * form a scan key
244
	 */
245 246 247 248
	ScanKeyInit(&key[0],
				ObjectIdAttributeNumber,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(targetRelId));
249

250
	/*
B
Bruce Momjian 已提交
251
	 * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
B
Bruce Momjian 已提交
252 253 254
	 * built the critical relcache entries (this includes initdb and startup
	 * without a pg_internal.init file).  The caller can also force a heap
	 * scan by setting indexOK == false.
255
	 */
256 257
	pg_class_desc = heap_open(RelationRelationId, AccessShareLock);
	pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId,
258
									   indexOK && criticalRelcachesBuilt,
259
									   SnapshotNow,
260
									   1, key);
261

262
	pg_class_tuple = systable_getnext(pg_class_scan);
B
Bruce Momjian 已提交
263

H
Hiroshi Inoue 已提交
264
	/*
265
	 * Must copy tuple before releasing buffer.
H
Hiroshi Inoue 已提交
266
	 */
267 268
	if (HeapTupleIsValid(pg_class_tuple))
		pg_class_tuple = heap_copytuple(pg_class_tuple);
269

270 271
	/* all done */
	systable_endscan(pg_class_scan);
272
	heap_close(pg_class_desc, AccessShareLock);
273

274
	return pg_class_tuple;
275 276
}

277
/*
278
 *		AllocateRelationDesc
279
 *
280
 *		This is used to allocate memory for a new relation descriptor
281
 *		and initialize the rd_rel field from the given pg_class tuple.
282
 */
283
static Relation
284
AllocateRelationDesc(Form_pg_class relp)
285
{
286
	Relation	relation;
287
	MemoryContext oldcxt;
288
	Form_pg_class relationForm;
289

290 291
	/* Relcache entries must live in CacheMemoryContext */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
292

293
	/*
294
	 * allocate and zero space for new relation descriptor
295
	 */
296
	relation = (Relation) palloc0(sizeof(RelationData));
297

298
	/*
299
	 * clear fields of reldesc that should initialize to something non-zero
300
	 */
301
	relation->rd_targblock = InvalidBlockNumber;
302

303
	/* make sure relation is marked as having no open file yet */
304
	relation->rd_smgr = NULL;
305

306
	/*
B
Bruce Momjian 已提交
307
	 * Copy the relation tuple form
308
	 *
B
Bruce Momjian 已提交
309 310
	 * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE. The
	 * variable-length fields (relacl, reloptions) are NOT stored in the
311 312
	 * relcache --- there'd be little point in it, since we don't copy the
	 * tuple's nulls bitmap and hence wouldn't know if the values are valid.
B
Bruce Momjian 已提交
313 314 315 316
	 * Bottom line is that relacl *cannot* be retrieved from the relcache. Get
	 * it from the syscache if you need it.  The same goes for the original
	 * form of reloptions (however, we do store the parsed form of reloptions
	 * in rd_options).
317 318
	 */
	relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
319

320
	memcpy(relationForm, relp, CLASS_TUPLE_SIZE);
321 322

	/* initialize relation tuple form */
323
	relation->rd_rel = relationForm;
324

325
	/* and allocate attribute tuple form storage */
326 327
	relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts,
											   relationForm->relhasoids);
328 329
	/* which we mark as a reference-counted tupdesc */
	relation->rd_att->tdrefcount = 1;
330 331 332

	MemoryContextSwitchTo(oldcxt);

333
	return relation;
334 335
}

B
Bruce Momjian 已提交
336
/*
337 338 339 340 341 342
 * RelationParseRelOptions
 *		Convert pg_class.reloptions into pre-parsed rd_options
 *
 * tuple is the real pg_class tuple (not rd_rel!) for relation
 *
 * Note: rd_rel and (if an index) rd_am must be valid already
B
Bruce Momjian 已提交
343 344
 */
static void
345
RelationParseRelOptions(Relation relation, HeapTuple tuple)
B
Bruce Momjian 已提交
346
{
347 348 349
	Datum		datum;
	bool		isnull;
	bytea	   *options;
B
Bruce Momjian 已提交
350

351
	relation->rd_options = NULL;
B
Bruce Momjian 已提交
352

353
	/* Fall out if relkind should not have options */
B
Bruce Momjian 已提交
354 355
	switch (relation->rd_rel->relkind)
	{
356 357 358 359 360 361
		case RELKIND_RELATION:
		case RELKIND_TOASTVALUE:
		case RELKIND_INDEX:
			break;
		default:
			return;
B
Bruce Momjian 已提交
362 363
	}

364
	/*
B
Bruce Momjian 已提交
365 366 367
	 * Fetch reloptions from tuple; have to use a hardwired descriptor because
	 * we might not have any other for pg_class yet (consider executing this
	 * code for pg_class itself)
368 369 370 371 372 373 374
	 */
	datum = fastgetattr(tuple,
						Anum_pg_class_reloptions,
						GetPgClassDescriptor(),
						&isnull);
	if (isnull)
		return;
B
Bruce Momjian 已提交
375

376
	/* Parse into appropriate format; don't error out here */
B
Bruce Momjian 已提交
377 378
	switch (relation->rd_rel->relkind)
	{
379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399
		case RELKIND_RELATION:
		case RELKIND_TOASTVALUE:
			options = heap_reloptions(relation->rd_rel->relkind, datum,
									  false);
			break;
		case RELKIND_INDEX:
			options = index_reloptions(relation->rd_am->amoptions, datum,
									   false);
			break;
		default:
			Assert(false);		/* can't get here */
			options = NULL;		/* keep compiler quiet */
			break;
	}

	/* Copy parsed data into CacheMemoryContext */
	if (options)
	{
		relation->rd_options = MemoryContextAlloc(CacheMemoryContext,
												  VARSIZE(options));
		memcpy(relation->rd_options, options, VARSIZE(options));
400
		pfree(options);
B
Bruce Momjian 已提交
401 402 403
	}
}

404
/*
405
 *		RelationBuildTupleDesc
406
 *
407
 *		Form the relation's tuple descriptor from information in
408
 *		the pg_attribute, pg_attrdef & pg_constraint system catalogs.
409 410
 */
static void
411
RelationBuildTupleDesc(Relation relation)
412
{
413 414
	HeapTuple	pg_attribute_tuple;
	Relation	pg_attribute_desc;
415 416
	SysScanDesc pg_attribute_scan;
	ScanKeyData skey[2];
417
	int			need;
418
	TupleConstr *constr;
H
Hiroshi Inoue 已提交
419
	AttrDefault *attrdef = NULL;
420
	int			ndef = 0;
421

422 423 424 425
	/* copy some fields from pg_class row to rd_att */
	relation->rd_att->tdtypeid = relation->rd_rel->reltype;
	relation->rd_att->tdtypmod = -1;	/* unnecessary, but... */
	relation->rd_att->tdhasoid = relation->rd_rel->relhasoids;
426

427 428
	constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
												sizeof(TupleConstr));
H
Hiroshi Inoue 已提交
429
	constr->has_not_null = false;
430

431
	/*
432
	 * Form a scan key that selects only user attributes (attnum > 0).
B
Bruce Momjian 已提交
433 434
	 * (Eliminating system attribute rows at the index level is lots faster
	 * than fetching them.)
435
	 */
436 437 438 439 440 441 442 443
	ScanKeyInit(&skey[0],
				Anum_pg_attribute_attrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));
	ScanKeyInit(&skey[1],
				Anum_pg_attribute_attnum,
				BTGreaterStrategyNumber, F_INT2GT,
				Int16GetDatum(0));
444

445
	/*
B
Bruce Momjian 已提交
446 447 448
	 * Open pg_attribute and begin a scan.	Force heap scan if we haven't yet
	 * built the critical relcache entries (this includes initdb and startup
	 * without a pg_internal.init file).
449
	 */
450
	pg_attribute_desc = heap_open(AttributeRelationId, AccessShareLock);
451
	pg_attribute_scan = systable_beginscan(pg_attribute_desc,
452
										   AttributeRelidNumIndexId,
453 454 455
										   criticalRelcachesBuilt,
										   SnapshotNow,
										   2, skey);
456

457
	/*
B
Bruce Momjian 已提交
458
	 * add attribute data to relation->rd_att
459
	 */
460
	need = relation->rd_rel->relnatts;
461

462
	while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
463
	{
464 465
		Form_pg_attribute attp;

466
		attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);
467

468 469
		if (attp->attnum <= 0 ||
			attp->attnum > relation->rd_rel->relnatts)
470
			elog(ERROR, "invalid attribute number %d for %s",
471 472
				 attp->attnum, RelationGetRelationName(relation));

473 474
		memcpy(relation->rd_att->attrs[attp->attnum - 1],
			   attp,
475
			   ATTRIBUTE_TUPLE_SIZE);
476

477 478
		/* Update constraint/default info */
		if (attp->attnotnull)
479
			constr->has_not_null = true;
H
Hiroshi Inoue 已提交
480

481 482 483 484
		if (attp->atthasdef)
		{
			if (attrdef == NULL)
				attrdef = (AttrDefault *)
485 486 487
					MemoryContextAllocZero(CacheMemoryContext,
										   relation->rd_rel->relnatts *
										   sizeof(AttrDefault));
488 489 490
			attrdef[ndef].adnum = attp->attnum;
			attrdef[ndef].adbin = NULL;
			ndef++;
491
		}
492 493 494
		need--;
		if (need == 0)
			break;
495
	}
496

497
	/*
B
Bruce Momjian 已提交
498
	 * end the scan and close the attribute relation
499
	 */
500
	systable_endscan(pg_attribute_scan);
501
	heap_close(pg_attribute_desc, AccessShareLock);
H
Hiroshi Inoue 已提交
502

503 504 505 506
	if (need != 0)
		elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
			 need, RelationGetRelid(relation));

507
	/*
B
Bruce Momjian 已提交
508 509 510
	 * The attcacheoff values we read from pg_attribute should all be -1
	 * ("unknown").  Verify this if assert checking is on.	They will be
	 * computed when and if needed during tuple access.
511 512 513
	 */
#ifdef USE_ASSERT_CHECKING
	{
B
Bruce Momjian 已提交
514
		int			i;
515 516 517 518 519 520

		for (i = 0; i < relation->rd_rel->relnatts; i++)
			Assert(relation->rd_att->attrs[i]->attcacheoff == -1);
	}
#endif

521
	/*
B
Bruce Momjian 已提交
522
	 * However, we can easily set the attcacheoff value for the first
B
Bruce Momjian 已提交
523 524
	 * attribute: it must be zero.	This eliminates the need for special cases
	 * for attnum=1 that used to exist in fastgetattr() and index_getattr().
525
	 */
526 527
	if (relation->rd_rel->relnatts > 0)
		relation->rd_att->attrs[0]->attcacheoff = 0;
528

529 530 531 532
	/*
	 * Set up constraint/default info
	 */
	if (constr->has_not_null || ndef > 0 || relation->rd_rel->relchecks)
533
	{
534
		relation->rd_att->constr = constr;
535

536
		if (ndef > 0)			/* DEFAULTs */
537
		{
538 539 540 541 542 543 544
			if (ndef < relation->rd_rel->relnatts)
				constr->defval = (AttrDefault *)
					repalloc(attrdef, ndef * sizeof(AttrDefault));
			else
				constr->defval = attrdef;
			constr->num_defval = ndef;
			AttrDefaultFetch(relation);
545
		}
546 547
		else
			constr->num_defval = 0;
548

549
		if (relation->rd_rel->relchecks > 0)	/* CHECKs */
550
		{
551 552
			constr->num_check = relation->rd_rel->relchecks;
			constr->check = (ConstrCheck *)
553
				MemoryContextAllocZero(CacheMemoryContext,
B
Bruce Momjian 已提交
554
									constr->num_check * sizeof(ConstrCheck));
555
			CheckConstraintFetch(relation);
556
		}
557 558 559 560 561 562 563
		else
			constr->num_check = 0;
	}
	else
	{
		pfree(constr);
		relation->rd_att->constr = NULL;
564
	}
565 566
}

567
/*
568
 *		RelationBuildRuleLock
569
 *
570 571
 *		Form the relation's rewrite rules from information in
 *		the pg_rewrite system catalog.
572 573 574 575 576 577 578
 *
 * Note: The rule parsetrees are potentially very complex node structures.
 * To allow these trees to be freed when the relcache entry is flushed,
 * we make a private memory context to hold the RuleLock information for
 * each relcache entry that has associated rules.  The context is used
 * just for rule info, not for any other subsidiary data of the relcache
 * entry, because that keeps the update logic in RelationClearRelation()
B
Bruce Momjian 已提交
579
 * manageable.	The other subsidiary data structures are simple enough
580
 * to be easy to free explicitly, anyway.
581 582 583 584
 */
static void
RelationBuildRuleLock(Relation relation)
{
585 586
	MemoryContext rulescxt;
	MemoryContext oldcxt;
587 588 589 590
	HeapTuple	rewrite_tuple;
	Relation	rewrite_desc;
	TupleDesc	rewrite_tupdesc;
	SysScanDesc rewrite_scan;
591 592 593 594 595
	ScanKeyData key;
	RuleLock   *rulelock;
	int			numlocks;
	RewriteRule **rules;
	int			maxlocks;
596

597
	/*
B
Bruce Momjian 已提交
598 599
	 * Make the private context.  Parameters are set on the assumption that
	 * it'll probably not contain much data.
600 601 602
	 */
	rulescxt = AllocSetContextCreate(CacheMemoryContext,
									 RelationGetRelationName(relation),
603 604 605
									 ALLOCSET_SMALL_MINSIZE,
									 ALLOCSET_SMALL_INITSIZE,
									 ALLOCSET_SMALL_MAXSIZE);
606 607
	relation->rd_rulescxt = rulescxt;

608
	/*
B
Bruce Momjian 已提交
609 610
	 * allocate an array to hold the rewrite rules (the array is extended if
	 * necessary)
611 612
	 */
	maxlocks = 4;
613 614
	rules = (RewriteRule **)
		MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
615 616
	numlocks = 0;

617
	/*
B
Bruce Momjian 已提交
618
	 * form a scan key
619
	 */
620 621 622 623
	ScanKeyInit(&key,
				Anum_pg_rewrite_ev_class,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));
624

625
	/*
B
Bruce Momjian 已提交
626
	 * open pg_rewrite and begin a scan
627
	 *
628 629
	 * Note: since we scan the rules using RewriteRelRulenameIndexId, we will
	 * be reading the rules in name order, except possibly during
B
Bruce Momjian 已提交
630 631
	 * emergency-recovery operations (ie, IgnoreSystemIndexes). This in turn
	 * ensures that rules will be fired in name order.
632
	 */
633
	rewrite_desc = heap_open(RewriteRelationId, AccessShareLock);
634
	rewrite_tupdesc = RelationGetDescr(rewrite_desc);
B
Bruce Momjian 已提交
635
	rewrite_scan = systable_beginscan(rewrite_desc,
636
									  RewriteRelRulenameIndexId,
637 638 639 640
									  true, SnapshotNow,
									  1, &key);

	while (HeapTupleIsValid(rewrite_tuple = systable_getnext(rewrite_scan)))
641
	{
642
		Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
643
		bool		isnull;
644 645 646
		Datum		rule_datum;
		text	   *rule_text;
		char	   *rule_str;
647
		RewriteRule *rule;
648

649 650
		rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
												  sizeof(RewriteRule));
651

652
		rule->ruleId = HeapTupleGetOid(rewrite_tuple);
653

654 655
		rule->event = rewrite_form->ev_type - '0';
		rule->attrno = rewrite_form->ev_attr;
656
		rule->enabled = rewrite_form->ev_enabled;
657 658
		rule->isInstead = rewrite_form->is_instead;

659
		/*
B
Bruce Momjian 已提交
660 661 662 663
		 * Must use heap_getattr to fetch ev_action and ev_qual.  Also, the
		 * rule strings are often large enough to be toasted.  To avoid
		 * leaking memory in the caller's context, do the detoasting here so
		 * we can free the detoasted version.
664 665
		 */
		rule_datum = heap_getattr(rewrite_tuple,
666
								  Anum_pg_rewrite_ev_action,
667
								  rewrite_tupdesc,
B
Bruce Momjian 已提交
668
								  &isnull);
B
Bruce Momjian 已提交
669
		Assert(!isnull);
670 671 672
		rule_text = DatumGetTextP(rule_datum);
		rule_str = DatumGetCString(DirectFunctionCall1(textout,
												PointerGetDatum(rule_text)));
673
		oldcxt = MemoryContextSwitchTo(rulescxt);
674
		rule->actions = (List *) stringToNode(rule_str);
675
		MemoryContextSwitchTo(oldcxt);
676 677 678
		pfree(rule_str);
		if ((Pointer) rule_text != DatumGetPointer(rule_datum))
			pfree(rule_text);
679

680 681 682 683
		rule_datum = heap_getattr(rewrite_tuple,
								  Anum_pg_rewrite_ev_qual,
								  rewrite_tupdesc,
								  &isnull);
B
Bruce Momjian 已提交
684
		Assert(!isnull);
685 686 687
		rule_text = DatumGetTextP(rule_datum);
		rule_str = DatumGetCString(DirectFunctionCall1(textout,
												PointerGetDatum(rule_text)));
688
		oldcxt = MemoryContextSwitchTo(rulescxt);
689
		rule->qual = (Node *) stringToNode(rule_str);
690
		MemoryContextSwitchTo(oldcxt);
691 692 693
		pfree(rule_str);
		if ((Pointer) rule_text != DatumGetPointer(rule_datum))
			pfree(rule_text);
694

695 696
		/*
		 * We want the rule's table references to be checked as though by the
B
Bruce Momjian 已提交
697
		 * table owner, not the user referencing the rule.	Therefore, scan
698
		 * through the rule's actions and set the checkAsUser field on all
B
Bruce Momjian 已提交
699
		 * rtable entries.	We have to look at the qual as well, in case it
700 701
		 * contains sublinks.
		 *
B
Bruce Momjian 已提交
702 703 704 705 706
		 * The reason for doing this when the rule is loaded, rather than when
		 * it is stored, is that otherwise ALTER TABLE OWNER would have to
		 * grovel through stored rules to update checkAsUser fields. Scanning
		 * the rule tree during load is relatively cheap (compared to
		 * constructing it in the first place), so we do it here.
707 708 709 710
		 */
		setRuleCheckAsUser((Node *) rule->actions, relation->rd_rel->relowner);
		setRuleCheckAsUser(rule->qual, relation->rd_rel->relowner);

711
		if (numlocks >= maxlocks)
712 713
		{
			maxlocks *= 2;
714 715
			rules = (RewriteRule **)
				repalloc(rules, sizeof(RewriteRule *) * maxlocks);
716
		}
717
		rules[numlocks++] = rule;
718
	}
719

720
	/*
B
Bruce Momjian 已提交
721
	 * end the scan and close the attribute relation
722
	 */
723 724
	systable_endscan(rewrite_scan);
	heap_close(rewrite_desc, AccessShareLock);
725

726
	/*
B
Bruce Momjian 已提交
727
	 * form a RuleLock and insert into relation
728
	 */
729
	rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
730 731 732 733
	rulelock->numLocks = numlocks;
	rulelock->rules = rules;

	relation->rd_rules = rulelock;
734 735
}

736
/*
 *		equalRuleLocks
 *
 *		Determine whether two RuleLocks are equivalent.  Two NULL pointers
 *		are equivalent; a NULL and a non-NULL pointer are not.
 *
 *		Probably this should be in the rules code someplace...
 */
static bool
equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
{
	int			slot;

	/* With a NULL on either side, they match only if both are NULL */
	if (rlock1 == NULL || rlock2 == NULL)
		return rlock1 == rlock2;

	if (rlock1->numLocks != rlock2->numLocks)
		return false;

	/*
	 * As of 7.3 we assume the rule ordering is repeatable, because
	 * RelationBuildRuleLock should read 'em in a consistent order.  So just
	 * compare corresponding slots.
	 */
	for (slot = 0; slot < rlock1->numLocks; slot++)
	{
		RewriteRule *lhs = rlock1->rules[slot];
		RewriteRule *rhs = rlock2->rules[slot];

		/* cheap scalar comparisons first */
		if (lhs->ruleId != rhs->ruleId ||
			lhs->event != rhs->event ||
			lhs->attrno != rhs->attrno ||
			lhs->enabled != rhs->enabled ||
			lhs->isInstead != rhs->isInstead)
			return false;

		/* then the node trees */
		if (!equal(lhs->qual, rhs->qual))
			return false;
		if (!equal(lhs->actions, rhs->actions))
			return false;
	}

	return true;
}


786
/*
787 788
 *		RelationBuildDesc
 *
789
 *		Build a relation descriptor.  The caller must hold at least
790
 *		AccessShareLock on the target relid.
791
 *
792 793
 *		The new descriptor is inserted into the hash table if insertIt is true.
 *
794 795 796
 *		Returns NULL if no pg_class row could be found for the given relid
 *		(suggesting we are trying to access a just-deleted relation).
 *		Any other error is reported via elog.
797
 */
798
static Relation
799
RelationBuildDesc(Oid targetRelId, bool insertIt)
800
{
801 802
	Relation	relation;
	Oid			relid;
803
	HeapTuple	pg_class_tuple;
804
	Form_pg_class relp;
805

806
	/*
B
Bruce Momjian 已提交
807
	 * find the tuple in pg_class corresponding to the given relation id
808
	 */
809
	pg_class_tuple = ScanPgRelation(targetRelId, true);
810

811
	/*
B
Bruce Momjian 已提交
812
	 * if no such tuple exists, return NULL
813 814 815 816
	 */
	if (!HeapTupleIsValid(pg_class_tuple))
		return NULL;

817
	/*
B
Bruce Momjian 已提交
818
	 * get information from the pg_class_tuple
819
	 */
820
	relid = HeapTupleGetOid(pg_class_tuple);
821 822
	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);

823
	/*
B
Bruce Momjian 已提交
824 825
	 * allocate storage for the relation descriptor, and copy pg_class_tuple
	 * to relation->rd_rel.
826
	 */
827
	relation = AllocateRelationDesc(relp);
828

829
	/*
B
Bruce Momjian 已提交
830
	 * initialize the relation's relation id (relation->rd_id)
831
	 */
832
	RelationGetRelid(relation) = relid;
833

834
	/*
B
Bruce Momjian 已提交
835 836 837
	 * normal relations are not nailed into the cache; nor can a pre-existing
	 * relation be new.  It could be temp though.  (Actually, it could be new
	 * too, but it's okay to forget that fact if forced to flush the entry.)
838
	 */
839
	relation->rd_refcnt = 0;
840
	relation->rd_isnailed = false;
841
	relation->rd_createSubid = InvalidSubTransactionId;
842
	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
843
	relation->rd_istemp = isTempOrToastNamespace(relation->rd_rel->relnamespace);
844

845
	/*
B
Bruce Momjian 已提交
846
	 * initialize the tuple descriptor (relation->rd_att).
847
	 */
848
	RelationBuildTupleDesc(relation);
849

850
	/*
B
Bruce Momjian 已提交
851
	 * Fetch rules and triggers that affect this relation
852
	 */
853
	if (relation->rd_rel->relhasrules)
854 855
		RelationBuildRuleLock(relation);
	else
856
	{
857
		relation->rd_rules = NULL;
858 859
		relation->rd_rulescxt = NULL;
	}
860

861
	if (relation->rd_rel->reltriggers > 0)
862 863 864 865
		RelationBuildTriggers(relation);
	else
		relation->trigdesc = NULL;

866
	/*
867
	 * if it's an index, initialize index-related information
868
	 */
869
	if (OidIsValid(relation->rd_rel->relam))
870
		RelationInitIndexAccessInfo(relation);
871

872 873 874
	/* extract reloptions if any */
	RelationParseRelOptions(relation, pg_class_tuple);

875
	/*
B
Bruce Momjian 已提交
876
	 * initialize the relation lock manager information
877 878 879
	 */
	RelationInitLockInfo(relation);		/* see lmgr.c */

880 881 882 883
	/*
	 * initialize physical addressing information for the relation
	 */
	RelationInitPhysicalAddr(relation);
884

885
	/* make sure relation is marked as having no open file yet */
886
	relation->rd_smgr = NULL;
887

B
Bruce Momjian 已提交
888 889 890 891 892
	/*
	 * now we can free the memory allocated for pg_class_tuple
	 */
	heap_freetuple(pg_class_tuple);

893
	/*
894
	 * Insert newly created relation into relcache hash table, if requested.
895
	 */
896 897
	if (insertIt)
		RelationCacheInsert(relation);
898

899 900 901
	/* It's fully valid */
	relation->rd_isvalid = true;

902
	return relation;
903 904
}

905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921
/*
 * Initialize the physical addressing info (RelFileNode) for a relcache entry
 *
 * rd_node is derived from the entry's pg_class data (rd_rel): a zero
 * reltablespace selects MyDatabaseTableSpace, and a shared relation gets
 * InvalidOid as its database instead of MyDatabaseId.
 */
static void
RelationInitPhysicalAddr(Relation relation)
{
	Form_pg_class classForm = relation->rd_rel;

	relation->rd_node.spcNode = classForm->reltablespace ?
		classForm->reltablespace : MyDatabaseTableSpace;
	relation->rd_node.dbNode = classForm->relisshared ?
		InvalidOid : MyDatabaseId;
	relation->rd_node.relNode = classForm->relfilenode;
}

922 923 924 925 926
/*
 * RelationInitIndexAccessInfo
 *
 *		Set up the index-specific part of a relcache entry: private copies
 *		of the index's pg_index row and of its access method's pg_am row,
 *		a dedicated memory context, and the per-column opclass arrays.
 *
 * The pg_index and pg_am copies are allocated in CacheMemoryContext; all
 * the arrays built here live in the new context stored in rd_indexcxt.
 */
void
RelationInitIndexAccessInfo(Relation relation)
{
	HeapTuple	cacheTup;
	Form_pg_am	amForm;
	MemoryContext indexcxt;
	MemoryContext savedcxt;
	Datum		attDatum;
	bool		isnull;
	oidvector  *opclasses;
	int2vector *options;
	int			ncols;
	uint16		nstrategies;
	uint16		nsupport;

	/*
	 * Take a private copy of the index's pg_index row.  pg_index has
	 * variable-length and possibly-null fields, so the whole tuple must be
	 * copied; it cannot simply be treated as a Form_pg_index struct.
	 */
	cacheTup = SearchSysCache(INDEXRELID,
							  ObjectIdGetDatum(RelationGetRelid(relation)),
							  0, 0, 0);
	if (!HeapTupleIsValid(cacheTup))
		elog(ERROR, "cache lookup failed for index %u",
			 RelationGetRelid(relation));
	savedcxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indextuple = heap_copytuple(cacheTup);
	relation->rd_index = (Form_pg_index) GETSTRUCT(relation->rd_indextuple);
	MemoryContextSwitchTo(savedcxt);
	ReleaseSysCache(cacheTup);

	/*
	 * Likewise take a private copy of the access method's pg_am row.
	 */
	cacheTup = SearchSysCache(AMOID,
							  ObjectIdGetDatum(relation->rd_rel->relam),
							  0, 0, 0);
	if (!HeapTupleIsValid(cacheTup))
		elog(ERROR, "cache lookup failed for access method %u",
			 relation->rd_rel->relam);
	amForm = (Form_pg_am) MemoryContextAlloc(CacheMemoryContext,
											 sizeof *amForm);
	memcpy(amForm, GETSTRUCT(cacheTup), sizeof *amForm);
	ReleaseSysCache(cacheTup);
	relation->rd_am = amForm;

	/* pg_class and pg_index must agree on the number of index columns */
	ncols = relation->rd_rel->relnatts;
	if (ncols != relation->rd_index->indnatts)
		elog(ERROR, "relnatts disagrees with indnatts for index %u",
			 RelationGetRelid(relation));
	nstrategies = amForm->amstrategies;
	nsupport = amForm->amsupport;

	/*
	 * Create the private context that holds the index access info.  A
	 * context is used, rather than a handful of pallocs, so that subsidiary
	 * data attached to fmgr lookup records is not leaked.
	 *
	 * The context is sized on the assumption that it will hold little data.
	 */
	indexcxt = AllocSetContextCreate(CacheMemoryContext,
									 RelationGetRelationName(relation),
									 ALLOCSET_SMALL_MINSIZE,
									 ALLOCSET_SMALL_INITSIZE,
									 ALLOCSET_SMALL_MAXSIZE);
	relation->rd_indexcxt = indexcxt;

	/*
	 * Allocate the (zero-filled) arrays that hang off the relcache entry.
	 */
	relation->rd_aminfo = (RelationAmInfo *)
		MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));

	relation->rd_opfamily = (Oid *)
		MemoryContextAllocZero(indexcxt, ncols * sizeof(Oid));
	relation->rd_opcintype = (Oid *)
		MemoryContextAllocZero(indexcxt, ncols * sizeof(Oid));

	/* operator array: one slot per (column, strategy), if the AM has any */
	relation->rd_operator = (nstrategies > 0) ?
		(Oid *) MemoryContextAllocZero(indexcxt,
									   ncols * nstrategies * sizeof(Oid)) :
		NULL;

	/* support-proc arrays: one slot per (column, support number) */
	if (nsupport == 0)
	{
		relation->rd_support = NULL;
		relation->rd_supportinfo = NULL;
	}
	else
	{
		int			nprocs = ncols * nsupport;

		relation->rd_support = (RegProcedure *)
			MemoryContextAllocZero(indexcxt, nprocs * sizeof(RegProcedure));
		relation->rd_supportinfo = (FmgrInfo *)
			MemoryContextAllocZero(indexcxt, nprocs * sizeof(FmgrInfo));
	}

	relation->rd_indoption = (int16 *)
		MemoryContextAllocZero(indexcxt, ncols * sizeof(int16));

	/*
	 * indclass follows the variable-width indkey field, so it cannot be
	 * reached through the C struct; fetch it as an attribute instead.
	 */
	attDatum = fastgetattr(relation->rd_indextuple,
						   Anum_pg_index_indclass,
						   GetPgIndexDescriptor(),
						   &isnull);
	Assert(!isnull);
	opclasses = (oidvector *) DatumGetPointer(attDatum);

	/*
	 * Load the operator and support-procedure OID arrays together with the
	 * opfamily and opclass input type of each column.  (rd_aminfo and
	 * rd_supportinfo stay zeroed and are filled on the fly when used.)
	 */
	IndexSupportInitialize(opclasses,
						   relation->rd_operator, relation->rd_support,
						   relation->rd_opfamily, relation->rd_opcintype,
						   nstrategies, nsupport, ncols);

	/*
	 * Fetch indoption the same way and copy it into the cache entry.
	 */
	attDatum = fastgetattr(relation->rd_indextuple,
						   Anum_pg_index_indoption,
						   GetPgIndexDescriptor(),
						   &isnull);
	Assert(!isnull);
	options = (int2vector *) DatumGetPointer(attDatum);
	memcpy(relation->rd_indoption, options->values, ncols * sizeof(int16));

	/*
	 * The expression and predicate caches are filled in later.
	 */
	relation->rd_indexprs = NIL;
	relation->rd_indpred = NIL;
	relation->rd_amcache = NULL;
}

1071
/*
1072
 * IndexSupportInitialize
1073
 *		Initializes an index's cached opclass information,
1074
 *		given the index's pg_index.indclass entry.
1075
 *
1076 1077
 * Data is returned into *indexOperator, *indexSupport, *opFamily, and
 * *opcInType, which are arrays allocated by the caller.
1078 1079 1080 1081 1082 1083 1084 1085
 *
 * The caller also passes maxStrategyNumber, maxSupportNumber, and
 * maxAttributeNumber, since these indicate the size of the arrays
 * it has allocated --- but in practice these numbers must always match
 * those obtainable from the system catalog entries for the index and
 * access method.
 */
static void
1086
IndexSupportInitialize(oidvector *indclass,
1087 1088
					   Oid *indexOperator,
					   RegProcedure *indexSupport,
1089 1090
					   Oid *opFamily,
					   Oid *opcInType,
1091 1092 1093 1094 1095 1096 1097 1098 1099 1100
					   StrategyNumber maxStrategyNumber,
					   StrategyNumber maxSupportNumber,
					   AttrNumber maxAttributeNumber)
{
	int			attIndex;

	for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
	{
		OpClassCacheEnt *opcentry;

1101
		if (!OidIsValid(indclass->values[attIndex]))
1102
			elog(ERROR, "bogus pg_index tuple");
1103 1104

		/* look up the info for this opclass, using a cache */
1105
		opcentry = LookupOpclassInfo(indclass->values[attIndex],
1106 1107 1108
									 maxStrategyNumber,
									 maxSupportNumber);

1109
		/* copy cached data into relcache entry */
1110 1111
		opFamily[attIndex] = opcentry->opcfamily;
		opcInType[attIndex] = opcentry->opcintype;
1112
		if (maxStrategyNumber > 0)
1113 1114 1115
			memcpy(&indexOperator[attIndex * maxStrategyNumber],
				   opcentry->operatorOids,
				   maxStrategyNumber * sizeof(Oid));
1116
		if (maxSupportNumber > 0)
1117 1118 1119
			memcpy(&indexSupport[attIndex * maxSupportNumber],
				   opcentry->supportProcs,
				   maxSupportNumber * sizeof(RegProcedure));
1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134
	}
}

/*
 * LookupOpclassInfo
 *
 * This routine maintains a per-opclass cache of the information needed
 * by IndexSupportInitialize().  This is more efficient than relying on
 * the catalog cache, because we can load all the info about a particular
 * opclass in a single indexscan of pg_amproc or pg_amop.
 *
 * The information from pg_am about expected range of strategy and support
 * numbers is passed in, rather than being looked up, mainly because the
 * caller will have it already.
 *
 * The returned entry belongs to the cache; callers copy what they need.
 *
 * Note there is no provision for flushing the cache.  This is OK at the
 * moment because there is no way to ALTER any interesting properties of an
 * existing opclass --- all you can do is drop it, which will result in
 * a useless but harmless dead entry in the cache.  To support altering
 * opclass membership (not the same as opfamily membership!), we'd need to
 * be able to flush this cache as well as the contents of relcache entries
 * for indexes.
 *
 * An entry can outlive a failed attempt to fill it (any error raised below
 * leaves it in the hash table with valid = false).  Such an entry must
 * therefore always be in a state from which the fill can be retried; see
 * the handling of the operatorOids/supportProcs pointers.
 */
static OpClassCacheEnt *
LookupOpclassInfo(Oid operatorClassOid,
				  StrategyNumber numStrats,
				  StrategyNumber numSupport)
{
	OpClassCacheEnt *opcentry;
	bool		found;
	Relation	rel;
	SysScanDesc scan;
	ScanKeyData skey[3];
	HeapTuple	htup;
	bool		indexOK;

	if (OpClassCache == NULL)
	{
		/* First time through: initialize the opclass cache */
		HASHCTL		ctl;

		if (!CacheMemoryContext)
			CreateCacheMemoryContext();

		MemSet(&ctl, 0, sizeof(ctl));
		ctl.keysize = sizeof(Oid);
		ctl.entrysize = sizeof(OpClassCacheEnt);
		ctl.hash = oid_hash;
		OpClassCache = hash_create("Operator class cache", 64,
								   &ctl, HASH_ELEM | HASH_FUNCTION);
	}

	opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
											   (void *) &operatorClassOid,
											   HASH_ENTER, &found);

	if (!found)
	{
		/*
		 * Initialize the new entry.  The arrays are allocated further down;
		 * until then both pointers must be NULL, so that an error during
		 * allocation cannot leave a garbage pointer in an entry that a
		 * later call will find and try to fill.
		 */
		opcentry->valid = false;	/* until known OK */
		opcentry->numStrats = numStrats;
		opcentry->numSupport = numSupport;
		opcentry->operatorOids = NULL;
		opcentry->supportProcs = NULL;
	}
	else
	{
		Assert(numStrats == opcentry->numStrats);
		Assert(numSupport == opcentry->numSupport);
	}

	/*
	 * When testing for cache-flush hazards, we intentionally disable the
	 * operator class cache and force reloading of the info on each call.
	 * This is helpful because we want to test the case where a cache flush
	 * occurs while we are loading the info, and it's very hard to provoke
	 * that if this happens only once per opclass per backend.
	 */
#if defined(CLOBBER_CACHE_ALWAYS)
	opcentry->valid = false;
#endif

	if (opcentry->valid)
		return opcentry;

	/*
	 * Need to fill in new entry.  First allocate the arrays, unless some
	 * previous (failed or forcibly invalidated) attempt already did so.
	 */
	if (opcentry->operatorOids == NULL && numStrats > 0)
		opcentry->operatorOids = (Oid *)
			MemoryContextAllocZero(CacheMemoryContext,
								   numStrats * sizeof(Oid));

	if (opcentry->supportProcs == NULL && numSupport > 0)
		opcentry->supportProcs = (RegProcedure *)
			MemoryContextAllocZero(CacheMemoryContext,
								   numSupport * sizeof(RegProcedure));

	/*
	 * To avoid infinite recursion during startup, force heap scans if we're
	 * looking up info for the opclasses used by the indexes we would like to
	 * reference here.
	 */
	indexOK = criticalRelcachesBuilt ||
		(operatorClassOid != OID_BTREE_OPS_OID &&
		 operatorClassOid != INT2_BTREE_OPS_OID);

	/*
	 * We have to fetch the pg_opclass row to determine its opfamily and
	 * opcintype, which are needed to look up the operators and functions.
	 * It'd be convenient to use the syscache here, but that probably doesn't
	 * work while bootstrapping.
	 */
	ScanKeyInit(&skey[0],
				ObjectIdAttributeNumber,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(operatorClassOid));
	rel = heap_open(OperatorClassRelationId, AccessShareLock);
	scan = systable_beginscan(rel, OpclassOidIndexId, indexOK,
							  SnapshotNow, 1, skey);

	if (HeapTupleIsValid(htup = systable_getnext(scan)))
	{
		Form_pg_opclass opclassform = (Form_pg_opclass) GETSTRUCT(htup);

		opcentry->opcfamily = opclassform->opcfamily;
		opcentry->opcintype = opclassform->opcintype;
	}
	else
		elog(ERROR, "could not find tuple for opclass %u", operatorClassOid);

	systable_endscan(scan);
	heap_close(rel, AccessShareLock);


	/*
	 * Scan pg_amop to obtain operators for the opclass.  We only fetch the
	 * default ones (those with lefttype = righttype = opcintype).
	 */
	if (numStrats > 0)
	{
		ScanKeyInit(&skey[0],
					Anum_pg_amop_amopfamily,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcfamily));
		ScanKeyInit(&skey[1],
					Anum_pg_amop_amoplefttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
		ScanKeyInit(&skey[2],
					Anum_pg_amop_amoprighttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
		rel = heap_open(AccessMethodOperatorRelationId, AccessShareLock);
		scan = systable_beginscan(rel, AccessMethodStrategyIndexId, indexOK,
								  SnapshotNow, 3, skey);

		while (HeapTupleIsValid(htup = systable_getnext(scan)))
		{
			Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(htup);

			/* strategy numbers are 1-based; reject anything out of range */
			if (amopform->amopstrategy <= 0 ||
				(StrategyNumber) amopform->amopstrategy > numStrats)
				elog(ERROR, "invalid amopstrategy number %d for opclass %u",
					 amopform->amopstrategy, operatorClassOid);
			opcentry->operatorOids[amopform->amopstrategy - 1] =
				amopform->amopopr;
		}

		systable_endscan(scan);
		heap_close(rel, AccessShareLock);
	}

	/*
	 * Scan pg_amproc to obtain support procs for the opclass.  We only fetch
	 * the default ones (those with lefttype = righttype = opcintype).
	 */
	if (numSupport > 0)
	{
		ScanKeyInit(&skey[0],
					Anum_pg_amproc_amprocfamily,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcfamily));
		ScanKeyInit(&skey[1],
					Anum_pg_amproc_amproclefttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
		ScanKeyInit(&skey[2],
					Anum_pg_amproc_amprocrighttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
		rel = heap_open(AccessMethodProcedureRelationId, AccessShareLock);
		scan = systable_beginscan(rel, AccessMethodProcedureIndexId, indexOK,
								  SnapshotNow, 3, skey);

		while (HeapTupleIsValid(htup = systable_getnext(scan)))
		{
			Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);

			/* support numbers are 1-based; reject anything out of range */
			if (amprocform->amprocnum <= 0 ||
				(StrategyNumber) amprocform->amprocnum > numSupport)
				elog(ERROR, "invalid amproc number %d for opclass %u",
					 amprocform->amprocnum, operatorClassOid);

			opcentry->supportProcs[amprocform->amprocnum - 1] =
				amprocform->amproc;
		}

		systable_endscan(scan);
		heap_close(rel, AccessShareLock);
	}

	opcentry->valid = true;
	return opcentry;
}


/*
 *		formrdesc
 *
 *		This is a special cut-down version of RelationBuildDesc()
1342
 *		used by RelationCacheInitializePhase2() in initializing the relcache.
1343
 *		The relation descriptor is built just from the supplied parameters,
1344 1345
 *		without actually looking at any system table entries.  We cheat
 *		quite a lot since we only need to work for a few basic system
1346 1347 1348
 *		catalogs.
 *
 * formrdesc is currently used for: pg_class, pg_attribute, pg_proc,
1349
 * and pg_type (see RelationCacheInitializePhase2).
1350
 *
1351 1352
 * Note that these catalogs can't have constraints (except attnotnull),
 * default values, rules, or triggers, since we don't cope with any of that.
1353
 *
1354
 * NOTE: we assume we are already switched into CacheMemoryContext.
1355 1356
 */
static void
1357 1358
formrdesc(const char *relationName, Oid relationReltype,
		  bool hasoids, int natts, FormData_pg_attribute *att)
1359
{
1360
	Relation	relation;
1361
	int			i;
1362
	bool		has_not_null;
1363

1364
	/*
1365
	 * allocate new relation desc, clear all fields of reldesc
1366
	 */
1367
	relation = (Relation) palloc0(sizeof(RelationData));
1368 1369 1370
	relation->rd_targblock = InvalidBlockNumber;

	/* make sure relation is marked as having no open file yet */
1371
	relation->rd_smgr = NULL;
1372

1373
	/*
1374
	 * initialize reference count: 1 because it is nailed in cache
1375
	 */
1376
	relation->rd_refcnt = 1;
1377

1378
	/*
B
Bruce Momjian 已提交
1379 1380
	 * all entries built with this routine are nailed-in-cache; none are for
	 * new or temp relations.
1381
	 */
1382
	relation->rd_isnailed = true;
1383
	relation->rd_createSubid = InvalidSubTransactionId;
1384
	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
1385
	relation->rd_istemp = false;
1386

1387
	/*
B
Bruce Momjian 已提交
1388
	 * initialize relation tuple form
1389
	 *
1390 1391
	 * The data we insert here is pretty incomplete/bogus, but it'll serve to
	 * get us launched.  RelationCacheInitializePhase2() will read the real
1392 1393 1394
	 * data from pg_class and replace what we've done here.  Note in particular
	 * that relowner is left as zero; this cues RelationCacheInitializePhase2
	 * that the real data isn't there yet.
1395
	 */
1396
	relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
1397

1398 1399
	namestrcpy(&relation->rd_rel->relname, relationName);
	relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
1400
	relation->rd_rel->reltype = relationReltype;
1401 1402

	/*
B
Bruce Momjian 已提交
1403 1404 1405
	 * It's important to distinguish between shared and non-shared relations,
	 * even at bootstrap time, to make sure we know where they are stored.	At
	 * present, all relations that formrdesc is used for are not shared.
1406
	 */
1407
	relation->rd_rel->relisshared = false;
1408

1409 1410
	relation->rd_rel->relpages = 1;
	relation->rd_rel->reltuples = 1;
1411
	relation->rd_rel->relkind = RELKIND_RELATION;
1412
	relation->rd_rel->relhasoids = hasoids;
1413
	relation->rd_rel->relnatts = (int16) natts;
1414

1415
	/*
B
Bruce Momjian 已提交
1416
	 * initialize attribute tuple form
1417
	 *
B
Bruce Momjian 已提交
1418
	 * Unlike the case with the relation tuple, this data had better be right
B
Bruce Momjian 已提交
1419 1420
	 * because it will never be replaced.  The input values must be correctly
	 * defined by macros in src/include/catalog/ headers.
1421
	 */
1422
	relation->rd_att = CreateTemplateTupleDesc(natts, hasoids);
1423 1424
	relation->rd_att->tdrefcount = 1;	/* mark as refcounted */

1425 1426
	relation->rd_att->tdtypeid = relationReltype;
	relation->rd_att->tdtypmod = -1;	/* unnecessary, but... */
1427

1428
	/*
B
Bruce Momjian 已提交
1429
	 * initialize tuple desc info
1430
	 */
1431
	has_not_null = false;
1432 1433
	for (i = 0; i < natts; i++)
	{
1434 1435
		memcpy(relation->rd_att->attrs[i],
			   &att[i],
1436
			   ATTRIBUTE_TUPLE_SIZE);
1437
		has_not_null |= att[i].attnotnull;
1438 1439
		/* make sure attcacheoff is valid */
		relation->rd_att->attrs[i]->attcacheoff = -1;
1440 1441
	}

1442 1443 1444
	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
	relation->rd_att->attrs[0]->attcacheoff = 0;

1445 1446 1447 1448 1449 1450 1451 1452 1453
	/* mark not-null status */
	if (has_not_null)
	{
		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

		constr->has_not_null = true;
		relation->rd_att->constr = constr;
	}

1454
	/*
1455
	 * initialize relation id from info in att array (my, this is ugly)
1456
	 */
1457
	RelationGetRelid(relation) = relation->rd_att->attrs[0]->attrelid;
1458
	relation->rd_rel->relfilenode = RelationGetRelid(relation);
1459

1460
	/*
1461
	 * initialize the relation lock manager information
1462 1463 1464
	 */
	RelationInitLockInfo(relation);		/* see lmgr.c */

1465 1466 1467 1468
	/*
	 * initialize physical addressing information for the relation
	 */
	RelationInitPhysicalAddr(relation);
1469

1470
	/*
B
Bruce Momjian 已提交
1471
	 * initialize the rel-has-index flag, using hardwired knowledge
1472
	 */
1473 1474 1475 1476 1477 1478
	if (IsBootstrapProcessingMode())
	{
		/* In bootstrap mode, we have no indexes */
		relation->rd_rel->relhasindex = false;
	}
	else
1479
	{
1480 1481
		/* Otherwise, all the rels formrdesc is used for have indexes */
		relation->rd_rel->relhasindex = true;
1482 1483
	}

1484
	/*
B
Bruce Momjian 已提交
1485
	 * add new reldesc to relcache
1486
	 */
1487
	RelationCacheInsert(relation);
1488 1489 1490

	/* It's fully valid */
	relation->rd_isvalid = true;
1491 1492 1493 1494
}


/* ----------------------------------------------------------------
1495
 *				 Relation Descriptor Lookup Interface
1496 1497 1498
 * ----------------------------------------------------------------
 */

1499
/*
1500
 *		RelationIdGetRelation
1501
 *
1502
 *		Lookup a reldesc by OID; make one if not already in cache.
1503
 *
1504 1505 1506
 *		Returns NULL if no pg_class row could be found for the given relid
 *		(suggesting we are trying to access a just-deleted relation).
 *		Any other error is reported via elog.
1507
 *
1508 1509 1510 1511
 *		NB: caller should already have at least AccessShareLock on the
 *		relation ID, else there are nasty race conditions.
 *
 *		NB: relation ref count is incremented, or set to 1 if new entry.
1512 1513
 *		Caller should eventually decrement count.  (Usually,
 *		that happens by calling RelationClose().)
1514 1515
 */
Relation
1516
RelationIdGetRelation(Oid relationId)
1517
{
1518
	Relation	rd;
1519

1520 1521 1522
	/*
	 * first try to find reldesc in the cache
	 */
1523 1524 1525
	RelationIdCacheLookup(relationId, rd);

	if (RelationIsValid(rd))
1526
	{
1527
		RelationIncrementReferenceCount(rd);
1528
		/* revalidate cache entry if necessary */
1529
		if (!rd->rd_isvalid)
1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540
		{
			/*
			 * Indexes only have a limited number of possible schema changes,
			 * and we don't want to use the full-blown procedure because it's
			 * a headache for indexes that reload itself depends on.
			 */
			if (rd->rd_rel->relkind == RELKIND_INDEX)
				RelationReloadIndexInfo(rd);
			else
				RelationClearRelation(rd, true);
		}
1541
		return rd;
1542
	}
1543

1544
	/*
B
Bruce Momjian 已提交
1545 1546
	 * no reldesc in the cache, so have RelationBuildDesc() build one and add
	 * it.
1547
	 */
1548
	rd = RelationBuildDesc(relationId, true);
1549 1550
	if (RelationIsValid(rd))
		RelationIncrementReferenceCount(rd);
1551 1552 1553 1554
	return rd;
}

/* ----------------------------------------------------------------
1555
 *				cache invalidation support routines
1556 1557 1558
 * ----------------------------------------------------------------
 */

1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588
/*
 * RelationIncrementReferenceCount
 *		Add one to a relation's reference count.
 *
 * Note: bootstrap mode has its own weird ideas about relation refcount
 * behavior.  That ought to be fixed someday; meanwhile the reference is
 * simply not recorded with the resource owner while in bootstrap mode.
 */
void
RelationIncrementReferenceCount(Relation rel)
{
	ResourceOwnerEnlargeRelationRefs(CurrentResourceOwner);
	rel->rd_refcnt++;

	/* no ownership tracking in bootstrap mode */
	if (IsBootstrapProcessingMode())
		return;

	ResourceOwnerRememberRelationRef(CurrentResourceOwner, rel);
}

/*
 * RelationDecrementReferenceCount
 *		Subtract one from a relation's reference count.
 *
 * Outside bootstrap mode the reference is also removed from
 * CurrentResourceOwner's bookkeeping.
 */
void
RelationDecrementReferenceCount(Relation rel)
{
	Assert(rel->rd_refcnt > 0);
	rel->rd_refcnt--;

	/* no ownership tracking in bootstrap mode */
	if (IsBootstrapProcessingMode())
		return;

	ResourceOwnerForgetRelationRef(CurrentResourceOwner, rel);
}

1589
/*
 * RelationClose - close an open relation
 *
 *	All this really does is drop one reference to the relcache entry.
 *
 *	NOTE: when compiled with -DRELCACHE_FORCE_RELEASE, a relcache entry is
 *	freed as soon as its refcount reaches zero.  Combined with aset.c's
 *	CLOBBER_FREED_MEMORY option, this is a good test for catching
 *	references to already-released relcache entries.  It slows things down
 *	quite a bit, however.
 */
void
RelationClose(Relation relation)
{
	/* Note: no locking manipulations needed */
	RelationDecrementReferenceCount(relation);

#ifdef RELCACHE_FORCE_RELEASE
	/* keep the entry while it is still referenced ... */
	if (!RelationHasReferenceCountZero(relation))
		return;

	/* ... or if either of its subtransaction markers is set */
	if (relation->rd_createSubid != InvalidSubTransactionId ||
		relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
		return;

	RelationClearRelation(relation, false);
#endif
}

1614
/*
1615
 * RelationReloadIndexInfo - reload minimal information for an open index
1616
 *
1617 1618 1619 1620 1621 1622 1623
 *	This function is used only for indexes.  A relcache inval on an index
 *	can mean that its pg_class or pg_index row changed.  There are only
 *	very limited changes that are allowed to an existing index's schema,
 *	so we can update the relcache entry without a complete rebuild; which
 *	is fortunate because we can't rebuild an index entry that is "nailed"
 *	and/or in active use.  We support full replacement of the pg_class row,
 *	as well as updates of a few simple fields of the pg_index row.
1624
 *
1625
 *	We can't necessarily reread the catalog rows right away; we might be
1626 1627
 *	in a failed transaction when we receive the SI notification.  If so,
 *	RelationClearRelation just marks the entry as invalid by setting
1628
 *	rd_isvalid to false.  This routine is called to fix the entry when it
1629
 *	is next needed.
1630 1631 1632 1633
 *
 *	We assume that at the time we are called, we have at least AccessShareLock
 *	on the target index.  (Note: in the calls from RelationClearRelation,
 *	this is legitimate because we know the rel has positive refcount.)
H
Hiroshi Inoue 已提交
1634 1635
 */
static void
1636
RelationReloadIndexInfo(Relation relation)
H
Hiroshi Inoue 已提交
1637
{
1638
	bool		indexOK;
H
Hiroshi Inoue 已提交
1639
	HeapTuple	pg_class_tuple;
B
Bruce Momjian 已提交
1640
	Form_pg_class relp;
H
Hiroshi Inoue 已提交
1641

1642 1643 1644 1645 1646
	/* Should be called only for invalidated indexes */
	Assert(relation->rd_rel->relkind == RELKIND_INDEX &&
		   !relation->rd_isvalid);
	/* Should be closed at smgr level */
	Assert(relation->rd_smgr == NULL);
B
Bruce Momjian 已提交
1647

1648
	/*
1649 1650
	 * Read the pg_class row
	 *
1651 1652
	 * Don't try to use an indexscan of pg_class_oid_index to reload the info
	 * for pg_class_oid_index ...
1653
	 */
1654 1655
	indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
	pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK);
H
Hiroshi Inoue 已提交
1656
	if (!HeapTupleIsValid(pg_class_tuple))
1657
		elog(ERROR, "could not find pg_class tuple for index %u",
1658
			 RelationGetRelid(relation));
H
Hiroshi Inoue 已提交
1659
	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
1660
	memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
1661
	/* Reload reloptions in case they changed */
B
Bruce Momjian 已提交
1662 1663
	if (relation->rd_options)
		pfree(relation->rd_options);
1664 1665
	RelationParseRelOptions(relation, pg_class_tuple);
	/* done with pg_class tuple */
H
Hiroshi Inoue 已提交
1666
	heap_freetuple(pg_class_tuple);
1667 1668 1669
	/* We must recalculate physical address in case it changed */
	RelationInitPhysicalAddr(relation);
	/* Make sure targblock is reset in case rel was truncated */
1670
	relation->rd_targblock = InvalidBlockNumber;
1671 1672 1673 1674
	/* Must free any AM cached data, too */
	if (relation->rd_amcache)
		pfree(relation->rd_amcache);
	relation->rd_amcache = NULL;
1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692

	/*
	 * For a non-system index, there are fields of the pg_index row that are
	 * allowed to change, so re-read that row and update the relcache entry.
	 * Most of the info derived from pg_index (such as support function lookup
	 * info) cannot change, and indeed the whole point of this routine is to
	 * update the relcache entry without clobbering that data; so wholesale
	 * replacement is not appropriate.
	 */
	if (!IsSystemRelation(relation))
	{
		HeapTuple	tuple;
		Form_pg_index index;

		tuple = SearchSysCache(INDEXRELID,
							   ObjectIdGetDatum(RelationGetRelid(relation)),
							   0, 0, 0);
		if (!HeapTupleIsValid(tuple))
B
Bruce Momjian 已提交
1693 1694
			elog(ERROR, "cache lookup failed for index %u",
				 RelationGetRelid(relation));
1695 1696 1697
		index = (Form_pg_index) GETSTRUCT(tuple);

		relation->rd_index->indisvalid = index->indisvalid;
1698 1699 1700 1701
		relation->rd_index->indcheckxmin = index->indcheckxmin;
		relation->rd_index->indisready = index->indisready;
		HeapTupleHeaderSetXmin(relation->rd_indextuple->t_data,
							   HeapTupleHeaderGetXmin(tuple->t_data));
1702 1703 1704 1705

		ReleaseSysCache(tuple);
	}

1706
	/* Okay, now it's valid again */
1707
	relation->rd_isvalid = true;
H
Hiroshi Inoue 已提交
1708
}
1709

1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753
/*
 * RelationDestroyRelation
 *
 *	Physically free a relation cache entry together with all of its
 *	subsidiary data.  The caller must already have removed the entry from
 *	the hash table.
 */
static void
RelationDestroyRelation(Relation relation)
{
	Assert(RelationHasReferenceCountZero(relation));

	/*
	 * Have smgr and the lower levels close the relation's files if that has
	 * not happened yet.  (The caller probably did this already, but let's
	 * just be real sure.)
	 */
	RelationCloseSmgr(relation);

	/*
	 * Release each subsidiary data structure in turn; the entry itself goes
	 * last.
	 */
	if (relation->rd_rel != NULL)
		pfree(relation->rd_rel);

	/* can't use DecrTupleDescRefCount here */
	Assert(relation->rd_att->tdrefcount > 0);
	relation->rd_att->tdrefcount -= 1;
	if (relation->rd_att->tdrefcount == 0)
		FreeTupleDesc(relation->rd_att);

	list_free(relation->rd_indexlist);
	bms_free(relation->rd_indexattr);
	FreeTriggerDesc(relation->trigdesc);

	if (relation->rd_options != NULL)
		pfree(relation->rd_options);
	if (relation->rd_indextuple != NULL)
		pfree(relation->rd_indextuple);
	if (relation->rd_am != NULL)
		pfree(relation->rd_am);

	/* private memory contexts, when present */
	if (relation->rd_indexcxt != NULL)
		MemoryContextDelete(relation->rd_indexcxt);
	if (relation->rd_rulescxt != NULL)
		MemoryContextDelete(relation->rd_rulescxt);

	pfree(relation);
}

1754
/*
1755
 * RelationClearRelation
1756
 *
1757 1758
 *	 Physically blow away a relation cache entry, or reset it and rebuild
 *	 it from scratch (that is, from catalog entries).  The latter path is
1759 1760
 *	 used when we are notified of a change to an open relation (one with
 *	 refcount > 0).
1761
 *
1762 1763 1764 1765 1766 1767 1768 1769 1770 1771
 *	 NB: when rebuilding, we'd better hold some lock on the relation,
 *	 else the catalog data we need to read could be changing under us.
 *	 Also, a rel to be rebuilt had better have refcnt > 0.  This is because
 *	 an sinval reset could happen while we're accessing the catalogs, and
 *	 the rel would get blown away underneath us by RelationCacheInvalidate
 *	 if it has zero refcnt.
 *
 *	 The "rebuild" parameter is redundant in current usage because it has
 *	 to match the relation's refcnt status, but we keep it as a crosscheck
 *	 that we're doing what the caller expects.
1772
 */
1773
static void
1774
RelationClearRelation(Relation relation, bool rebuild)
1775
{
1776 1777 1778 1779 1780 1781 1782 1783
	/*
	 * As per notes above, a rel to be rebuilt MUST have refcnt > 0; while
	 * of course it would be a bad idea to blow away one with nonzero refcnt.
	 */
	Assert(rebuild ?
		   !RelationHasReferenceCountZero(relation) :
		   RelationHasReferenceCountZero(relation));

1784
	/*
1785
	 * Make sure smgr and lower levels close the relation's files, if they
B
Bruce Momjian 已提交
1786 1787 1788 1789
	 * weren't closed already.  If the relation is not getting deleted, the
	 * next smgr access should reopen the files automatically.	This ensures
	 * that the low-level file access state is updated after, say, a vacuum
	 * truncation.
1790
	 */
1791
	RelationCloseSmgr(relation);
1792

1793
	/*
B
Bruce Momjian 已提交
1794 1795 1796 1797
	 * Never, never ever blow away a nailed-in system relation, because we'd
	 * be unable to recover.  However, we must reset rd_targblock, in case we
	 * got called because of a relation cache flush that was triggered by
	 * VACUUM.
1798
	 *
1799 1800 1801
	 * If it's a nailed index, then we need to re-read the pg_class row to see
	 * if its relfilenode changed.	We can't necessarily do that here, because
	 * we might be in a failed transaction.  We assume it's okay to do it if
B
Bruce Momjian 已提交
1802 1803 1804
	 * there are open references to the relcache entry (cf notes for
	 * AtEOXact_RelationCache).  Otherwise just mark the entry as possibly
	 * invalid, and it'll be fixed when next opened.
1805 1806
	 */
	if (relation->rd_isnailed)
H
Hiroshi Inoue 已提交
1807
	{
1808
		relation->rd_targblock = InvalidBlockNumber;
1809 1810
		if (relation->rd_rel->relkind == RELKIND_INDEX)
		{
B
Bruce Momjian 已提交
1811
			relation->rd_isvalid = false;		/* needs to be revalidated */
1812
			if (relation->rd_refcnt > 1)
1813
				RelationReloadIndexInfo(relation);
1814
		}
1815
		return;
H
Hiroshi Inoue 已提交
1816
	}
1817

1818 1819 1820 1821
	/*
	 * Even non-system indexes should not be blown away if they are open and
	 * have valid index support information.  This avoids problems with active
	 * use of the index support information.  As with nailed indexes, we
B
Bruce Momjian 已提交
1822
	 * re-read the pg_class row to handle possible physical relocation of the
1823
	 * index, and we check for pg_index updates too.
1824 1825 1826 1827 1828
	 */
	if (relation->rd_rel->relkind == RELKIND_INDEX &&
		relation->rd_refcnt > 0 &&
		relation->rd_indexcxt != NULL)
	{
B
Bruce Momjian 已提交
1829
		relation->rd_isvalid = false;	/* needs to be revalidated */
1830
		RelationReloadIndexInfo(relation);
1831 1832 1833
		return;
	}

1834 1835
	/* Mark it invalid until we've finished rebuild */
	relation->rd_isvalid = false;
1836

1837
	/*
1838 1839
	 * Clear out catcache's entries for this relation.  This is a bit of
	 * a hack, but it's a convenient place to do it.
1840
	 */
1841
	CatalogCacheFlushRelation(RelationGetRelid(relation));
1842

1843
	/*
1844
	 * If we're really done with the relcache entry, blow it away. But if
B
Bruce Momjian 已提交
1845 1846 1847
	 * someone is still using it, reconstruct the whole deal without moving
	 * the physical RelationData record (so that the someone's pointer is
	 * still valid).
1848
	 */
1849
	if (!rebuild)
1850
	{
1851 1852 1853 1854 1855
		/* Remove it from the hash table */
		RelationCacheDelete(relation);

		/* And release storage */
		RelationDestroyRelation(relation);
1856 1857 1858
	}
	else
	{
1859
		/*
1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882
		 * Our strategy for rebuilding an open relcache entry is to build
		 * a new entry from scratch, swap its contents with the old entry,
		 * and finally delete the new entry (along with any infrastructure
		 * swapped over from the old entry).  This is to avoid trouble in case
		 * an error causes us to lose control partway through.  The old entry
		 * will still be marked !rd_isvalid, so we'll try to rebuild it again
		 * on next access.  Meanwhile it's not any less valid than it was
		 * before, so any code that might expect to continue accessing it
		 * isn't hurt by the rebuild failure.  (Consider for example a
		 * subtransaction that ALTERs a table and then gets cancelled partway
		 * through the cache entry rebuild.  The outer transaction should
		 * still see the not-modified cache entry as valid.)  The worst
		 * consequence of an error is leaking the necessarily-unreferenced
		 * new entry, and this shouldn't happen often enough for that to be
		 * a big problem.
		 *
		 * When rebuilding an open relcache entry, we must preserve ref count
		 * and rd_createSubid/rd_newRelfilenodeSubid state.  Also attempt to
		 * preserve the pg_class entry (rd_rel), tupledesc, and rewrite-rule
		 * substructures in place, because various places assume that these
		 * structures won't move while they are working with an open relcache
		 * entry.  (Note: the refcount mechanism for tupledescs might someday
		 * allow us to remove this hack for the tupledesc.)
1883
		 *
1884 1885
		 * Note that this process does not touch CurrentResourceOwner; which
		 * is good because whatever ref counts the entry may have do not
B
Bruce Momjian 已提交
1886
		 * necessarily belong to that resource owner.
1887
		 */
1888
		Relation	newrel;
1889
		Oid			save_relid = RelationGetRelid(relation);
1890 1891 1892 1893 1894 1895
		bool		keep_tupdesc;
		bool		keep_rules;

		/* Build temporary entry, but don't link it into hashtable */
		newrel = RelationBuildDesc(save_relid, false);
		if (newrel == NULL)
1896
		{
1897
			/* Should only get here if relation was deleted */
1898 1899
			RelationCacheDelete(relation);
			RelationDestroyRelation(relation);
1900
			elog(ERROR, "relation %u deleted while still in use", save_relid);
1901
		}
1902

1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923
		keep_tupdesc = equalTupleDescs(relation->rd_att, newrel->rd_att);
		keep_rules = equalRuleLocks(relation->rd_rules, newrel->rd_rules);

		/*
		 * Perform swapping of the relcache entry contents.  Within this
		 * process the old entry is momentarily invalid, so there *must*
		 * be no possibility of CHECK_FOR_INTERRUPTS within this sequence.
		 * Do it in all-in-line code for safety.
		 *
		 * Since the vast majority of fields should be swapped, our method
		 * is to swap the whole structures and then re-swap those few fields
		 * we didn't want swapped.
		 */
#define SWAPFIELD(fldtype, fldname) \
		do { \
			fldtype _tmp = newrel->fldname; \
			newrel->fldname = relation->fldname; \
			relation->fldname = _tmp; \
		} while (0)

		/* swap all Relation struct fields */
1924
		{
1925 1926 1927 1928 1929
			RelationData tmpstruct;

			memcpy(&tmpstruct, newrel, sizeof(RelationData));
			memcpy(newrel, relation, sizeof(RelationData));
			memcpy(relation, &tmpstruct, sizeof(RelationData));
1930
		}
1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948

		/* rd_smgr must not be swapped, due to back-links from smgr level */
		SWAPFIELD(SMgrRelation, rd_smgr);
		/* rd_refcnt must be preserved */
		SWAPFIELD(int, rd_refcnt);
		/* isnailed shouldn't change */
		Assert(newrel->rd_isnailed == relation->rd_isnailed);
		/* creation sub-XIDs must be preserved */
		SWAPFIELD(SubTransactionId, rd_createSubid);
		SWAPFIELD(SubTransactionId, rd_newRelfilenodeSubid);
		/* un-swap rd_rel pointers, swap contents instead */
		SWAPFIELD(Form_pg_class, rd_rel);
		/* ... but actually, we don't have to update newrel->rd_rel */
		memcpy(relation->rd_rel, newrel->rd_rel, CLASS_TUPLE_SIZE);
		/* preserve old tupledesc and rules if no logical change */
		if (keep_tupdesc)
			SWAPFIELD(TupleDesc, rd_att);
		if (keep_rules)
1949
		{
1950 1951
			SWAPFIELD(RuleLock *, rd_rules);
			SWAPFIELD(MemoryContext, rd_rulescxt);
1952
		}
1953 1954 1955 1956 1957 1958 1959
		/* pgstat_info must be preserved */
		SWAPFIELD(struct PgStat_TableStatus *, pgstat_info);

#undef SWAPFIELD

		/* And now we can throw away the temporary entry */
		RelationDestroyRelation(newrel);
1960
	}
1961 1962
}

1963
/*
1964 1965 1966 1967 1968
 * RelationFlushRelation
 *
 *	 Rebuild the relation if it is open (refcount > 0), else blow it away.
 */
static void
1969
RelationFlushRelation(Relation relation)
1970
{
1971 1972
	if (relation->rd_createSubid != InvalidSubTransactionId ||
		relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
1973 1974
	{
		/*
1975 1976
		 * New relcache entries are always rebuilt, not flushed; else we'd
		 * forget the "new" status of the relation, which is a useful
1977
		 * optimization to have.  Ditto for the new-relfilenode status.
1978 1979 1980 1981
		 *
		 * The rel could have zero refcnt here, so temporarily increment
		 * the refcnt to ensure it's safe to rebuild it.  We can assume that
		 * the current transaction has some lock on the rel already.
1982
		 */
1983 1984 1985
		RelationIncrementReferenceCount(relation);
		RelationClearRelation(relation, true);
		RelationDecrementReferenceCount(relation);
1986 1987 1988 1989
	}
	else
	{
		/*
1990
		 * Pre-existing rels can be dropped from the relcache if not open.
1991
		 */
1992
		bool	rebuild = !RelationHasReferenceCountZero(relation);
1993

1994 1995
		RelationClearRelation(relation, rebuild);
	}
1996 1997
}

1998
/*
1999
 * RelationForgetRelation - unconditionally remove a relcache entry
2000
 *
2001 2002
 *		   External interface for destroying a relcache entry when we
 *		   drop the relation.
2003 2004
 */
void
2005
RelationForgetRelation(Oid rid)
2006
{
2007
	Relation	relation;
2008 2009 2010

	RelationIdCacheLookup(rid, relation);

2011 2012 2013 2014
	if (!PointerIsValid(relation))
		return;					/* not in cache, nothing to do */

	if (!RelationHasReferenceCountZero(relation))
2015
		elog(ERROR, "relation %u is still open", rid);
2016 2017 2018

	/* Unconditionally destroy the relcache entry */
	RelationClearRelation(relation, false);
2019 2020
}

2021
/*
2022
 *		RelationCacheInvalidateEntry
2023 2024 2025
 *
 *		This routine is invoked for SI cache flush messages.
 *
2026 2027
 * Any relcache entry matching the relid must be flushed.  (Note: caller has
 * already determined that the relid belongs to our database or is a shared
2028
 * relation.)
2029 2030 2031 2032 2033 2034
 *
 * We used to skip local relations, on the grounds that they could
 * not be targets of cross-backend SI update messages; but it seems
 * safer to process them, so that our *own* SI update messages will
 * have the same effects during CommandCounterIncrement for both
 * local and nonlocal relations.
2035 2036
 */
void
2037
RelationCacheInvalidateEntry(Oid relationId)
2038
{
2039
	Relation	relation;
2040 2041 2042

	RelationIdCacheLookup(relationId, relation);

2043
	if (PointerIsValid(relation))
2044
	{
2045
		relcacheInvalsReceived++;
2046
		RelationFlushRelation(relation);
2047
	}
2048 2049 2050 2051
}

/*
 * RelationCacheInvalidate
2052
 *	 Blow away cached relation descriptors that have zero reference counts,
B
Bruce Momjian 已提交
2053
 *	 and rebuild those with positive reference counts.	Also reset the smgr
2054
 *	 relation cache.
2055
 *
2056
 *	 This is currently used only to recover from SI message buffer overflow,
2057
 *	 so we do not touch new-in-transaction relations; they cannot be targets
2058 2059
 *	 of cross-backend SI updates (and our own updates now go through a
 *	 separate linked list that isn't limited by the SI message buffer size).
2060 2061
 *	 Likewise, we need not discard new-relfilenode-in-transaction hints,
 *	 since any invalidation of those would be a local event.
2062 2063 2064
 *
 *	 We do this in two phases: the first pass deletes deletable items, and
 *	 the second one rebuilds the rebuildable items.  This is essential for
2065
 *	 safety, because hash_seq_search only copes with concurrent deletion of
B
Bruce Momjian 已提交
2066
 *	 the element it is currently visiting.	If a second SI overflow were to
2067 2068 2069 2070
 *	 occur while we are walking the table, resulting in recursive entry to
 *	 this routine, we could crash because the inner invocation blows away
 *	 the entry next to be visited by the outer scan.  But this way is OK,
 *	 because (a) during the first pass we won't process any more SI messages,
2071
 *	 so hash_seq_search will complete safely; (b) during the second pass we
2072
 *	 only hold onto pointers to nondeletable entries.
2073 2074 2075 2076 2077 2078
 *
 *	 The two-phase approach also makes it easy to ensure that we process
 *	 nailed-in-cache indexes before other nondeletable items, and that we
 *	 process pg_class_oid_index first of all.  In scenarios where a nailed
 *	 index has been given a new relfilenode, we have to detect that update
 *	 before the nailed index is used in reloading any other relcache entry.
2079 2080
 */
void
2081
RelationCacheInvalidate(void)
2082
{
2083
	HASH_SEQ_STATUS status;
2084
	RelIdCacheEnt *idhentry;
2085
	Relation	relation;
2086
	List	   *rebuildFirstList = NIL;
B
Bruce Momjian 已提交
2087
	List	   *rebuildList = NIL;
2088
	ListCell   *l;
2089 2090

	/* Phase 1 */
2091
	hash_seq_init(&status, RelationIdCache);
2092

2093
	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2094
	{
2095
		relation = idhentry->reldesc;
2096

2097
		/* Must close all smgr references to avoid leaving dangling ptrs */
2098
		RelationCloseSmgr(relation);
2099

2100
		/* Ignore new relations, since they are never SI targets */
2101
		if (relation->rd_createSubid != InvalidSubTransactionId)
2102
			continue;
2103

2104 2105
		relcacheInvalsReceived++;

2106
		if (RelationHasReferenceCountZero(relation))
2107 2108
		{
			/* Delete this entry immediately */
2109
			Assert(!relation->rd_isnailed);
2110 2111 2112 2113
			RelationClearRelation(relation, false);
		}
		else
		{
2114 2115
			/*
			 * Add this entry to list of stuff to rebuild in second pass.
B
Bruce Momjian 已提交
2116 2117
			 * pg_class_oid_index goes on the front of rebuildFirstList, other
			 * nailed indexes on the back, and everything else into
2118 2119 2120 2121 2122
			 * rebuildList (in no particular order).
			 */
			if (relation->rd_isnailed &&
				relation->rd_rel->relkind == RELKIND_INDEX)
			{
2123
				if (RelationGetRelid(relation) == ClassOidIndexId)
2124 2125 2126 2127 2128 2129
					rebuildFirstList = lcons(relation, rebuildFirstList);
				else
					rebuildFirstList = lappend(rebuildFirstList, relation);
			}
			else
				rebuildList = lcons(relation, rebuildList);
2130
		}
2131
	}
2132

2133
	/*
B
Bruce Momjian 已提交
2134 2135 2136
	 * Now zap any remaining smgr cache entries.  This must happen before we
	 * start to rebuild entries, since that may involve catalog fetches which
	 * will re-open catalog files.
2137 2138 2139
	 */
	smgrcloseall();

2140
	/* Phase 2: rebuild the items found to need rebuild in phase 1 */
2141 2142 2143 2144 2145 2146
	foreach(l, rebuildFirstList)
	{
		relation = (Relation) lfirst(l);
		RelationClearRelation(relation, true);
	}
	list_free(rebuildFirstList);
2147
	foreach(l, rebuildList)
2148
	{
2149 2150
		relation = (Relation) lfirst(l);
		RelationClearRelation(relation, true);
2151
	}
2152
	list_free(rebuildList);
2153
}
2154

2155
/*
2156
 * AtEOXact_RelationCache
2157
 *
2158
 *	Clean up the relcache at main-transaction commit or abort.
2159 2160 2161 2162 2163
 *
 * Note: this must be called *before* processing invalidation messages.
 * In the case of abort, we don't want to try to rebuild any invalidated
 * cache entries (since we can't safely do database accesses).  Therefore
 * we must reset refcnts before handling pending invalidations.
2164 2165 2166 2167 2168 2169
 *
 * As of PostgreSQL 8.1, relcache refcnts should get released by the
 * ResourceOwner mechanism.  This routine just does a debugging
 * cross-check that no pins remain.  However, we also need to do special
 * cleanup when the current transaction created any relations or made use
 * of forced index lists.
2170 2171
 */
void
2172
AtEOXact_RelationCache(bool isCommit)
2173
{
2174
	HASH_SEQ_STATUS status;
2175
	RelIdCacheEnt *idhentry;
2176

2177 2178
	/*
	 * To speed up transaction exit, we want to avoid scanning the relcache
B
Bruce Momjian 已提交
2179 2180 2181 2182
	 * unless there is actually something for this routine to do.  Other than
	 * the debug-only Assert checks, most transactions don't create any work
	 * for us to do here, so we keep a static flag that gets set if there is
	 * anything to do.	(Currently, this means either a relation is created in
2183
	 * the current xact, or one is given a new relfilenode, or an index list
B
Bruce Momjian 已提交
2184
	 * is forced.)	For simplicity, the flag remains set till end of top-level
2185 2186
	 * transaction, even though we could clear it at subtransaction end in
	 * some cases.
2187 2188 2189 2190 2191 2192 2193 2194
	 */
	if (!need_eoxact_work
#ifdef USE_ASSERT_CHECKING
		&& !assert_enabled
#endif
		)
		return;

2195
	hash_seq_init(&status, RelationIdCache);
2196

2197
	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2198
	{
2199
		Relation	relation = idhentry->reldesc;
2200 2201 2202 2203 2204

		/*
		 * The relcache entry's ref count should be back to its normal
		 * not-in-a-transaction state: 0 unless it's nailed in cache.
		 *
B
Bruce Momjian 已提交
2205 2206 2207
		 * In bootstrap mode, this is NOT true, so don't check it --- the
		 * bootstrap code expects relations to stay open across start/commit
		 * transaction calls.  (That seems bogus, but it's not worth fixing.)
2208 2209 2210 2211 2212 2213 2214 2215 2216 2217
		 */
#ifdef USE_ASSERT_CHECKING
		if (!IsBootstrapProcessingMode())
		{
			int			expected_refcnt;

			expected_refcnt = relation->rd_isnailed ? 1 : 0;
			Assert(relation->rd_refcnt == expected_refcnt);
		}
#endif
2218

2219 2220 2221
		/*
		 * Is it a relation created in the current transaction?
		 *
B
Bruce Momjian 已提交
2222 2223 2224 2225 2226 2227
		 * During commit, reset the flag to zero, since we are now out of the
		 * creating transaction.  During abort, simply delete the relcache
		 * entry --- it isn't interesting any longer.  (NOTE: if we have
		 * forgotten the new-ness of a new relation due to a forced cache
		 * flush, the entry will get deleted anyway by shared-cache-inval
		 * processing of the aborted pg_class insertion.)
2228
		 */
2229
		if (relation->rd_createSubid != InvalidSubTransactionId)
2230
		{
2231
			if (isCommit)
2232
				relation->rd_createSubid = InvalidSubTransactionId;
2233 2234 2235 2236 2237 2238
			else
			{
				RelationClearRelation(relation, false);
				continue;
			}
		}
2239 2240 2241 2242

		/*
		 * Likewise, reset the hint about the relfilenode being new.
		 */
2243
		relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
2244

2245 2246 2247 2248 2249
		/*
		 * Flush any temporary index list.
		 */
		if (relation->rd_indexvalid == 2)
		{
2250
			list_free(relation->rd_indexlist);
2251
			relation->rd_indexlist = NIL;
2252
			relation->rd_oidindex = InvalidOid;
2253 2254
			relation->rd_indexvalid = 0;
		}
2255
	}
2256

2257 2258
	/* Once done with the transaction, we can reset need_eoxact_work */
	need_eoxact_work = false;
2259
}
2260

2261 2262 2263 2264 2265 2266 2267 2268
/*
 * AtEOSubXact_RelationCache
 *
 *	Clean up the relcache at sub-transaction commit or abort.
 *
 * Note: this must be called *before* processing invalidation messages.
 */
void
2269 2270
AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
						  SubTransactionId parentSubid)
2271 2272 2273 2274
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;

2275
	/*
2276 2277
	 * Skip the relcache scan if nothing to do --- see notes for
	 * AtEOXact_RelationCache.
2278
	 */
2279
	if (!need_eoxact_work)
2280 2281
		return;

2282 2283 2284 2285 2286 2287 2288 2289 2290
	hash_seq_init(&status, RelationIdCache);

	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
	{
		Relation	relation = idhentry->reldesc;

		/*
		 * Is it a relation created in the current subtransaction?
		 *
2291 2292
		 * During subcommit, mark it as belonging to the parent, instead.
		 * During subabort, simply delete the relcache entry.
2293
		 */
2294
		if (relation->rd_createSubid == mySubid)
2295 2296
		{
			if (isCommit)
2297
				relation->rd_createSubid = parentSubid;
2298 2299 2300 2301 2302 2303
			else
			{
				RelationClearRelation(relation, false);
				continue;
			}
		}
2304 2305

		/*
B
Bruce Momjian 已提交
2306 2307
		 * Likewise, update or drop any new-relfilenode-in-subtransaction
		 * hint.
2308
		 */
2309 2310 2311 2312 2313
		if (relation->rd_newRelfilenodeSubid == mySubid)
		{
			if (isCommit)
				relation->rd_newRelfilenodeSubid = parentSubid;
			else
2314
				relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
2315
		}
2316 2317 2318 2319 2320 2321 2322 2323

		/*
		 * Flush any temporary index list.
		 */
		if (relation->rd_indexvalid == 2)
		{
			list_free(relation->rd_indexlist);
			relation->rd_indexlist = NIL;
2324
			relation->rd_oidindex = InvalidOid;
2325 2326 2327 2328 2329
			relation->rd_indexvalid = 0;
		}
	}
}

2330 2331 2332 2333
/*
 * RelationCacheMarkNewRelfilenode
 *
 *	Mark the rel as having been given a new relfilenode in the current
B
Bruce Momjian 已提交
2334
 *	(sub) transaction.	This is a hint that can be used to optimize
2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345 2346
 *	later operations on the rel in the same transaction.
 */
void
RelationCacheMarkNewRelfilenode(Relation rel)
{
	/* Mark it... */
	rel->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
	/* ... and now we have eoxact cleanup work to do */
	need_eoxact_work = true;
}


2347
/*
2348 2349 2350
 *		RelationBuildLocalRelation
 *			Build a relcache entry for an about-to-be-created relation,
 *			and enter it into the relcache.
2351
 */
2352 2353
Relation
RelationBuildLocalRelation(const char *relname,
2354
						   Oid relnamespace,
2355
						   TupleDesc tupDesc,
2356 2357
						   Oid relid,
						   Oid reltablespace,
2358
						   bool shared_relation)
2359
{
2360
	Relation	rel;
2361
	MemoryContext oldcxt;
2362 2363
	int			natts = tupDesc->natts;
	int			i;
2364
	bool		has_not_null;
2365
	bool		nailit;
2366

2367
	AssertArg(natts >= 0);
2368

2369 2370 2371
	/*
	 * check for creation of a rel that must be nailed in cache.
	 *
2372
	 * XXX this list had better match RelationCacheInitializePhase2's list.
2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386
	 */
	switch (relid)
	{
		case RelationRelationId:
		case AttributeRelationId:
		case ProcedureRelationId:
		case TypeRelationId:
			nailit = true;
			break;
		default:
			nailit = false;
			break;
	}

2387 2388
	/*
	 * check that hardwired list of shared rels matches what's in the
B
Bruce Momjian 已提交
2389 2390 2391
	 * bootstrap .bki file.  If you get a failure here during initdb, you
	 * probably need to fix IsSharedRelation() to match whatever you've done
	 * to the set of shared relations.
2392 2393 2394 2395 2396
	 */
	if (shared_relation != IsSharedRelation(relid))
		elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
			 relname, relid);

2397 2398 2399 2400 2401
	/*
	 * switch to the cache context to create the relcache entry.
	 */
	if (!CacheMemoryContext)
		CreateCacheMemoryContext();
2402

2403 2404
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

2405
	/*
2406
	 * allocate a new relation descriptor and fill in basic state fields.
2407
	 */
2408
	rel = (Relation) palloc0(sizeof(RelationData));
2409

2410 2411 2412
	rel->rd_targblock = InvalidBlockNumber;

	/* make sure relation is marked as having no open file yet */
2413
	rel->rd_smgr = NULL;
2414

2415 2416 2417
	/* mark it nailed if appropriate */
	rel->rd_isnailed = nailit;

2418
	rel->rd_refcnt = nailit ? 1 : 0;
2419

2420
	/* it's being created in this transaction */
2421
	rel->rd_createSubid = GetCurrentSubTransactionId();
2422
	rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
2423

2424
	/* must flag that we have rels created in this transaction */
2425
	need_eoxact_work = true;
2426

2427
	/* is it a temporary relation? */
2428
	rel->rd_istemp = isTempOrToastNamespace(relnamespace);
2429

2430
	/*
2431
	 * create a new tuple descriptor from the one passed in.  We do this
B
Bruce Momjian 已提交
2432 2433 2434 2435
	 * partly to copy it into the cache context, and partly because the new
	 * relation can't have any defaults or constraints yet; they have to be
	 * added in later steps, because they require additions to multiple system
	 * catalogs.  We can copy attnotnull constraints here, however.
2436
	 */
2437
	rel->rd_att = CreateTupleDescCopy(tupDesc);
2438
	rel->rd_att->tdrefcount = 1;	/* mark as refcounted */
2439
	has_not_null = false;
2440
	for (i = 0; i < natts; i++)
2441
	{
2442
		rel->rd_att->attrs[i]->attnotnull = tupDesc->attrs[i]->attnotnull;
2443 2444 2445 2446 2447 2448 2449 2450 2451 2452
		has_not_null |= tupDesc->attrs[i]->attnotnull;
	}

	if (has_not_null)
	{
		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

		constr->has_not_null = true;
		rel->rd_att->constr = constr;
	}
2453 2454 2455 2456

	/*
	 * initialize relation tuple form (caller may add/override data later)
	 */
2457
	rel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
2458

2459 2460
	namestrcpy(&rel->rd_rel->relname, relname);
	rel->rd_rel->relnamespace = relnamespace;
2461 2462

	rel->rd_rel->relkind = RELKIND_UNCATALOGED;
2463
	rel->rd_rel->relhasoids = rel->rd_att->tdhasoid;
2464 2465
	rel->rd_rel->relnatts = natts;
	rel->rd_rel->reltype = InvalidOid;
2466 2467
	/* needed when bootstrapping: */
	rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;
2468 2469

	/*
B
Bruce Momjian 已提交
2470 2471 2472
	 * Insert relation physical and logical identifiers (OIDs) into the right
	 * places.	Note that the physical ID (relfilenode) is initially the same
	 * as the logical ID (OID).
2473
	 */
2474
	rel->rd_rel->relisshared = shared_relation;
2475 2476 2477 2478 2479 2480

	RelationGetRelid(rel) = relid;

	for (i = 0; i < natts; i++)
		rel->rd_att->attrs[i]->attrelid = relid;

2481 2482
	rel->rd_rel->relfilenode = relid;
	rel->rd_rel->reltablespace = reltablespace;
2483

2484
	RelationInitLockInfo(rel);	/* see lmgr.c */
2485

2486 2487
	RelationInitPhysicalAddr(rel);

2488 2489 2490 2491
	/*
	 * Okay to insert into the relcache hash tables.
	 */
	RelationCacheInsert(rel);
2492

2493 2494 2495
	/*
	 * done building relcache entry.
	 */
2496
	MemoryContextSwitchTo(oldcxt);
2497

2498 2499 2500
	/* It's fully valid */
	rel->rd_isvalid = true;

2501 2502 2503 2504 2505
	/*
	 * Caller expects us to pin the returned entry.
	 */
	RelationIncrementReferenceCount(rel);

2506
	return rel;
2507 2508
}

2509
/*
2510
 *		RelationCacheInitialize
2511
 *
2512 2513
 *		This initializes the relation descriptor cache.  At the time
 *		that this is invoked, we can't do database access yet (mainly
2514 2515 2516 2517 2518
 *		because the transaction subsystem is not up); all we are doing
 *		is making an empty cache hashtable.  This must be done before
 *		starting the initialization transaction, because otherwise
 *		AtEOXact_RelationCache would crash if that transaction aborts
 *		before we can get the relcache set up.
2519 2520
 */

2521
#define INITRELCACHESIZE		400
2522 2523

void
2524
RelationCacheInitialize(void)
2525
{
2526 2527
	MemoryContext oldcxt;
	HASHCTL		ctl;
2528

2529
	/*
B
Bruce Momjian 已提交
2530
	 * switch to cache memory context
2531
	 */
2532 2533
	if (!CacheMemoryContext)
		CreateCacheMemoryContext();
2534

2535
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2536

2537
	/*
2538
	 * create hashtable that indexes the relcache
2539
	 */
2540
	MemSet(&ctl, 0, sizeof(ctl));
2541
	ctl.keysize = sizeof(Oid);
2542
	ctl.entrysize = sizeof(RelIdCacheEnt);
2543
	ctl.hash = oid_hash;
2544 2545
	RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
								  &ctl, HASH_ELEM | HASH_FUNCTION);
2546

2547 2548 2549 2550 2551 2552 2553 2554 2555 2556 2557
	MemoryContextSwitchTo(oldcxt);
}

/*
 *		RelationCacheInitializePhase2
 *
 *		This is called as soon as the catcache and transaction system
 *		are functional.  At this point we can actually read data from
 *		the system catalogs.  We first try to read pre-computed relcache
 *		entries from the pg_internal.init file.  If that's missing or
 *		broken, make phony entries for the minimum set of nailed-in-cache
B
Bruce Momjian 已提交
2558
 *		relations.	Then (unless bootstrapping) make sure we have entries
2559 2560 2561 2562 2563 2564 2565 2566 2567 2568
 *		for the critical system indexes.  Once we've done all this, we
 *		have enough infrastructure to open any system catalog or use any
 *		catcache.  The last step is to rewrite pg_internal.init if needed.
 */
void
RelationCacheInitializePhase2(void)
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;
	MemoryContext oldcxt;
B
Bruce Momjian 已提交
2569
	bool		needNewCacheFile = false;
2570

2571
	/*
2572 2573 2574 2575 2576 2577 2578 2579
	 * switch to cache memory context
	 */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
	 * Try to load the relcache cache file.  If unsuccessful, bootstrap the
	 * cache with pre-made descriptors for the critical "nailed-in" system
	 * catalogs.
2580
	 */
2581
	if (IsBootstrapProcessingMode() ||
B
Bruce Momjian 已提交
2582
		!load_relcache_init_file())
2583
	{
2584 2585
		needNewCacheFile = true;

2586
		formrdesc("pg_class", PG_CLASS_RELTYPE_OID,
2587
				  true, Natts_pg_class, Desc_pg_class);
2588
		formrdesc("pg_attribute", PG_ATTRIBUTE_RELTYPE_OID,
2589
				  false, Natts_pg_attribute, Desc_pg_attribute);
2590
		formrdesc("pg_proc", PG_PROC_RELTYPE_OID,
2591
				  true, Natts_pg_proc, Desc_pg_proc);
2592
		formrdesc("pg_type", PG_TYPE_RELTYPE_OID,
2593
				  true, Natts_pg_type, Desc_pg_type);
2594 2595 2596

#define NUM_CRITICAL_RELS	4	/* fix if you change list above */
	}
2597 2598

	MemoryContextSwitchTo(oldcxt);
2599

2600
	/* In bootstrap mode, the faked-up formrdesc info is all we'll have */
2601 2602 2603
	if (IsBootstrapProcessingMode())
		return;

2604
	/*
B
Bruce Momjian 已提交
2605
	 * If we didn't get the critical system indexes loaded into relcache, do
2606 2607
	 * so now.	These are critical because the catcache and/or opclass cache
	 * depend on them for fetches done during relcache load.  Thus, we have an
B
Bruce Momjian 已提交
2608 2609 2610 2611 2612 2613
	 * infinite-recursion problem.	We can break the recursion by doing
	 * heapscans instead of indexscans at certain key spots. To avoid hobbling
	 * performance, we only want to do that until we have the critical indexes
	 * loaded into relcache.  Thus, the flag criticalRelcachesBuilt is used to
	 * decide whether to do heapscan or indexscan at the key spots, and we set
	 * it true after we've loaded the critical indexes.
2614
	 *
B
Bruce Momjian 已提交
2615 2616 2617 2618 2619 2620
	 * The critical indexes are marked as "nailed in cache", partly to make it
	 * easy for load_relcache_init_file to count them, but mainly because we
	 * cannot flush and rebuild them once we've set criticalRelcachesBuilt to
	 * true.  (NOTE: perhaps it would be possible to reload them by
	 * temporarily setting criticalRelcachesBuilt to false again.  For now,
	 * though, we just nail 'em in.)
2621 2622 2623 2624
	 *
	 * RewriteRelRulenameIndexId and TriggerRelidNameIndexId are not critical
	 * in the same way as the others, because the critical catalogs don't
	 * (currently) have any rules or triggers, and so these indexes can be
B
Bruce Momjian 已提交
2625
	 * rebuilt without inducing recursion.	However they are used during
2626 2627
	 * relcache load when a rel does have rules or triggers, so we choose to
	 * nail them for performance reasons.
2628
	 */
B
Bruce Momjian 已提交
2629
	if (!criticalRelcachesBuilt)
2630
	{
2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648
		load_critical_index(ClassOidIndexId,
							RelationRelationId);
		load_critical_index(AttributeRelidNumIndexId,
							AttributeRelationId);
		load_critical_index(IndexRelidIndexId,
							IndexRelationId);
		load_critical_index(OpclassOidIndexId,
							OperatorClassRelationId);
		load_critical_index(AccessMethodStrategyIndexId,
							AccessMethodOperatorRelationId);
		load_critical_index(AccessMethodProcedureIndexId,
							AccessMethodProcedureRelationId);
		load_critical_index(OperatorOidIndexId,
							OperatorRelationId);
		load_critical_index(RewriteRelRulenameIndexId,
							RewriteRelationId);
		load_critical_index(TriggerRelidNameIndexId,
							TriggerRelationId);
2649

2650
#define NUM_CRITICAL_INDEXES	9		/* fix if you change list above */
2651 2652 2653 2654 2655

		criticalRelcachesBuilt = true;
	}

	/*
B
Bruce Momjian 已提交
2656 2657 2658 2659 2660 2661
	 * Now, scan all the relcache entries and update anything that might be
	 * wrong in the results from formrdesc or the relcache cache file. If we
	 * faked up relcache entries using formrdesc, then read the real pg_class
	 * rows and replace the fake entries with them. Also, if any of the
	 * relcache entries have rules or triggers, load that info the hard way
	 * since it isn't recorded in the cache file.
2662 2663 2664 2665 2666 2667 2668 2669
	 *
	 * Whenever we access the catalogs to read data, there is a possibility
	 * of a shared-inval cache flush causing relcache entries to be removed.
	 * Since hash_seq_search only guarantees to still work after the *current*
	 * entry is removed, it's unsafe to continue the hashtable scan afterward.
	 * We handle this by restarting the scan from scratch after each access.
	 * This is theoretically O(N^2), but the number of entries that actually
	 * need to be fixed is small enough that it doesn't matter.
2670
	 */
2671
	hash_seq_init(&status, RelationIdCache);
2672

2673
	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2674
	{
2675
		Relation	relation = idhentry->reldesc;
2676 2677 2678 2679 2680 2681
		bool		restart = false;

		/*
		 * Make sure *this* entry doesn't get flushed while we work with it.
		 */
		RelationIncrementReferenceCount(relation);
2682

2683
		/*
2684
		 * If it's a faked-up entry, read the real pg_class tuple.
2685
		 */
2686
		if (relation->rd_rel->relowner == InvalidOid)
2687 2688 2689
		{
			HeapTuple	htup;
			Form_pg_class relp;
B
Bruce Momjian 已提交
2690

2691
			htup = SearchSysCache(RELOID,
B
Bruce Momjian 已提交
2692
								ObjectIdGetDatum(RelationGetRelid(relation)),
2693 2694
								  0, 0, 0);
			if (!HeapTupleIsValid(htup))
2695 2696
				elog(FATAL, "cache lookup failed for relation %u",
					 RelationGetRelid(relation));
2697
			relp = (Form_pg_class) GETSTRUCT(htup);
B
Bruce Momjian 已提交
2698

2699 2700 2701 2702 2703
			/*
			 * Copy tuple to relation->rd_rel. (See notes in
			 * AllocateRelationDesc())
			 */
			memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
2704

2705 2706 2707 2708 2709
			/* Update rd_options while we have the tuple */
			if (relation->rd_options)
				pfree(relation->rd_options);
			RelationParseRelOptions(relation, htup);

2710
			/*
2711 2712 2713 2714
			 * Check the values in rd_att were set up correctly.  (We cannot
			 * just copy them over now: formrdesc must have set up the
			 * rd_att data correctly to start with, because it may already
			 * have been copied into one or more catcache entries.)
2715
			 */
2716 2717 2718
			Assert(relation->rd_att->tdtypeid == relp->reltype);
			Assert(relation->rd_att->tdtypmod == -1);
			Assert(relation->rd_att->tdhasoid == relp->relhasoids);
2719

2720
			ReleaseSysCache(htup);
2721 2722 2723 2724 2725 2726 2727

			/* relowner had better be OK now, else we'll loop forever */
			if (relation->rd_rel->relowner == InvalidOid)
				elog(ERROR, "invalid relowner in pg_class entry for \"%s\"",
					 RelationGetRelationName(relation));

			restart = true;
2728 2729 2730 2731
		}

		/*
		 * Fix data that isn't saved in relcache cache file.
2732 2733 2734 2735 2736
		 *
		 * relhasrules or reltriggers could possibly be wrong or out of
		 * date.  If we don't actually find any rules or triggers, clear the
		 * local copy of the flag so that we don't get into an infinite loop
		 * here.  We don't make any attempt to fix the pg_class entry, though.
2737 2738
		 */
		if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
2739
		{
2740
			RelationBuildRuleLock(relation);
2741 2742 2743 2744
			if (relation->rd_rules == NULL)
				relation->rd_rel->relhasrules = false;
			restart = true;
		}
2745
		if (relation->rd_rel->reltriggers > 0 && relation->trigdesc == NULL)
2746
		{
2747
			RelationBuildTriggers(relation);
2748 2749 2750 2751 2752 2753 2754 2755 2756 2757 2758 2759 2760 2761
			if (relation->trigdesc == NULL)
				relation->rd_rel->reltriggers = 0;
			restart = true;
		}

		/* Release hold on the relation */
		RelationDecrementReferenceCount(relation);

		/* Now, restart the hashtable scan if needed */
		if (restart)
		{
			hash_seq_term(&status);
			hash_seq_init(&status, RelationIdCache);
		}
2762
	}
2763

2764 2765 2766
	/*
	 * Lastly, write out a new relcache cache file if one is needed.
	 */
2767 2768 2769
	if (needNewCacheFile)
	{
		/*
B
Bruce Momjian 已提交
2770 2771 2772 2773
		 * Force all the catcaches to finish initializing and thereby open the
		 * catalogs and indexes they use.  This will preload the relcache with
		 * entries for all the most important system catalogs and indexes, so
		 * that the init file will be most useful for future backends.
2774 2775 2776 2777 2778 2779 2780 2781
		 */
		InitCatalogCachePhase2();

		/* now write the file */
		write_relcache_init_file();
	}
}

2782 2783 2784 2785 2786 2787 2788 2789 2790 2791 2792 2793 2794 2795 2796 2797 2798 2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809
/*
 * load_critical_index -- build and nail the relcache entry for one critical
 * system index
 *
 * indexoid is the OID of the index to load; heapoid is the OID of the catalog
 * that the index is defined on.  The freshly built entry is marked as nailed
 * and its reference count is set to 1.  Inability to build the entry is
 * reported with elog(PANIC).
 */
static void
load_critical_index(Oid indexoid, Oid heapoid)
{
	Relation	indexrel;

	/*
	 * Lock order matters here: the parent catalog first, the index second.
	 * RelationBuildDesc might well need to read the catalog, and anyone who
	 * exclusive-locks this catalog and index takes the locks in that same
	 * order, so following it avoids deadlock.
	 */
	LockRelationOid(heapoid, AccessShareLock);
	LockRelationOid(indexoid, AccessShareLock);

	indexrel = RelationBuildDesc(indexoid, true);
	if (indexrel == NULL)
		elog(PANIC, "could not open critical system index %u", indexoid);

	/* Pin the entry in the cache: one reference, and flagged as nailed */
	indexrel->rd_refcnt = 1;
	indexrel->rd_isnailed = true;

	/* Release the locks in the reverse of the order they were taken */
	UnlockRelationOid(indexoid, AccessShareLock);
	UnlockRelationOid(heapoid, AccessShareLock);
}

2810
/*
2811
 * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
2812 2813 2814
 * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
 *
 * We need this kluge because we have to be able to access non-fixed-width
2815 2816 2817 2818 2819 2820
 * fields of pg_class and pg_index before we have the standard catalog caches
 * available.  We use predefined data that's set up in just the same way as
 * the bootstrapped reldescs used by formrdesc().  The resulting tupdesc is
 * not 100% kosher: it does not have the correct rowtype OID in tdtypeid, nor
 * does it have a TupleConstr field.  But it's good enough for the purpose of
 * extracting fields.
2821 2822
 */
static TupleDesc
2823
BuildHardcodedDescriptor(int natts, Form_pg_attribute attrs, bool hasoids)
2824
{
2825
	TupleDesc	result;
2826 2827 2828 2829 2830
	MemoryContext oldcxt;
	int			i;

	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

2831
	result = CreateTemplateTupleDesc(natts, hasoids);
B
Bruce Momjian 已提交
2832
	result->tdtypeid = RECORDOID;		/* not right, but we don't care */
2833
	result->tdtypmod = -1;
2834

2835
	for (i = 0; i < natts; i++)
2836
	{
2837
		memcpy(result->attrs[i], &attrs[i], ATTRIBUTE_TUPLE_SIZE);
2838
		/* make sure attcacheoff is valid */
2839
		result->attrs[i]->attcacheoff = -1;
2840 2841 2842
	}

	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
2843
	result->attrs[0]->attcacheoff = 0;
2844 2845 2846 2847 2848

	/* Note: we don't bother to set up a TupleConstr entry */

	MemoryContextSwitchTo(oldcxt);

2849 2850 2851 2852 2853 2854 2855 2856 2857 2858 2859 2860 2861 2862 2863 2864 2865 2866 2867 2868 2869 2870 2871 2872 2873 2874 2875 2876
	return result;
}

/*
 * Return the hardcoded tuple descriptor for pg_class, building it on the
 * first call and handing back the same pointer on every later call.
 */
static TupleDesc
GetPgClassDescriptor(void)
{
	static TupleDesc cached_desc = NULL;

	/* Reuse the descriptor if an earlier call already built it */
	if (cached_desc != NULL)
		return cached_desc;

	cached_desc = BuildHardcodedDescriptor(Natts_pg_class,
										   Desc_pg_class,
										   true);
	return cached_desc;
}

/*
 * Return the hardcoded tuple descriptor for pg_index, building it on the
 * first call and handing back the same pointer on every later call.
 */
static TupleDesc
GetPgIndexDescriptor(void)
{
	static TupleDesc cached_desc = NULL;

	/* Reuse the descriptor if an earlier call already built it */
	if (cached_desc != NULL)
		return cached_desc;

	cached_desc = BuildHardcodedDescriptor(Natts_pg_index,
										   Desc_pg_index,
										   false);
	return cached_desc;
}

2880
static void
2881
AttrDefaultFetch(Relation relation)
2882
{
2883 2884 2885
	AttrDefault *attrdef = relation->rd_att->constr->defval;
	int			ndef = relation->rd_att->constr->num_defval;
	Relation	adrel;
2886
	SysScanDesc adscan;
2887
	ScanKeyData skey;
H
Hiroshi Inoue 已提交
2888
	HeapTuple	htup;
2889
	Datum		val;
2890 2891 2892
	bool		isnull;
	int			found;
	int			i;
2893

2894 2895 2896 2897
	ScanKeyInit(&skey,
				Anum_pg_attrdef_adrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));
2898

2899 2900
	adrel = heap_open(AttrDefaultRelationId, AccessShareLock);
	adscan = systable_beginscan(adrel, AttrDefaultIndexId, true,
2901
								SnapshotNow, 1, &skey);
2902
	found = 0;
2903

2904
	while (HeapTupleIsValid(htup = systable_getnext(adscan)))
2905
	{
2906
		Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
2907

2908 2909 2910 2911
		for (i = 0; i < ndef; i++)
		{
			if (adform->adnum != attrdef[i].adnum)
				continue;
2912
			if (attrdef[i].adbin != NULL)
2913
				elog(WARNING, "multiple attrdef records found for attr %s of rel %s",
B
Bruce Momjian 已提交
2914
				NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
2915
					 RelationGetRelationName(relation));
2916 2917
			else
				found++;
2918

2919 2920 2921
			val = fastgetattr(htup,
							  Anum_pg_attrdef_adbin,
							  adrel->rd_att, &isnull);
2922
			if (isnull)
2923
				elog(WARNING, "null adbin for attr %s of rel %s",
B
Bruce Momjian 已提交
2924
				NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
2925
					 RelationGetRelationName(relation));
2926 2927
			else
				attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext,
B
Bruce Momjian 已提交
2928 2929
								 DatumGetCString(DirectFunctionCall1(textout,
																	 val)));
2930 2931
			break;
		}
2932

2933
		if (i >= ndef)
2934 2935
			elog(WARNING, "unexpected attrdef record found for attr %d of rel %s",
				 adform->adnum, RelationGetRelationName(relation));
2936 2937
	}

2938
	systable_endscan(adscan);
2939
	heap_close(adrel, AccessShareLock);
2940 2941

	if (found != ndef)
2942
		elog(WARNING, "%d attrdef record(s) missing for rel %s",
2943
			 ndef - found, RelationGetRelationName(relation));
2944 2945
}

2946
static void
2947
CheckConstraintFetch(Relation relation)
2948
{
2949 2950
	ConstrCheck *check = relation->rd_att->constr->check;
	int			ncheck = relation->rd_att->constr->num_check;
2951 2952 2953
	Relation	conrel;
	SysScanDesc conscan;
	ScanKeyData skey[1];
H
Hiroshi Inoue 已提交
2954
	HeapTuple	htup;
2955
	Datum		val;
2956
	bool		isnull;
2957
	int			found = 0;
2958

2959 2960 2961 2962
	ScanKeyInit(&skey[0],
				Anum_pg_constraint_conrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));
2963

2964 2965
	conrel = heap_open(ConstraintRelationId, AccessShareLock);
	conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
2966
								 SnapshotNow, 1, skey);
2967

2968
	while (HeapTupleIsValid(htup = systable_getnext(conscan)))
2969
	{
2970 2971 2972 2973 2974 2975
		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);

		/* We want check constraints only */
		if (conform->contype != CONSTRAINT_CHECK)
			continue;

2976 2977
		if (found >= ncheck)
			elog(ERROR, "unexpected constraint record found for rel %s",
2978
				 RelationGetRelationName(relation));
2979

2980
		check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
B
Bruce Momjian 已提交
2981
												  NameStr(conform->conname));
2982 2983

		/* Grab and test conbin is actually set */
2984
		val = fastgetattr(htup,
2985 2986
						  Anum_pg_constraint_conbin,
						  conrel->rd_att, &isnull);
2987
		if (isnull)
2988
			elog(ERROR, "null conbin for rel %s",
2989
				 RelationGetRelationName(relation));
2990

2991
		check[found].ccbin = MemoryContextStrdup(CacheMemoryContext,
B
Bruce Momjian 已提交
2992 2993
								 DatumGetCString(DirectFunctionCall1(textout,
																	 val)));
2994 2995 2996
		found++;
	}

2997 2998
	systable_endscan(conscan);
	heap_close(conrel, AccessShareLock);
2999 3000

	if (found != ncheck)
3001
		elog(ERROR, "%d constraint record(s) missing for rel %s",
3002
			 ncheck - found, RelationGetRelationName(relation));
3003 3004
}

3005 3006 3007 3008 3009 3010
/*
 * RelationGetIndexList -- get a list of OIDs of indexes on this relation
 *
 * The list is computed lazily: on the first request pg_index is scanned for
 * the relation's indexes and the outcome is remembered in the relcache entry,
 * so later requests need no scan.  A shared cache inval of the relcache entry
 * deletes the remembered list and resets rd_indexvalid to 0, forcing a
 * recomputation on the next request; that is how index creation and deletion
 * are noticed.
 *
 * The result is guaranteed to be sorted by OID.  The executor depends on
 * this: for index types that are exclusive-locked during update, every
 * backend has to lock the indexes in the same order, or deadlocks result
 * (see ExecOpenIndices()).  Any consistent order would serve; OID order is
 * simply easy.
 *
 * Because shared cache inval can make the relcache's own list vanish, what
 * is returned is a copy palloc'd in the caller's context, which the caller
 * may list_free() once done.  The copy is needed since callers typically do
 * syscache lookups on the indexes, and such a lookup can cause SI messages
 * to be processed!
 *
 * rd_oidindex is maintained here as well and is treated by this module as
 * effectively part of the index list.  It is valid whenever rd_indexvalid is
 * nonzero, and holds the pg_class OID of a unique index on OID if the
 * relation has one, else InvalidOid.
 */
List *
RelationGetIndexList(Relation relation)
{
	Relation	pgindex;
	SysScanDesc scan;
	ScanKeyData key;
	HeapTuple	tup;
	List	   *indexoids = NIL;
	Oid			oidindex = InvalidOid;
	MemoryContext savedcxt;

	/* Nothing to compute if the relcache entry already holds a list */
	if (relation->rd_indexvalid != 0)
		return list_copy(relation->rd_indexlist);

	/*
	 * The list to be returned is accumulated in the caller's context during
	 * the scan, and only copied into the relcache entry once the scan has
	 * finished successfully.  That way an error partway through cannot leak
	 * cache-context memory.
	 */

	/* Scan pg_index for the rows with indrelid = this relation */
	ScanKeyInit(&key,
				Anum_pg_index_indrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));

	pgindex = heap_open(IndexRelationId, AccessShareLock);
	scan = systable_beginscan(pgindex, IndexIndrelidIndexId, true,
							  SnapshotNow, 1, &key);

	for (tup = systable_getnext(scan);
		 HeapTupleIsValid(tup);
		 tup = systable_getnext(scan))
	{
		Form_pg_index indexform = (Form_pg_index) GETSTRUCT(tup);

		/* Keep the result sorted by OID as entries are added */
		indexoids = insert_ordered_oid(indexoids, indexform->indexrelid);

		/* Remember it if it is a unique, non-partial btree index on OID */
		if (indexform->indnatts == 1 &&
			indexform->indisunique &&
			indexform->indkey.values[0] == ObjectIdAttributeNumber &&
			indexform->indclass.values[0] == OID_BTREE_OPS_OID &&
			heap_attisnull(tup, Anum_pg_index_indpred))
			oidindex = indexform->indexrelid;
	}

	systable_endscan(scan);
	heap_close(pgindex, AccessShareLock);

	/* The scan succeeded: stash a copy of the list in the relcache entry */
	savedcxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indexlist = list_copy(indexoids);
	relation->rd_oidindex = oidindex;
	relation->rd_indexvalid = 1;
	MemoryContextSwitchTo(savedcxt);

	return indexoids;
}

3095 3096 3097 3098 3099 3100 3101 3102 3103 3104 3105 3106
/*
 * insert_ordered_oid
 *		Add an Oid to an already-sorted list of Oids, keeping it sorted
 *
 * Constructing a sorted list by repeated insertion is O(N^2), but the
 * constant is small, and for the list lengths expected here that is probably
 * quicker than using qsort().  Most tables don't have very many indexes...
 *
 * The new value is placed after any existing entries equal to it.  The
 * (possibly new) list head is returned.
 */
static List *
insert_ordered_oid(List *list, Oid datum)
{
	ListCell   *after;
	ListCell   *next;

	/* An empty list, or a datum below the first entry: push on the front */
	if (list == NIL || datum < linitial_oid(list))
		return lcons_oid(datum, list);

	/* Otherwise advance until the following entry is larger, or absent */
	after = list_head(list);
	while ((next = lnext(after)) != NULL && lfirst_oid(next) <= datum)
		after = next;

	/* The datum goes immediately behind 'after' */
	lappend_cell_oid(list, after, datum);
	return list;
}

3128 3129 3130 3131
/*
 * RelationSetIndexList -- externally force the index list contents
 *
 * This is used to temporarily override what we think the set of valid
3132 3133
 * indexes is (including the presence or absence of an OID index).
 * The forcing will be valid only until transaction commit or abort.
3134 3135 3136 3137 3138 3139
 *
 * This should only be applied to nailed relations, because in a non-nailed
 * relation the hacked index list could be lost at any time due to SI
 * messages.  In practice it is only used on pg_class (see REINDEX).
 *
 * It is up to the caller to make sure the given list is correctly ordered.
3140 3141 3142 3143 3144 3145 3146
 *
 * We deliberately do not change rd_indexattr here: even when operating
 * with a temporary partial index list, HOT-update decisions must be made
 * correctly with respect to the full index set.  It is up to the caller
 * to ensure that a correct rd_indexattr set has been cached before first
 * calling RelationSetIndexList; else a subsequent inquiry might cause a
 * wrong rd_indexattr set to get computed and cached.
3147 3148
 */
void
3149
RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex)
3150 3151 3152
{
	MemoryContext oldcxt;

3153
	Assert(relation->rd_isnailed);
3154 3155
	/* Copy the list into the cache context (could fail for lack of mem) */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3156
	indexIds = list_copy(indexIds);
3157 3158
	MemoryContextSwitchTo(oldcxt);
	/* Okay to replace old list */
3159
	list_free(relation->rd_indexlist);
3160
	relation->rd_indexlist = indexIds;
3161
	relation->rd_oidindex = oidIndex;
B
Bruce Momjian 已提交
3162
	relation->rd_indexvalid = 2;	/* mark list as forced */
3163
	/* must flag that we have a forced index list */
3164
	need_eoxact_work = true;
3165 3166
}

3167 3168 3169 3170 3171 3172 3173 3174 3175 3176 3177
/*
 * RelationGetOidIndex -- get the pg_class OID of the relation's OID index
 *
 * The result is InvalidOid when the relation has no such index.
 */
Oid
RelationGetOidIndex(Relation relation)
{
	/*
	 * Asking this of a relation without OIDs suggests a confused caller.
	 * Quietly returning InvalidOid would be possible, but an assertion
	 * failure seems the better response.
	 */
	Assert(relation->rd_rel->relhasoids);

	if (relation->rd_indexvalid == 0)
	{
		/*
		 * rd_oidindex is filled in as a side effect of computing the index
		 * list, so compute the list and throw the returned copy away.
		 */
		list_free(RelationGetIndexList(relation));
		Assert(relation->rd_indexvalid != 0);
	}

	return relation->rd_oidindex;
}

3195 3196 3197 3198 3199 3200 3201 3202 3203 3204 3205 3206 3207 3208 3209 3210 3211 3212 3213 3214 3215 3216 3217 3218 3219 3220 3221 3222
/*
 * RelationGetIndexExpressions -- get the index expressions for an index
 *
 * pg_index.indexprs is transformed into a node tree, and the transformed
 * tree is cached in the relcache entry.  NIL is returned when the rel is not
 * an index or has no expressional columns.  In all other cases the caller
 * receives a tree allocated in its own memory context, never a pointer to
 * the relcache's copy, because that copy could go away on relcache
 * invalidation.
 */
List *
RelationGetIndexExpressions(Relation relation)
{
	List	   *exprs;
	Datum		rawDatum;
	bool		isnull;
	char	   *nodeString;
	MemoryContext savedcxt;

	/* A cached tree exists: give the caller its own copy */
	if (relation->rd_indexprs)
		return (List *) copyObject(relation->rd_indexprs);

	/* No pg_index tuple, or a null indexprs column: nothing to return */
	if (relation->rd_indextuple == NULL)
		return NIL;
	if (heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs))
		return NIL;

	/*
	 * The tree handed back is built in the caller's context and copied into
	 * the relcache entry only after all the work below has succeeded, so an
	 * error partway through causes no trouble.
	 */
	rawDatum = heap_getattr(relation->rd_indextuple,
							Anum_pg_index_indexprs,
							GetPgIndexDescriptor(),
							&isnull);
	Assert(!isnull);
	nodeString = DatumGetCString(DirectFunctionCall1(textout, rawDatum));
	exprs = (List *) stringToNode(nodeString);
	pfree(nodeString);

	/*
	 * Apply eval_const_expressions.  This is required, not merely an
	 * optimization: the planner compares these expressions against qual
	 * clauses that were processed the same way, and could miss valid matches
	 * otherwise.  canonicalize_qual is not applied, though.
	 */
	exprs = (List *) eval_const_expressions(NULL, (Node *) exprs);

	/*
	 * Set the coercion format fields to "don't care" so that explicit and
	 * implicit coercions both match in the planner.
	 */
	set_coercionform_dontcare((Node *) exprs);

	/* May as well fix opfuncids too */
	fix_opfuncids((Node *) exprs);

	/* Everything succeeded: keep a copy in the index's own context */
	savedcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
	relation->rd_indexprs = (List *) copyObject(exprs);
	MemoryContextSwitchTo(savedcxt);

	return exprs;
}

/*
 * RelationGetIndexPredicate -- get the index predicate for an index
 *
 * pg_index.indpred is transformed into an implicit-AND node tree (the form
 * ExecQual wants), and the transformed tree is cached in the relcache entry.
 * NIL is returned when the rel is not an index or has no predicate.
 * In all other cases the caller receives a tree allocated in its own memory
 * context, never a pointer to the relcache's copy, because that copy could
 * go away on relcache invalidation.
 */
List *
RelationGetIndexPredicate(Relation relation)
{
	List	   *pred;
	Datum		rawDatum;
	bool		isnull;
	char	   *nodeString;
	MemoryContext savedcxt;

	/* A cached tree exists: give the caller its own copy */
	if (relation->rd_indpred)
		return (List *) copyObject(relation->rd_indpred);

	/* No pg_index tuple, or a null indpred column: nothing to return */
	if (relation->rd_indextuple == NULL)
		return NIL;
	if (heap_attisnull(relation->rd_indextuple, Anum_pg_index_indpred))
		return NIL;

	/*
	 * The tree handed back is built in the caller's context and copied into
	 * the relcache entry only after all the work below has succeeded, so an
	 * error partway through causes no trouble.
	 */
	rawDatum = heap_getattr(relation->rd_indextuple,
							Anum_pg_index_indpred,
							GetPgIndexDescriptor(),
							&isnull);
	Assert(!isnull);
	nodeString = DatumGetCString(DirectFunctionCall1(textout, rawDatum));
	pred = (List *) stringToNode(nodeString);
	pfree(nodeString);

	/*
	 * Apply const-simplification and canonicalization.  This is required,
	 * not merely an optimization: the planner compares the predicate against
	 * qual clauses that were processed the same way, and could miss valid
	 * matches otherwise.  The steps must stay in sync with what
	 * preprocess_expression() does to qual clauses!  (The subquery-related
	 * steps can be left out, since index predicates may not contain
	 * subqueries.)
	 */
	pred = (List *) eval_const_expressions(NULL, (Node *) pred);
	pred = (List *) canonicalize_qual((Expr *) pred);

	/*
	 * Set the coercion format fields to "don't care" so that explicit and
	 * implicit coercions both match in the planner.
	 */
	set_coercionform_dontcare((Node *) pred);

	/* Also convert to implicit-AND format */
	pred = make_ands_implicit((Expr *) pred);

	/* May as well fix opfuncids too */
	fix_opfuncids((Node *) pred);

	/* Everything succeeded: keep a copy in the index's own context */
	savedcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
	relation->rd_indpred = (List *) copyObject(pred);
	MemoryContextSwitchTo(savedcxt);

	return pred;
}

3336 3337 3338 3339 3340 3341 3342 3343 3344 3345 3346 3347 3348 3349 3350 3351 3352
/*
 * RelationGetIndexAttrBitmap -- get a bitmap of index attribute numbers
 *
 * A bit is set in the result for every attribute that appears anywhere in
 * the definition of any index on this relation: plain index keys, and also
 * attributes referenced by index expressions and partial-index predicates.
 *
 * Bit positions are attribute numbers offset by
 * FirstLowInvalidHeapAttributeNumber, which makes room for system attributes
 * (e.g., OID) in the bitmap.
 *
 * The result is palloc'd in the caller's memory context; bms_free it when it
 * is no longer needed.
 */
Bitmapset *
RelationGetIndexAttrBitmap(Relation relation)
{
	Bitmapset  *attrs = NULL;
	List	   *indexoids;
	ListCell   *cell;
	MemoryContext savedcxt;

	/* A cached bitmap exists: give the caller its own copy */
	if (relation->rd_indexattr != NULL)
		return bms_copy(relation->rd_indexattr);

	/* pg_class says there are no indexes at all */
	if (!RelationGetForm(relation)->relhasindex)
		return NULL;

	/* Obtain the list of index OIDs for the relation */
	indexoids = RelationGetIndexList(relation);

	/* relhasindex was set, yet no index was found */
	if (indexoids == NIL)
		return NULL;

	/* Accumulate the attributes referenced by each index in turn */
	foreach(cell, indexoids)
	{
		Relation	indexrel;
		IndexInfo  *info;
		int			keyno;

		indexrel = index_open(lfirst_oid(cell), AccessShareLock);

		/* Key information comes from the index's pg_index row */
		info = BuildIndexInfo(indexrel);

		/* Plain column references among the index keys */
		for (keyno = 0; keyno < info->ii_NumIndexAttrs; keyno++)
		{
			int			attrnum = info->ii_KeyAttrNumbers[keyno];

			/* a zero entry contributes no simple column reference */
			if (attrnum == 0)
				continue;

			attrs = bms_add_member(attrs,
							   attrnum - FirstLowInvalidHeapAttributeNumber);
		}

		/* Attributes referenced inside index expressions */
		pull_varattnos((Node *) info->ii_Expressions, &attrs);

		/* Attributes referenced inside the index predicate */
		pull_varattnos((Node *) info->ii_Predicate, &attrs);

		index_close(indexrel, AccessShareLock);
	}

	list_free(indexoids);

	/* Keep a copy of the finished bitmap in the relcache entry */
	savedcxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indexattr = bms_copy(attrs);
	MemoryContextSwitchTo(savedcxt);

	/* The working copy itself is handed to the caller */
	return attrs;
}

3421

3422
/*
 *	load_relcache_init_file, write_relcache_init_file
 *
 *		In late 1992, we started regularly having databases with more than
 *		a thousand classes in them.  With this number of classes, it became
 *		critical to do indexed lookups on the system catalogs.
 *
 *		Bootstrapping these lookups is very hard.  We want to be able to
 *		use an index on pg_attribute, for example, but in order to do so,
 *		we must have read pg_attribute for the attributes in the index,
 *		which implies that we need to use the index.
 *
 *		In order to get around the problem, we do the following:
 *
 *		   +  When the database system is initialized (at initdb time), we
 *			  don't use indexes.  We do sequential scans.
 *
 *		   +  When the backend is started up in normal mode, we load an image
 *			  of the appropriate relation descriptors, in internal format,
 *			  from an initialization file in the data/base/... directory.
 *
 *		   +  If the initialization file isn't there, then we create the
 *			  relation descriptors using sequential scans and write them to
 *			  the initialization file for use by subsequent backends.
 *
 *		We could dispense with the initialization file and just build the
 *		critical reldescs the hard way on every backend startup, but that
 *		slows down backend startup noticeably.
 *
 *		We can in fact go further, and save more relcache entries than
 *		just the ones that are absolutely critical; this allows us to speed
 *		up backend startup by not having to build such entries the hard way.
 *		Presently, all the catalog and index entries that are referred to
 *		by catcaches are stored in the initialization file.
 *
 *		The same mechanism that detects when catcache and relcache entries
 *		need to be invalidated (due to catalog updates) also arranges to
 *		unlink the initialization file when its contents may be out of date.
 *		The file will then be rebuilt during the next backend startup.
 */

3463 3464 3465 3466
/*
 * load_relcache_init_file -- attempt to load cache from the init file
 *
 * If successful, return TRUE and set criticalRelcachesBuilt to true.
3467
 * If not successful, return FALSE.
3468 3469 3470 3471 3472
 *
 * NOTE: we assume we are already switched into CacheMemoryContext.
 */
static bool
load_relcache_init_file(void)
3473
{
3474 3475 3476 3477 3478 3479 3480
	FILE	   *fp;
	char		initfilename[MAXPGPATH];
	Relation   *rels;
	int			relno,
				num_rels,
				max_rels,
				nailed_rels,
3481 3482
				nailed_indexes,
				magic;
3483
	int			i;
3484

3485 3486 3487 3488 3489 3490
	snprintf(initfilename, sizeof(initfilename), "%s/%s",
			 DatabasePath, RELCACHE_INIT_FILENAME);

	fp = AllocateFile(initfilename, PG_BINARY_R);
	if (fp == NULL)
		return false;
3491

3492
	/*
B
Bruce Momjian 已提交
3493 3494 3495
	 * Read the index relcache entries from the file.  Note we will not enter
	 * any of them into the cache if the read fails partway through; this
	 * helps to guard against broken init files.
3496 3497 3498 3499 3500 3501 3502
	 */
	max_rels = 100;
	rels = (Relation *) palloc(max_rels * sizeof(Relation));
	num_rels = 0;
	nailed_rels = nailed_indexes = 0;
	initFileRelationIds = NIL;

3503 3504 3505 3506 3507 3508
	/* check for correct magic number (compatible version) */
	if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
		goto read_failed;
	if (magic != RELCACHE_INIT_FILEMAGIC)
		goto read_failed;

B
Bruce Momjian 已提交
3509
	for (relno = 0;; relno++)
3510
	{
3511 3512 3513 3514
		Size		len;
		size_t		nread;
		Relation	rel;
		Form_pg_class relform;
3515
		bool		has_not_null;
3516

3517
		/* first read the relation descriptor length */
3518 3519 3520 3521
		if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
		{
			if (nread == 0)
				break;			/* end of file */
3522
			goto read_failed;
3523
		}
3524

3525 3526
		/* safety check for incompatible relcache layout */
		if (len != sizeof(RelationData))
3527
			goto read_failed;
3528

3529 3530 3531 3532 3533 3534
		/* allocate another relcache header */
		if (num_rels >= max_rels)
		{
			max_rels *= 2;
			rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
		}
3535

3536
		rel = rels[num_rels++] = (Relation) palloc(len);
3537

3538 3539
		/* then, read the Relation structure */
		if ((nread = fread(rel, 1, len, fp)) != len)
3540
			goto read_failed;
3541 3542

		/* next read the relation tuple form */
3543
		if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
3544
			goto read_failed;
3545 3546

		relform = (Form_pg_class) palloc(len);
3547
		if ((nread = fread(relform, 1, len, fp)) != len)
3548
			goto read_failed;
3549

3550
		rel->rd_rel = relform;
3551 3552

		/* initialize attribute tuple forms */
3553 3554
		rel->rd_att = CreateTemplateTupleDesc(relform->relnatts,
											  relform->relhasoids);
3555 3556
		rel->rd_att->tdrefcount = 1;	/* mark as refcounted */

3557
		rel->rd_att->tdtypeid = relform->reltype;
B
Bruce Momjian 已提交
3558
		rel->rd_att->tdtypmod = -1;		/* unnecessary, but... */
3559 3560

		/* next read all the attribute tuple form data entries */
3561
		has_not_null = false;
3562 3563
		for (i = 0; i < relform->relnatts; i++)
		{
3564
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
3565
				goto read_failed;
3566 3567
			if (len != ATTRIBUTE_TUPLE_SIZE)
				goto read_failed;
3568
			if ((nread = fread(rel->rd_att->attrs[i], 1, len, fp)) != len)
3569
				goto read_failed;
3570 3571 3572 3573

			has_not_null |= rel->rd_att->attrs[i]->attnotnull;
		}

B
Bruce Momjian 已提交
3574 3575 3576 3577 3578 3579 3580 3581
		/* next read the access method specific field */
		if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
			goto read_failed;
		if (len > 0)
		{
			rel->rd_options = palloc(len);
			if ((nread = fread(rel->rd_options, 1, len, fp)) != len)
				goto read_failed;
3582
			if (len != VARSIZE(rel->rd_options))
B
Bruce Momjian 已提交
3583
				goto read_failed;		/* sanity check */
B
Bruce Momjian 已提交
3584 3585 3586 3587 3588 3589
		}
		else
		{
			rel->rd_options = NULL;
		}

3590 3591 3592 3593 3594 3595 3596
		/* mark not-null status */
		if (has_not_null)
		{
			TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

			constr->has_not_null = true;
			rel->rd_att->constr = constr;
3597 3598
		}

3599 3600 3601 3602 3603
		/* If it's an index, there's more to do */
		if (rel->rd_rel->relkind == RELKIND_INDEX)
		{
			Form_pg_am	am;
			MemoryContext indexcxt;
3604 3605
			Oid		   *opfamily;
			Oid		   *opcintype;
3606 3607
			Oid		   *operator;
			RegProcedure *support;
3608
			int			nsupport;
3609
			int16	   *indoption;
3610 3611 3612 3613 3614

			/* Count nailed indexes to ensure we have 'em all */
			if (rel->rd_isnailed)
				nailed_indexes++;

3615
			/* next, read the pg_index tuple */
3616 3617
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;
3618

3619 3620
			rel->rd_indextuple = (HeapTuple) palloc(len);
			if ((nread = fread(rel->rd_indextuple, 1, len, fp)) != len)
3621
				goto read_failed;
3622

3623 3624 3625 3626
			/* Fix up internal pointers in the tuple -- see heap_copytuple */
			rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
			rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);

3627 3628 3629
			/* next, read the access method tuple form */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;
3630

3631 3632 3633 3634
			am = (Form_pg_am) palloc(len);
			if ((nread = fread(am, 1, len, fp)) != len)
				goto read_failed;
			rel->rd_am = am;
3635

3636 3637 3638 3639 3640 3641
			/*
			 * prepare index info context --- parameters should match
			 * RelationInitIndexAccessInfo
			 */
			indexcxt = AllocSetContextCreate(CacheMemoryContext,
											 RelationGetRelationName(rel),
3642 3643 3644
											 ALLOCSET_SMALL_MINSIZE,
											 ALLOCSET_SMALL_INITSIZE,
											 ALLOCSET_SMALL_MAXSIZE);
3645 3646
			rel->rd_indexcxt = indexcxt;

3647 3648 3649 3650 3651 3652 3653 3654 3655 3656 3657 3658 3659 3660 3661 3662 3663 3664 3665 3666
			/* next, read the vector of opfamily OIDs */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			opfamily = (Oid *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(opfamily, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_opfamily = opfamily;

			/* next, read the vector of opcintype OIDs */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			opcintype = (Oid *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(opcintype, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_opcintype = opcintype;

3667 3668 3669 3670 3671 3672 3673 3674 3675
			/* next, read the vector of operator OIDs */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			operator = (Oid *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(operator, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_operator = operator;
3676

3677
			/* next, read the vector of support procedures */
3678 3679 3680 3681 3682 3683 3684 3685
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;
			support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(support, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_support = support;

3686 3687 3688 3689 3690 3691 3692 3693 3694 3695
			/* finally, read the vector of indoption values */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			indoption = (int16 *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(indoption, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_indoption = indoption;

3696 3697 3698
			/* set up zeroed fmgr-info vectors */
			rel->rd_aminfo = (RelationAmInfo *)
				MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));
3699 3700
			nsupport = relform->relnatts * am->amsupport;
			rel->rd_supportinfo = (FmgrInfo *)
3701
				MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
3702 3703 3704 3705 3706 3707 3708 3709
		}
		else
		{
			/* Count nailed rels to ensure we have 'em all */
			if (rel->rd_isnailed)
				nailed_rels++;

			Assert(rel->rd_index == NULL);
3710
			Assert(rel->rd_indextuple == NULL);
3711 3712
			Assert(rel->rd_am == NULL);
			Assert(rel->rd_indexcxt == NULL);
3713
			Assert(rel->rd_aminfo == NULL);
3714 3715
			Assert(rel->rd_opfamily == NULL);
			Assert(rel->rd_opcintype == NULL);
3716 3717 3718
			Assert(rel->rd_operator == NULL);
			Assert(rel->rd_support == NULL);
			Assert(rel->rd_supportinfo == NULL);
3719
			Assert(rel->rd_indoption == NULL);
3720 3721 3722 3723
		}

		/*
		 * Rules and triggers are not saved (mainly because the internal
B
Bruce Momjian 已提交
3724 3725
		 * format is complex and subject to change).  They must be rebuilt if
		 * needed by RelationCacheInitializePhase2.  This is not expected to
3726 3727
		 * be a big performance hit since few system catalogs have such. Ditto
		 * for index expressions and predicates.
3728 3729 3730 3731
		 */
		rel->rd_rules = NULL;
		rel->rd_rulescxt = NULL;
		rel->trigdesc = NULL;
3732 3733
		rel->rd_indexprs = NIL;
		rel->rd_indpred = NIL;
3734 3735 3736 3737

		/*
		 * Reset transient-state fields in the relcache entry
		 */
3738
		rel->rd_smgr = NULL;
3739 3740
		rel->rd_targblock = InvalidBlockNumber;
		if (rel->rd_isnailed)
3741
			rel->rd_refcnt = 1;
3742
		else
3743
			rel->rd_refcnt = 0;
3744
		rel->rd_indexvalid = 0;
3745
		rel->rd_indexlist = NIL;
3746
		rel->rd_indexattr = NULL;
3747
		rel->rd_oidindex = InvalidOid;
3748
		rel->rd_createSubid = InvalidSubTransactionId;
3749
		rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3750
		rel->rd_amcache = NULL;
3751
		MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
3752

3753
		/*
3754
		 * Recompute lock and physical addressing info.  This is needed in
B
Bruce Momjian 已提交
3755 3756
		 * case the pg_internal.init file was copied from some other database
		 * by CREATE DATABASE.
3757 3758
		 */
		RelationInitLockInfo(rel);
3759
		RelationInitPhysicalAddr(rel);
3760 3761 3762
	}

	/*
B
Bruce Momjian 已提交
3763 3764 3765
	 * We reached the end of the init file without apparent problem. Did we
	 * get the right number of nailed items?  (This is a useful crosscheck in
	 * case the set of critical rels or indexes changes.)
3766 3767 3768 3769 3770 3771 3772 3773 3774 3775 3776 3777 3778 3779
	 */
	if (nailed_rels != NUM_CRITICAL_RELS ||
		nailed_indexes != NUM_CRITICAL_INDEXES)
		goto read_failed;

	/*
	 * OK, all appears well.
	 *
	 * Now insert all the new relcache entries into the cache.
	 */
	for (relno = 0; relno < num_rels; relno++)
	{
		RelationCacheInsert(rels[relno]);
		/* also make a list of their OIDs, for RelationIdIsInInitFile */
3780
		initFileRelationIds = lcons_oid(RelationGetRelid(rels[relno]),
B
Bruce Momjian 已提交
3781
										initFileRelationIds);
3782
	}
3783

3784 3785 3786
	pfree(rels);
	FreeFile(fp);

3787
	criticalRelcachesBuilt = true;
3788
	return true;
3789

3790
	/*
B
Bruce Momjian 已提交
3791 3792 3793
	 * init file is broken, so do it the hard way.	We don't bother trying to
	 * free the clutter we just allocated; it's not in the relcache so it
	 * won't hurt.
3794
	 */
3795
read_failed:
3796 3797 3798 3799
	pfree(rels);
	FreeFile(fp);

	return false;
3800 3801
}

3802 3803 3804 3805
/*
 * Write out a new initialization file with the current contents
 * of the relcache.
 */
3806
static void
3807
write_relcache_init_file(void)
3808
{
3809
	FILE	   *fp;
3810 3811
	char		tempfilename[MAXPGPATH];
	char		finalfilename[MAXPGPATH];
3812
	int			magic;
3813
	HASH_SEQ_STATUS status;
3814
	RelIdCacheEnt *idhentry;
3815 3816
	MemoryContext oldcxt;
	int			i;
3817 3818

	/*
3819
	 * We must write a temporary file and rename it into place. Otherwise,
B
Bruce Momjian 已提交
3820 3821
	 * another backend starting at about the same time might crash trying to
	 * read the partially-complete file.
3822
	 */
3823 3824 3825 3826
	snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
			 DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
	snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
			 DatabasePath, RELCACHE_INIT_FILENAME);
3827

3828 3829 3830 3831
	unlink(tempfilename);		/* in case it exists w/wrong permissions */

	fp = AllocateFile(tempfilename, PG_BINARY_W);
	if (fp == NULL)
3832 3833 3834 3835 3836
	{
		/*
		 * We used to consider this a fatal error, but we might as well
		 * continue with backend startup ...
		 */
3837 3838
		ereport(WARNING,
				(errcode_for_file_access(),
3839
				 errmsg("could not create relation-cache initialization file \"%s\": %m",
3840
						tempfilename),
B
Bruce Momjian 已提交
3841
			  errdetail("Continuing anyway, but there's something wrong.")));
3842 3843
		return;
	}
3844

3845
	/*
B
Bruce Momjian 已提交
3846
	 * Write a magic number to serve as a file version identifier.	We can
3847 3848 3849 3850 3851 3852
	 * change the magic number whenever the relcache layout changes.
	 */
	magic = RELCACHE_INIT_FILEMAGIC;
	if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic))
		elog(FATAL, "could not write init file");

3853
	/*
3854
	 * Write all the reldescs (in no particular order).
H
Hiroshi Inoue 已提交
3855
	 */
3856
	hash_seq_init(&status, RelationIdCache);
3857

3858
	initFileRelationIds = NIL;
3859

3860
	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3861
	{
3862
		Relation	rel = idhentry->reldesc;
3863
		Form_pg_class relform = rel->rd_rel;
3864

B
Bruce Momjian 已提交
3865 3866
		/* first write the relcache entry proper */
		write_item(rel, sizeof(RelationData), fp);
3867 3868

		/* next write the relation tuple form */
B
Bruce Momjian 已提交
3869
		write_item(relform, CLASS_TUPLE_SIZE, fp);
3870 3871 3872 3873

		/* next, do all the attribute tuple form data entries */
		for (i = 0; i < relform->relnatts; i++)
		{
3874
			write_item(rel->rd_att->attrs[i], ATTRIBUTE_TUPLE_SIZE, fp);
3875 3876
		}

B
Bruce Momjian 已提交
3877 3878
		/* next, do the access method specific field */
		write_item(rel->rd_options,
3879
				   (rel->rd_options ? VARSIZE(rel->rd_options) : 0),
3880
				   fp);
B
Bruce Momjian 已提交
3881

3882 3883 3884 3885
		/* If it's an index, there's more to do */
		if (rel->rd_rel->relkind == RELKIND_INDEX)
		{
			Form_pg_am	am = rel->rd_am;
3886

3887 3888
			/* write the pg_index tuple */
			/* we assume this was created by heap_copytuple! */
B
Bruce Momjian 已提交
3889
			write_item(rel->rd_indextuple,
3890 3891
					   HEAPTUPLESIZE + rel->rd_indextuple->t_len,
					   fp);
3892 3893

			/* next, write the access method tuple form */
B
Bruce Momjian 已提交
3894
			write_item(am, sizeof(FormData_pg_am), fp);
3895

3896 3897 3898 3899 3900 3901 3902 3903 3904 3905
			/* next, write the vector of opfamily OIDs */
			write_item(rel->rd_opfamily,
					   relform->relnatts * sizeof(Oid),
					   fp);

			/* next, write the vector of opcintype OIDs */
			write_item(rel->rd_opcintype,
					   relform->relnatts * sizeof(Oid),
					   fp);

3906
			/* next, write the vector of operator OIDs */
3907 3908 3909
			write_item(rel->rd_operator,
					   relform->relnatts * (am->amstrategies * sizeof(Oid)),
					   fp);
3910

3911
			/* next, write the vector of support procedures */
3912
			write_item(rel->rd_support,
B
Bruce Momjian 已提交
3913
				  relform->relnatts * (am->amsupport * sizeof(RegProcedure)),
3914
					   fp);
3915 3916 3917 3918 3919

			/* finally, write the vector of indoption values */
			write_item(rel->rd_indoption,
					   relform->relnatts * sizeof(int16),
					   fp);
3920
		}
3921

3922 3923
		/* also make a list of their OIDs, for RelationIdIsInInitFile */
		oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3924
		initFileRelationIds = lcons_oid(RelationGetRelid(rel),
B
Bruce Momjian 已提交
3925
										initFileRelationIds);
3926
		MemoryContextSwitchTo(oldcxt);
3927
	}
3928

3929 3930
	if (FreeFile(fp))
		elog(FATAL, "could not write init file");
3931

3932
	/*
3933
	 * Now we have to check whether the data we've so painstakingly
B
Bruce Momjian 已提交
3934 3935 3936 3937 3938
	 * accumulated is already obsolete due to someone else's just-committed
	 * catalog changes.  If so, we just delete the temp file and leave it to
	 * the next backend to try again.  (Our own relcache entries will be
	 * updated by SI message processing, but we can't be sure whether what we
	 * wrote out was up-to-date.)
3939
	 *
B
Bruce Momjian 已提交
3940 3941
	 * This mustn't run concurrently with RelationCacheInitFileInvalidate, so
	 * grab a serialization lock for the duration.
3942
	 */
3943 3944 3945 3946 3947 3948
	LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);

	/* Make sure we have seen all incoming SI messages */
	AcceptInvalidationMessages();

	/*
B
Bruce Momjian 已提交
3949 3950
	 * If we have received any SI relcache invals since backend start, assume
	 * we may have written out-of-date data.
3951 3952
	 */
	if (relcacheInvalsReceived == 0L)
3953 3954
	{
		/*
3955 3956
		 * OK, rename the temp file to its final name, deleting any
		 * previously-existing init file.
3957
		 *
3958 3959 3960 3961
		 * Note: a failure here is possible under Cygwin, if some other
		 * backend is holding open an unlinked-but-not-yet-gone init file. So
		 * treat this as a noncritical failure; just remove the useless temp
		 * file on failure.
3962
		 */
3963 3964
		if (rename(tempfilename, finalfilename) < 0)
			unlink(tempfilename);
3965 3966 3967 3968
	}
	else
	{
		/* Delete the already-obsolete temp file */
3969 3970
		unlink(tempfilename);
	}
3971 3972

	LWLockRelease(RelCacheInitLock);
3973 3974
}

3975 3976 3977 3978 3979 3980 3981 3982 3983 3984
/*
 * write_item -- write a chunk of data preceded by its length
 *
 * The length word is always written.  The payload is written only when len
 * is nonzero: callers pass data == NULL for empty items (for example a
 * relation without reloptions), and a null pointer should not be handed to
 * fwrite even with a zero count.
 *
 * Any short write is reported with elog(FATAL).
 */
static void
write_item(const void *data, Size len, FILE *fp)
{
	if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
		elog(FATAL, "could not write init file");
	if (len > 0 && fwrite(data, 1, len, fp) != len)
		elog(FATAL, "could not write init file");
}

3985 3986 3987 3988 3989 3990 3991 3992 3993 3994 3995 3996
/*
 * RelationIdIsInInitFile -- is this relation (by OID) stored in the init file?
 *
 * The answer comes from initFileRelationIds, the list of OIDs recorded when
 * the init file was last loaded or written by this backend.
 *
 * Note that we effectively assume that all backends running in a database
 * would choose to store the same set of relations in the init file;
 * otherwise there are cases where we'd fail to detect the need for an init
 * file invalidation.  This does not seem likely to be a problem in practice.
 */
bool
RelationIdIsInInitFile(Oid relationId)
{
	bool		found;

	found = list_member_oid(initFileRelationIds, relationId);

	return found;
}

/*
 * RelationCacheInitFileInvalidate -- remove the current database's init file
 *
 * Called during commit of a transaction that changed one or more of the
 * relation cache entries that are kept in the init file.
 *
 * The file has to be removed twice: once just before sending the SI
 * messages that include relcache inval for such relations (beforeSend =
 * true), and once just after sending them (beforeSend = false).  The unlink
 * before ensures that a backend that's currently starting cannot read the
 * now-obsolete init file and then miss the SI messages that will force it
 * to update its relcache entries.  (This works because the backend startup
 * sequence gets into the PGPROC array before trying to load the init file.)
 * The unlink after is to synchronize with a backend that may currently be
 * trying to write an init file based on data that we've just rendered
 * invalid.  Such a backend will see the SI messages, but we can't leave the
 * init file sitting around to fool later backends.
 *
 * Any failure to unlink the file is ignored, since it might not be there if
 * no backend has been started since the last removal.
 */
void
RelationCacheInitFileInvalidate(bool beforeSend)
{
	char		initfilename[MAXPGPATH];
	bool		needlock = !beforeSend;

	snprintf(initfilename, sizeof(initfilename), "%s/%s",
			 DatabasePath, RELCACHE_INIT_FILENAME);

	/*
	 * The before-send unlink needs no interlock.  The after-send unlink must
	 * be interlocked against write_relcache_init_file, to guard against the
	 * possibility that someone renames a new-but-already-obsolete init file
	 * into place just after we unlink.  With the interlock, it's certain
	 * that write_relcache_init_file will notice our SI inval message before
	 * renaming into place, or else that we will execute second and
	 * successfully unlink the file.
	 */
	if (needlock)
		LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);

	unlink(initfilename);

	if (needlock)
		LWLockRelease(RelCacheInitLock);
}
4047 4048 4049 4050 4051 4052 4053 4054 4055 4056 4057 4058 4059 4060 4061 4062 4063 4064 4065 4066 4067 4068

/*
 * RelationCacheInitFileRemove -- delete the init file of one database
 *
 * dbPath is the directory of the database whose init file is to be removed.
 * This is meant to be used during postmaster startup.
 *
 * We used to keep the init file across restarts, but that is unsafe in PITR
 * scenarios, and even in simple crash-recovery cases there are windows for
 * the init file to become out-of-sync with the database.  So now we just
 * remove it during startup and expect the first backend launch to rebuild it.
 * Of course, this has to happen in each database of the cluster.  For
 * simplicity this is driven by flatfiles.c, which has to scan pg_database
 * anyway.
 */
void
RelationCacheInitFileRemove(const char *dbPath)
{
	char		path[MAXPGPATH];

	snprintf(path, sizeof(path), "%s/%s", dbPath, RELCACHE_INIT_FILENAME);

	/* the file might not be there at all, so any error is ignored */
	unlink(path);
}