/*-------------------------------------------------------------------------
 *
 * relcache.c
 *	  POSTGRES relation descriptor cache code
 *
 * Portions Copyright (c) 2005-2009, Greenplum inc.
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/utils/cache/relcache.c,v 1.266.2.10 2010/09/02 03:17:06 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
/*
 * INTERFACE ROUTINES
 *		RelationCacheInitialize			- initialize relcache (to empty)
 *		RelationCacheInitializePhase2	- initialize shared-catalog entries
 *		RelationCacheInitializePhase3	- finish initializing relcache
 *		RelationIdGetRelation			- get a reldesc by relation id
 *		RelationClose					- close an open relation
 *
 * NOTES
 *		The following code contains many undocumented hacks.  Please be
 *		careful....
 */
#include "postgres.h"

#include <sys/file.h>
#include <fcntl.h>
#include <unistd.h>

#include "access/genam.h"
#include "access/heapam.h"
#include "access/reloptions.h"
#include "access/sysattr.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/catquery.h"
#include "catalog/index.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_amop.h"
#include "catalog/pg_amproc.h"
#include "catalog/pg_attrdef.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_auth_members.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_database.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_rewrite.h"
#include "catalog/pg_tablespace.h"
#include "catalog/pg_trigger.h"
#include "catalog/pg_type.h"
#include "commands/trigger.h"
#include "miscadmin.h"
#include "optimizer/clauses.h"
#include "optimizer/planmain.h"
#include "optimizer/prep.h"
#include "optimizer/var.h"
#include "rewrite/rewriteDefine.h"
#include "storage/fd.h"
#include "storage/smgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/relcache.h"
#include "utils/relationnode.h"
#include "utils/resowner.h"
#include "utils/syscache.h"

#include "catalog/gp_policy.h"         /* GpPolicy */
#include "cdb/cdbtm.h"
#include "cdb/cdbvars.h"        /* Gp_role */
#include "cdb/cdbmirroredflatfile.h"
#include "cdb/cdbpersistentfilesysobj.h"
#include "cdb/cdbsreh.h"


/*
 * name of relcache init file, used to speed up backend startup
 */
#define RELCACHE_INIT_FILENAME	"pg_internal.init"

#define RELCACHE_INIT_FILEMAGIC		0x773264	/* version ID value */

/*
 *		hardcoded tuple descriptors.  see include/catalog/pg_attribute.h
 */
static const FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
static const FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
static const FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
static const FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
static const FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};

static const FormData_pg_attribute Desc_pg_database[Natts_pg_database] = {Schema_pg_database};
static const FormData_pg_attribute Desc_pg_authid[Natts_pg_authid] = {Schema_pg_authid};
static const FormData_pg_attribute Desc_pg_auth_members[Natts_pg_auth_members] = {Schema_pg_auth_members};

/*
 *		Hash tables that index the relation cache
 *
 *		We used to index the cache by both name and OID, but now there
 *		is only an index by OID.
 */
typedef struct relidcacheent
{
	Oid			reloid;
	Relation	reldesc;
} RelIdCacheEnt;

static HTAB *RelationIdCache;

/*
 * This flag is false until we have prepared the critical relcache entries
 * that are needed to do indexscans on the tables read by relcache building.
 */
bool		criticalRelcachesBuilt = false;

/*
 * This flag is false until we have prepared the critical relcache entries
 * for shared catalogs (which are the tables needed for login).
 */
bool		criticalSharedRelcachesBuilt = false;

/*
 * This counter counts relcache inval events received since backend startup
 * (but only for rels that are actually in cache).	Presently, we use it only
 * to detect whether data about to be written by write_relcache_init_file()
 * might already be obsolete.
 */
static long relcacheInvalsReceived = 0L;

/*
 * This list remembers the OIDs of the non-shared relations cached in the
 * database's local relcache init file.  Note that there is no corresponding
 * list for the shared relcache init file, for reasons explained in the
 * comments for RelationCacheInitFileRemove.
 */
static List *initFileRelationIds = NIL;

/*
 * This flag lets us optimize away work in AtEO(Sub)Xact_RelationCache().
 */
static bool need_eoxact_work = false;


/*
 *		macros to manipulate the lookup hashtables
 */
#define RelationCacheInsert(RELATION)	\
do { \
	RelIdCacheEnt *idhentry; bool found; \
	idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
										   (void *) &(RELATION->rd_id), \
										   HASH_ENTER, &found); \
	/* used to give notice if found -- now just keep quiet */ \
	idhentry->reldesc = RELATION; \
} while(0)

#define RelationIdCacheLookup(ID, RELATION) \
do { \
	RelIdCacheEnt *hentry; \
	hentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
										 (void *) &(ID), \
										 HASH_FIND, NULL); \
	if (hentry) \
		RELATION = hentry->reldesc; \
	else \
		RELATION = NULL; \
} while(0)

#define RelationCacheDelete(RELATION) \
do { \
	RelIdCacheEnt *idhentry; \
	idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
										   (void *) &(RELATION->rd_id), \
										   HASH_REMOVE, NULL); \
	if (idhentry == NULL) \
		elog(WARNING, "trying to delete a rd_id reldesc that does not exist"); \
} while(0)


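/*
 * Usage sketch (illustrative only, not compiled): the macros above are the
 * only way this file touches RelationIdCache.  A lookup-or-build cycle, as
 * done by RelationIdGetRelation() further down in the full file, looks
 * roughly like
 *
 *		Relation	rd;
 *
 *		RelationIdCacheLookup(relationId, rd);
 *		if (RelationIsValid(rd))
 *			RelationIncrementReferenceCount(rd);
 *		else
 *			rd = RelationBuildDesc(relationId, true);
 *
 * where RelationBuildDesc() inserts the new entry via RelationCacheInsert.
 * The surrounding details differ in the real callers.
 */
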
/*
 * Special cache for opclass-related information
 *
 * Note: only default operators and support procs get cached, ie, those with
 * lefttype = righttype = opcintype.
 */
typedef struct opclasscacheent
{
	Oid			opclassoid;		/* lookup key: OID of opclass */
	bool		valid;			/* set TRUE after successful fill-in */
	StrategyNumber numStrats;	/* max # of strategies (from pg_am) */
	StrategyNumber numSupport;	/* max # of support procs (from pg_am) */
	Oid			opcfamily;		/* OID of opclass's family */
	Oid			opcintype;		/* OID of opclass's declared input type */
	Oid		   *operatorOids;	/* strategy operators' OIDs */
	RegProcedure *supportProcs; /* support procs */
} OpClassCacheEnt;

static HTAB *OpClassCache = NULL;


/* non-export function prototypes */

static void RelationDestroyRelation(Relation relation);
static void RelationClearRelation(Relation relation, bool rebuild);

static void RelationReloadIndexInfo(Relation relation);
static void RelationFlushRelation(Relation relation);
static bool load_relcache_init_file(bool shared);
static void write_relcache_init_file(bool shared);
static void write_item(const void *data, Size len, FILE *fp);

static void formrdesc(const char *relationName, Oid relationReltype,
		  bool isshared, bool hasoids,
		  int natts, const FormData_pg_attribute *att);

static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK, Relation *pg_class_relation);
static Relation AllocateRelationDesc(Form_pg_class relp);
static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
static void RelationBuildTupleDesc(Relation relation);
static Relation RelationBuildDesc(Oid targetRelId, bool insertIt);
static void RelationInitPhysicalAddr(Relation relation);
static void load_critical_index(Oid indexoid, Oid heapoid);
static TupleDesc GetPgClassDescriptor(void);
static TupleDesc GetPgIndexDescriptor(void);
static void AttrDefaultFetch(Relation relation);
static void CheckConstraintFetch(Relation relation);
static List *insert_ordered_oid(List *list, Oid datum);
static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
				  StrategyNumber numStrats,
				  StrategyNumber numSupport);
static void RelationCacheInitFileRemoveInDir(const char *tblspcpath);
static void unlink_initfile(const char *initfilename);


/*
 *		ScanPgRelation
 *
 *		This is used by RelationBuildDesc to find a pg_class
 *		tuple matching targetRelId.  The caller must hold at least
 *		AccessShareLock on the target relid to prevent concurrent-update
 *		scenarios --- else our SnapshotNow scan might fail to find any
 *		version that it thinks is live.
 *
 *		NB: the returned tuple has been copied into palloc'd storage
 *		and must eventually be freed with heap_freetuple.
 */
static HeapTuple
ScanPgRelation(Oid targetRelId, bool indexOK, Relation *pg_class_relation)
{
	HeapTuple	pg_class_tuple;
	Relation	pg_class_desc;
	cqContext	cqc;

	/*
	 * If something goes wrong during backend startup, we might find ourselves
	 * trying to read pg_class before we've selected a database.  That ain't
	 * gonna work, so bail out with a useful error message.  If this happens,
	 * it probably means a relcache entry that needs to be nailed isn't.
	 */
	if (!OidIsValid(MyDatabaseId))
		elog(FATAL, "cannot read pg_class without having selected a database");

	/*
	 * form a scan key
	 */

	/*
	 * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
	 * built the critical relcache entries (this includes initdb and startup
	 * without a pg_internal.init file).  The caller can also force a heap
	 * scan by setting indexOK == false.
	 */
	pg_class_desc = heap_open(RelationRelationId, AccessShareLock);

	pg_class_tuple = caql_getfirst(
			caql_syscache(
					caql_indexOK(caql_addrel(cqclr(&cqc), pg_class_desc), 
								 (indexOK && criticalRelcachesBuilt)),
					false),
			cql("SELECT * FROM pg_class "
				" WHERE oid = :1 ",
				ObjectIdGetDatum(targetRelId)));

	/*
	 * Must copy tuple before releasing buffer. -- already a copy
	 */

	/* all done */

	if (pg_class_relation == NULL)
		heap_close(pg_class_desc, AccessShareLock);
	else
		*pg_class_relation = pg_class_desc;

	return pg_class_tuple;
}

void
GpRelationNodeBeginScan(
	Snapshot	snapshot,
	Relation 	gp_relation_node,
	Oid		relationId,
	Oid 		relfilenode,
	GpRelationNodeScan 	*gpRelationNodeScan)
{
	Assert (relfilenode != 0);

	MemSet(gpRelationNodeScan, 0, sizeof(GpRelationNodeScan));

	/*
	 * form a scan key
	 */
	/* XXX XXX: break this out -- find callers - jic 2011/12/09 */
	/* maybe it's ok - return a cql context ? */

	/* XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX */
	/* no json defs for persistent tables ? */
/*
	cqxx("SELECT * FROM gp_relation_node_relfilenode "
		 " WHERE oid = :1 ",
		 ObjectIdGetDatum(relfilenode));
*/
	/* XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX */

	ScanKeyInit(&gpRelationNodeScan->scankey[0],
				Anum_gp_relation_node_relfilenode_oid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(relfilenode));

	/*
	 * Begin the index scan on gp_relation_node, using the relfilenode key
	 * formed above.
	 */
	gpRelationNodeScan->scan = \
		systable_beginscan(gp_relation_node, GpRelationNodeOidIndexId,
						   /* indexOK */ true,
						   snapshot,
						   /* nKeys */ 1, 
						   gpRelationNodeScan->scankey);

	gpRelationNodeScan->gp_relation_node = gp_relation_node;
	gpRelationNodeScan->relationId = relationId;
	gpRelationNodeScan->relfilenode = relfilenode;
}

HeapTuple
GpRelationNodeGetNext(
	GpRelationNodeScan 	*gpRelationNodeScan,
	int32				*segmentFileNum,
	ItemPointer			persistentTid,
	int64				*persistentSerialNum)
{
	HeapTuple tuple;
	
	bool			nulls[Natts_gp_relation_node];
	Datum			values[Natts_gp_relation_node];
	
	Oid actualRelationNode;
	
	int64 createMirrorDataLossTrackingSessionNum;

	tuple = systable_getnext((SysScanDesc)gpRelationNodeScan->scan);

	/*
	 * if no such tuple exists, return NULL
	 */
	if (!HeapTupleIsValid(tuple))
	{
		MemSet(persistentTid, 0, sizeof(ItemPointerData));
		*persistentSerialNum = 0;
		return tuple;
	}
	
	heap_deform_tuple(tuple, RelationGetDescr(gpRelationNodeScan->gp_relation_node), values, nulls);
		
	GpRelationNode_GetValues(
						values,
						&actualRelationNode,
						segmentFileNum,
						&createMirrorDataLossTrackingSessionNum,
						persistentTid,
						persistentSerialNum);
	if (actualRelationNode != gpRelationNodeScan->relfilenode)
		elog(FATAL, "Index on gp_relation_node broken."
			 " Mismatch in node tuple for gp_relation_node for relation %u, relfilenode %u, relation node %u",
			 gpRelationNodeScan->relationId, 
			 gpRelationNodeScan->relfilenode,
			 actualRelationNode);

	return tuple;
}


void
GpRelationNodeEndScan(
	GpRelationNodeScan 	*gpRelationNodeScan)
{
	/* all done */
	systable_endscan((SysScanDesc)gpRelationNodeScan->scan);
}
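
/*
 * Usage sketch for the three routines above (an assumption for illustration,
 * not code taken from this file): callers that need every segment file of a
 * relfilenode drive them in a begin / loop / end pattern:
 *
 *		GpRelationNodeScan	scan;
 *		int32				segFileNum;
 *		ItemPointerData		tid;
 *		int64				serial;
 *
 *		GpRelationNodeBeginScan(SnapshotNow, gp_relation_node,
 *								relId, relfilenode, &scan);
 *		while (HeapTupleIsValid(GpRelationNodeGetNext(&scan, &segFileNum,
 *													  &tid, &serial)))
 *		{
 *			... work with segFileNum, tid and serial ...
 *		}
 *		GpRelationNodeEndScan(&scan);
 *
 * The variable names are made up; gp_relation_node must be an already-open
 * gp_relation_node heap relation.
 */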

static HeapTuple
ScanGpRelationNodeTuple(
	Relation 	gp_relation_node,
	Oid 		relfilenode,
	int32		segmentFileNum)
{
	HeapTuple	tuple;
	SysScanDesc scan;
	ScanKeyData key[2];

	Assert (relfilenode != 0);

	/*
	 * form a scan key
	 */

	/* XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX */
/*
	cqxx("SELECT * FROM gp_relation_node "
		 " WHERE relfilenode_oid = :1 "
		 " AND segment_file_num = :2 ",
		 ObjectIdGetDatum(relfilenode),
		 Int32GetDatum(segmentFileNum));
*/
	/* XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX */

	ScanKeyInit(&key[0],
				Anum_gp_relation_node_relfilenode_oid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(relfilenode));
	ScanKeyInit(&key[1],
				Anum_gp_relation_node_segment_file_num,
				BTEqualStrategyNumber, F_INT4EQ,
				Int32GetDatum(segmentFileNum));

	/*
	 * Scan gp_relation_node via its (relfilenode, segment_file_num) index
	 * and fetch the matching tuple.
	 */
	scan = systable_beginscan(gp_relation_node, GpRelationNodeOidIndexId,
									   /* indexOK */ true,
									   SnapshotNow,
									   2, key);

	tuple = systable_getnext(scan);

	/*
	 * Must copy tuple before releasing buffer.
	 */
	if (HeapTupleIsValid(tuple))
		tuple = heap_copytuple(tuple);

	/* all done */
	systable_endscan(scan);

	return tuple;
}

HeapTuple
FetchGpRelationNodeTuple(
	Relation 		gp_relation_node,
	Oid 			relfilenode,
	int32			segmentFileNum,
	ItemPointer		persistentTid,
	int64			*persistentSerialNum)
{
	HeapTuple tuple;
	
	bool			nulls[Natts_gp_relation_node];
	Datum			values[Natts_gp_relation_node];
	
	Oid actualRelationNode;
	int32 actualSegmentFileNum;

	int64 createMirrorDataLossTrackingSessionNum;

	Assert (relfilenode != 0);
	
	tuple = ScanGpRelationNodeTuple(
					gp_relation_node,
					relfilenode,
					segmentFileNum);
	
	/*
	 * if no such tuple exists, return NULL
	 */
	if (!HeapTupleIsValid(tuple))
	{
		MemSet(persistentTid, 0, sizeof(ItemPointerData));
		*persistentSerialNum = 0;
		return tuple;
	}
	
	heap_deform_tuple(tuple, RelationGetDescr(gp_relation_node), values, nulls);
		
	GpRelationNode_GetValues(
						values,
						&actualRelationNode,
						&actualSegmentFileNum,
						&createMirrorDataLossTrackingSessionNum,
						persistentTid,
						persistentSerialNum);
	
	if (actualRelationNode != relfilenode)
	{
		elog(ERROR, "Index on gp_relation_node broken."
			 " Mismatch in node tuple for gp_relation_node intended relfilenode %u, fetched relfilenode %u",
			 relfilenode,
			 actualRelationNode);
	}

	return tuple;
}
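
/*
 * Minimal usage sketch for FetchGpRelationNodeTuple (illustrative; the
 * variable names are assumptions): a caller wanting one segment file's
 * entry opens gp_relation_node, fetches, and frees the copied tuple:
 *
 *		gp_relation_node = heap_open(GpRelationNodeRelationId, AccessShareLock);
 *		tuple = FetchGpRelationNodeTuple(gp_relation_node, relfilenode,
 *										 segmentFileNum, &tid, &serial);
 *		if (HeapTupleIsValid(tuple))
 *			heap_freetuple(tuple);
 *		heap_close(gp_relation_node, AccessShareLock);
 *
 * ReadGpRelationNode below follows essentially this shape.
 */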

/*
 * Deletes the gp relation node entry for the
 * given segment file.
 */ 
void
DeleteGpRelationNodeTuple(
	Relation 	relation,
	int32		segmentFileNum)
{
	Relation	gp_relation_node;
	HeapTuple	tuple;
	ItemPointerData     persistentTid;
	int64               persistentSerialNum;

	gp_relation_node = heap_open(GpRelationNodeRelationId, RowExclusiveLock);

	tuple = FetchGpRelationNodeTuple(gp_relation_node,
				relation->rd_rel->relfilenode,
				segmentFileNum,
				&persistentTid,
				&persistentSerialNum);

	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "could not find node tuple for relation %u, relation file node %u, segment file #%d",
			 RelationGetRelid(relation),
			 relation->rd_rel->relfilenode,
			 segmentFileNum);

	/* delete the relation tuple from gp_relation_node, and finish up */
	simple_heap_delete(gp_relation_node, &tuple->t_self);
	heap_freetuple(tuple);

	heap_close(gp_relation_node, RowExclusiveLock);
}

bool
ReadGpRelationNode(
	Oid 			relfilenode,
	
	int32			segmentFileNum,

	ItemPointer		persistentTid,

	int64			*persistentSerialNum)
{
	Relation gp_relation_node;
	HeapTuple tuple;
	bool found;

	MemSet(persistentTid, 0, sizeof(ItemPointerData));
	*persistentSerialNum = 0;

	gp_relation_node = heap_open(GpRelationNodeRelationId, AccessShareLock);

	tuple = FetchGpRelationNodeTuple(
						gp_relation_node,
						relfilenode,
						segmentFileNum,
						persistentTid,
						persistentSerialNum);

	/*
	 * if no such tuple exists, return NULL
	 */
	if (!HeapTupleIsValid(tuple))
	{
		found = false;
	}
	else
	{
		if (Debug_persistent_print)
		{
			TupleVisibilitySummary tupleVisibilitySummary;
			char *tupleVisibilitySummaryString;
			
			GetTupleVisibilitySummary(
									tuple,
									&tupleVisibilitySummary);
			tupleVisibilitySummaryString = GetTupleVisibilitySummaryString(&tupleVisibilitySummary);
			
			elog(Persistent_DebugPrintLevel(), 
				 "ReadGpRelationNode: For relfilenode %u, segment file #%d found persistent serial number " INT64_FORMAT ", TID %s (gp_relation_node tuple visibility: %s)",
				 relfilenode,
				 segmentFileNum,
				 *persistentSerialNum,
				 ItemPointerToString(persistentTid),
				 tupleVisibilitySummaryString);
			pfree(tupleVisibilitySummaryString);
		}

		found = true;
		heap_freetuple(tuple);
	}

	heap_close(gp_relation_node, AccessShareLock);

	return found;
}
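
/*
 * Example call (sketch; names assumed): look up the persistent file-system
 * object info for segment file 0 of a relation's relfilenode:
 *
 *		ItemPointerData	persistentTid;
 *		int64			persistentSerialNum;
 *
 *		if (!ReadGpRelationNode(rel->rd_node.relNode, 0,
 *								&persistentTid, &persistentSerialNum))
 *			elog(ERROR, "gp_relation_node entry not found");
 *
 * RelationFetchSegFile0GpRelationNode below is the in-file caller of this
 * pattern.
 */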

void
RelationFetchSegFile0GpRelationNode(
	Relation relation)
{
	if (!relation->rd_segfile0_relationnodeinfo.isPresent)
	{
		if (Persistent_BeforePersistenceWork() || InRecovery)
		{
			MemSet(&relation->rd_segfile0_relationnodeinfo.persistentTid, 0, sizeof(ItemPointerData));
			relation->rd_segfile0_relationnodeinfo.persistentSerialNum = 0;
		
			relation->rd_segfile0_relationnodeinfo.isPresent = true;
			relation->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = true;
			
			return;		/* The initdb process will load the persistent table once we are out of bootstrap mode. */
		}

		if (!ReadGpRelationNode(
					relation->rd_node.relNode,
					/* segmentFileNum */ 0,
					&relation->rd_segfile0_relationnodeinfo.persistentTid,
					&relation->rd_segfile0_relationnodeinfo.persistentSerialNum))
		{
			elog(ERROR, "Did not find gp_relation_node entry for relation name %s, relation id %u, relfilenode %u",
				 relation->rd_rel->relname.data,
				 relation->rd_id,
				 relation->rd_node.relNode);
		}

		Assert(!Persistent_BeforePersistenceWork());
		if (PersistentStore_IsZeroTid(&relation->rd_segfile0_relationnodeinfo.persistentTid))
		{	
			elog(ERROR, 
				 "RelationFetchSegFile0GpRelationNode has invalid TID (0,0) into relation %u/%u/%u '%s', serial number " INT64_FORMAT,
				 relation->rd_node.spcNode,
				 relation->rd_node.dbNode,
				 relation->rd_node.relNode,
				 NameStr(relation->rd_rel->relname),
				 relation->rd_segfile0_relationnodeinfo.persistentSerialNum);
		}

		relation->rd_segfile0_relationnodeinfo.isPresent = true;
		
	}

}

// UNDONE: Temporary
void
RelationFetchGpRelationNodeForXLog_Index(
	Relation relation)
{
	static int countInThisBackend = 0;
	static int deep = 0;
	
	deep++;

	countInThisBackend++;

	if (deep >= 2)
	{
		int saveDeep;

		if (Debug_gp_relation_node_fetch_wait_for_debugging)
		{
			/* Code for investigating MPP-16395, will be removed as part of the fix */
			elog(WARNING, "RelationFetchGpRelationNodeForXLog_Index [%d] for non-heap %u/%u/%u (deep %d) -- waiting for debug attach...",
				 countInThisBackend,
				 relation->rd_node.spcNode,
				 relation->rd_node.dbNode,
				 relation->rd_node.relNode,
				 deep);

			for (int i=0; i < 24 * 60; i++)
			{
				pg_usleep(60000000L); /* 60 sec */
			}
		}

		/*
		 * Reset counter in case the user continues to use the session.
		 */
		saveDeep = deep;
		deep = 0;

		elog(ERROR, "RelationFetchGpRelationNodeForXLog_Index [%d] for non-heap %u/%u/%u (deep %d)",
			 countInThisBackend,
			 relation->rd_node.spcNode,
			 relation->rd_node.dbNode,
			 relation->rd_node.relNode,
			 saveDeep);
	}

	RelationFetchSegFile0GpRelationNode(relation);

	deep--;
}

/*
 *		AllocateRelationDesc
 *
 *		This is used to allocate memory for a new relation descriptor
 *		and initialize the rd_rel field from the given pg_class tuple.
 */
static Relation
AllocateRelationDesc(Form_pg_class relp)
{
	Relation	relation;
	MemoryContext oldcxt;
	Form_pg_class relationForm;

	/* Relcache entries must live in CacheMemoryContext */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
	 * allocate and zero space for new relation descriptor
	 */
	relation = (Relation) palloc0(sizeof(RelationData));

	/*
	 * clear fields of reldesc that should initialize to something non-zero
	 */
	relation->rd_targblock = InvalidBlockNumber;

	/* make sure relation is marked as having no open file yet */
	relation->rd_smgr = NULL;

	/*
	 * Copy the relation tuple form
	 *
	 * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE. The
	 * variable-length fields (relacl, reloptions) are NOT stored in the
	 * relcache --- there'd be little point in it, since we don't copy the
	 * tuple's nulls bitmap and hence wouldn't know if the values are valid.
	 * Bottom line is that relacl *cannot* be retrieved from the relcache. Get
	 * it from the syscache if you need it.  The same goes for the original
	 * form of reloptions (however, we do store the parsed form of reloptions
	 * in rd_options).
	 */
	relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);

	memcpy(relationForm, relp, CLASS_TUPLE_SIZE);

	/* initialize relation tuple form */
	relation->rd_rel = relationForm;

	/*
	 * This part MUST remain a fetch on demand; otherwise you end up needing
	 * it to open pg_class, and then relation_open recurses infinitely...
	 */
	relation->rd_segfile0_relationnodeinfo.isPresent = false;
	relation->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = false;

	/* and allocate attribute tuple form storage */
	relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts,
											   relationForm->relhasoids);
	/* which we mark as a reference-counted tupdesc */
	relation->rd_att->tdrefcount = 1;

	MemoryContextSwitchTo(oldcxt);

	return relation;
}

/*
 * RelationParseRelOptions
 *		Convert pg_class.reloptions into pre-parsed rd_options
 *
 * tuple is the real pg_class tuple (not rd_rel!) for relation
 *
 * Note: rd_rel and (if an index) rd_am must be valid already
 */
static void
RelationParseRelOptions(Relation relation, HeapTuple tuple)
{
	Datum		datum;
	bool		isnull;
	bytea	   *options;

	relation->rd_options = NULL;

	/* Fall out if relkind should not have options */
	switch (relation->rd_rel->relkind)
	{
		case RELKIND_RELATION:
		case RELKIND_TOASTVALUE:
		case RELKIND_AOSEGMENTS:
		case RELKIND_AOBLOCKDIR:
		case RELKIND_AOVISIMAP:
		case RELKIND_INDEX:
			break;
		default:
			return;
	}

	/*
	 * Fetch reloptions from tuple; have to use a hardwired descriptor because
	 * we might not have any other for pg_class yet (consider executing this
	 * code for pg_class itself)
	 */
	datum = fastgetattr(tuple,
						Anum_pg_class_reloptions,
						GetPgClassDescriptor(),
						&isnull);
	if (isnull)
		return;

	/* Parse into appropriate format; don't error out here */
	switch (relation->rd_rel->relkind)
	{
		case RELKIND_RELATION:
		case RELKIND_TOASTVALUE:
		case RELKIND_AOSEGMENTS:
		case RELKIND_AOBLOCKDIR:
		case RELKIND_AOVISIMAP:
		case RELKIND_UNCATALOGED:
			options = heap_reloptions(relation->rd_rel->relkind, datum,
									  false);
			break;
		case RELKIND_INDEX:
			options = index_reloptions(relation->rd_am->amoptions, datum,
									   false);
			break;
		default:
			Assert(false);		/* can't get here */
			options = NULL;		/* keep compiler quiet */
			break;
	}

	/*
	 * Copy parsed data into CacheMemoryContext.  To guard against the
	 * possibility of leaks in the reloptions code, we want to do the actual
	 * parsing in the caller's memory context and copy the results into
	 * CacheMemoryContext after the fact.
	 */
	if (options)
	{
		relation->rd_options = MemoryContextAlloc(CacheMemoryContext,
												  VARSIZE(options));
		memcpy(relation->rd_options, options, VARSIZE(options));
		pfree(options);
	}
}

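/*
 * Reading the parsed options later (illustrative sketch, assuming the
 * standard heap reloptions from access/reloptions.h): once rd_options is
 * filled in above, callers typically cast it rather than re-parse, e.g.
 *
 *		StdRdOptions *opts = (StdRdOptions *) rel->rd_options;
 *		int			 fillfactor = opts ? opts->fillfactor
 *										: HEAP_DEFAULT_FILLFACTOR;
 *
 * StdRdOptions and HEAP_DEFAULT_FILLFACTOR are assumptions about the
 * caller's headers, not definitions made in this file.
 */
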
/*
 *		RelationBuildTupleDesc
 *
 *		Form the relation's tuple descriptor from information in
 *		the pg_attribute, pg_attrdef & pg_constraint system catalogs.
 */
static void
RelationBuildTupleDesc(Relation relation)
{
	HeapTuple	pg_attribute_tuple;
	Relation	pg_attribute_desc;
	cqContext	cqc;
	cqContext  *pcqCtx;
	int			need;
	TupleConstr *constr;
	AttrDefault *attrdef = NULL;
	int			ndef = 0;

	/* copy some fields from pg_class row to rd_att */
	relation->rd_att->tdtypeid = relation->rd_rel->reltype;
	relation->rd_att->tdtypmod = -1;	/* unnecessary, but... */
	relation->rd_att->tdhasoid = relation->rd_rel->relhasoids;

	constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
												sizeof(TupleConstr));
	constr->has_not_null = false;

	/*
	 * Form a scan key that selects only user attributes (attnum > 0).
	 * (Eliminating system attribute rows at the index level is lots faster
	 * than fetching them.)
	 */

	/*
	 * Open pg_attribute and begin a scan.	Force heap scan if we haven't yet
	 * built the critical relcache entries (this includes initdb and startup
	 * without a pg_internal.init file).
	 */
	pg_attribute_desc = heap_open(AttributeRelationId, AccessShareLock);

	pcqCtx = caql_beginscan(
			caql_syscache(
					caql_indexOK(caql_addrel(cqclr(&cqc), pg_attribute_desc),
								 criticalRelcachesBuilt),
					false),
			cql("SELECT * FROM pg_attribute "
				" WHERE attrelid = :1 "
				" AND attnum > :2 ",
				ObjectIdGetDatum(RelationGetRelid(relation)),
				Int16GetDatum(0)));

	/*
	 * add attribute data to relation->rd_att
	 */
	need = relation->rd_rel->relnatts;

	while (HeapTupleIsValid(pg_attribute_tuple = caql_getnext(pcqCtx)))
	{
		Form_pg_attribute attp;

		attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);

		if (attp->attnum <= 0 ||
			attp->attnum > relation->rd_rel->relnatts)
			elog(ERROR, "invalid attribute number %d for %s",
				 attp->attnum, RelationGetRelationName(relation));

		memcpy(relation->rd_att->attrs[attp->attnum - 1],
			   attp,
			   ATTRIBUTE_FIXED_PART_SIZE);

		/* Update constraint/default info */
		if (attp->attnotnull)
			constr->has_not_null = true;

		if (attp->atthasdef)
		{
			if (attrdef == NULL)
				attrdef = (AttrDefault *)
					MemoryContextAllocZero(CacheMemoryContext,
										   relation->rd_rel->relnatts *
										   sizeof(AttrDefault));
			attrdef[ndef].adnum = attp->attnum;
			attrdef[ndef].adbin = NULL;
			ndef++;
		}
		need--;
		if (need == 0)
			break;
	}

	/*
	 * end the scan and close the attribute relation
	 */
	caql_endscan(pcqCtx);
	heap_close(pg_attribute_desc, AccessShareLock);

	if (need != 0)
		elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
			 need, RelationGetRelid(relation));

	/*
	 * The attcacheoff values we read from pg_attribute should all be -1
	 * ("unknown").  Verify this if assert checking is on.	They will be
	 * computed when and if needed during tuple access.
	 */
#ifdef USE_ASSERT_CHECKING
	{
		int			i;

		for (i = 0; i < relation->rd_rel->relnatts; i++)
			Assert(relation->rd_att->attrs[i]->attcacheoff == -1);
	}
#endif

	/*
	 * However, we can easily set the attcacheoff value for the first
	 * attribute: it must be zero.	This eliminates the need for special cases
	 * for attnum=1 that used to exist in fastgetattr() and index_getattr().
	 */
	if (relation->rd_rel->relnatts > 0)
		relation->rd_att->attrs[0]->attcacheoff = 0;

	/*
	 * Set up constraint/default info
	 */
	if (constr->has_not_null || ndef > 0 || relation->rd_rel->relchecks)
	{
		relation->rd_att->constr = constr;

		if (ndef > 0)			/* DEFAULTs */
		{
			if (ndef < relation->rd_rel->relnatts)
				constr->defval = (AttrDefault *)
					repalloc(attrdef, ndef * sizeof(AttrDefault));
			else
				constr->defval = attrdef;
			constr->num_defval = ndef;
			AttrDefaultFetch(relation);
		}
		else
			constr->num_defval = 0;

		if (relation->rd_rel->relchecks > 0)	/* CHECKs */
		{
			constr->num_check = relation->rd_rel->relchecks;
			constr->check = (ConstrCheck *)
				MemoryContextAllocZero(CacheMemoryContext,
									constr->num_check * sizeof(ConstrCheck));
			CheckConstraintFetch(relation);
		}
		else
			constr->num_check = 0;
	}
	else
	{
		pfree(constr);
		relation->rd_att->constr = NULL;
	}
}

/*
 *		RelationBuildRuleLock
 *
 *		Form the relation's rewrite rules from information in
 *		the pg_rewrite system catalog.
 *
 * Note: The rule parsetrees are potentially very complex node structures.
 * To allow these trees to be freed when the relcache entry is flushed,
 * we make a private memory context to hold the RuleLock information for
 * each relcache entry that has associated rules.  The context is used
 * just for rule info, not for any other subsidiary data of the relcache
 * entry, because that keeps the update logic in RelationClearRelation()
 * manageable.	The other subsidiary data structures are simple enough
 * to be easy to free explicitly, anyway.
 */
static void
RelationBuildRuleLock(Relation relation)
{
	MemoryContext rulescxt;
	MemoryContext oldcxt;
	HeapTuple	rewrite_tuple;
	Relation	rewrite_desc;
	TupleDesc	rewrite_tupdesc;
	cqContext	cqc;
	cqContext  *pcqCtx;
	RuleLock   *rulelock;
	int			numlocks;
	RewriteRule **rules;
	int			maxlocks;

	/*
	 * Make the private context.  Parameters are set on the assumption that
	 * it'll probably not contain much data.
	 */
	rulescxt = AllocSetContextCreate(CacheMemoryContext,
									 RelationGetRelationName(relation),
									 ALLOCSET_SMALL_MINSIZE,
									 ALLOCSET_SMALL_INITSIZE,
									 ALLOCSET_SMALL_MAXSIZE);
	relation->rd_rulescxt = rulescxt;

	/*
	 * allocate an array to hold the rewrite rules (the array is extended if
	 * necessary)
	 */
	maxlocks = 4;
	rules = (RewriteRule **)
		MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
	numlocks = 0;

	/*
	 * open pg_rewrite and begin a scan
	 *
	 * Note: since we scan the rules using RewriteRelRulenameIndexId, we will
	 * be reading the rules in name order, except possibly during
	 * emergency-recovery operations (ie, IgnoreSystemIndexes). This in turn
	 * ensures that rules will be fired in name order.
	 */
	rewrite_desc = heap_open(RewriteRelationId, AccessShareLock);
	rewrite_tupdesc = RelationGetDescr(rewrite_desc);

	pcqCtx = caql_beginscan(
			caql_syscache(
					caql_indexOK(caql_addrel(cqclr(&cqc), rewrite_desc),
								 true),
					false),
			cql("SELECT * FROM pg_rewrite "
				" WHERE ev_class = :1 ",
				ObjectIdGetDatum(RelationGetRelid(relation))));

	while (HeapTupleIsValid(rewrite_tuple = caql_getnext(pcqCtx)))
	{
		Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
		bool		isnull;
		Datum		rule_datum;
		char	   *rule_str;
		RewriteRule *rule;

		rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
												  sizeof(RewriteRule));

		rule->ruleId = HeapTupleGetOid(rewrite_tuple);

		rule->event = rewrite_form->ev_type - '0';
		rule->attrno = rewrite_form->ev_attr;
		rule->enabled = rewrite_form->ev_enabled;
		rule->isInstead = rewrite_form->is_instead;

		/*
		 * Must use heap_getattr to fetch ev_action and ev_qual.  Also, the
		 * rule strings are often large enough to be toasted.  To avoid
		 * leaking memory in the caller's context, do the detoasting here so
		 * we can free the detoasted version.
		 */
		rule_datum = heap_getattr(rewrite_tuple,
								  Anum_pg_rewrite_ev_action,
								  rewrite_tupdesc,
								  &isnull);
		Assert(!isnull);
		rule_str = TextDatumGetCString(rule_datum);
		oldcxt = MemoryContextSwitchTo(rulescxt);
		rule->actions = (List *) stringToNode(rule_str);
		MemoryContextSwitchTo(oldcxt);
		pfree(rule_str);

		rule_datum = heap_getattr(rewrite_tuple,
								  Anum_pg_rewrite_ev_qual,
								  rewrite_tupdesc,
								  &isnull);
		Assert(!isnull);
		rule_str = TextDatumGetCString(rule_datum);
		oldcxt = MemoryContextSwitchTo(rulescxt);
		rule->qual = (Node *) stringToNode(rule_str);
		MemoryContextSwitchTo(oldcxt);
		pfree(rule_str);

		/*
		 * We want the rule's table references to be checked as though by the
		 * table owner, not the user referencing the rule.	Therefore, scan
		 * through the rule's actions and set the checkAsUser field on all
		 * rtable entries.	We have to look at the qual as well, in case it
		 * contains sublinks.
		 *
		 * The reason for doing this when the rule is loaded, rather than when
		 * it is stored, is that otherwise ALTER TABLE OWNER would have to
		 * grovel through stored rules to update checkAsUser fields. Scanning
		 * the rule tree during load is relatively cheap (compared to
		 * constructing it in the first place), so we do it here.
		 */
		setRuleCheckAsUser((Node *) rule->actions, relation->rd_rel->relowner);
		setRuleCheckAsUser(rule->qual, relation->rd_rel->relowner);

		if (numlocks >= maxlocks)
		{
			maxlocks *= 2;
			rules = (RewriteRule **)
				repalloc(rules, sizeof(RewriteRule *) * maxlocks);
		}
		rules[numlocks++] = rule;
	}

	/*
	 * end the scan and close the pg_rewrite relation
	 */
	caql_endscan(pcqCtx);
	heap_close(rewrite_desc, AccessShareLock);

	/*
	 * form a RuleLock and insert into relation
	 */
	rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
	rulelock->numLocks = numlocks;
	rulelock->rules = rules;

	relation->rd_rules = rulelock;
}

/*
 *		equalRuleLocks
 *
 *		Determine whether two RuleLocks are equivalent
 *
 *		Probably this should be in the rules code someplace...
 */
static bool
equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
{
	int			i;

	/*
	 * As of 7.3 we assume the rule ordering is repeatable, because
	 * RelationBuildRuleLock should read 'em in a consistent order.  So just
	 * compare corresponding slots.
	 */
	if (rlock1 != NULL)
	{
		if (rlock2 == NULL)
			return false;
		if (rlock1->numLocks != rlock2->numLocks)
			return false;
		for (i = 0; i < rlock1->numLocks; i++)
		{
			RewriteRule *rule1 = rlock1->rules[i];
			RewriteRule *rule2 = rlock2->rules[i];

			if (rule1->ruleId != rule2->ruleId)
				return false;
			if (rule1->event != rule2->event)
				return false;
			if (rule1->attrno != rule2->attrno)
				return false;
			if (rule1->enabled != rule2->enabled)
				return false;
			if (rule1->isInstead != rule2->isInstead)
				return false;
			if (!equal(rule1->qual, rule2->qual))
				return false;
			if (!equal(rule1->actions, rule2->actions))
				return false;
		}
	}
	else if (rlock2 != NULL)
		return false;
	return true;
}


/*
 *		RelationBuildDesc
 *
 *		Build a relation descriptor.  The caller must hold at least
 *		AccessShareLock on the target relid.
 *
 *		The new descriptor is inserted into the hash table if insertIt is true.
 *
 *		Returns NULL if no pg_class row could be found for the given relid
 *		(suggesting we are trying to access a just-deleted relation).
 *		Any other error is reported via elog.
 */
static Relation
RelationBuildDesc(Oid targetRelId, bool insertIt)
{
	Relation	relation;
	Oid			relid;
	Relation    pg_class_relation;
	HeapTuple	pg_class_tuple;
	Form_pg_class relp;

	/*
	 * find the tuple in pg_class corresponding to the given relation id
	 */
	pg_class_tuple = ScanPgRelation(targetRelId, true, &pg_class_relation);

	/*
	 * if no such tuple exists, return NULL
	 */
	if (!HeapTupleIsValid(pg_class_tuple))
		return NULL;

	/*
	 * get information from the pg_class_tuple
	 */
	relid = HeapTupleGetOid(pg_class_tuple);
	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
	heap_close(pg_class_relation, AccessShareLock);

	/*
	 * allocate storage for the relation descriptor, and copy pg_class_tuple
	 * to relation->rd_rel.
	 */
	relation = AllocateRelationDesc(relp);

	/*
	 * initialize the relation's relation id (relation->rd_id)
	 */
	RelationGetRelid(relation) = relid;

	/*
	 * normal relations are not nailed into the cache; nor can a pre-existing
	 * relation be new.  It could be temp though.  (Actually, it could be new
	 * too, but it's okay to forget that fact if forced to flush the entry.)
	 */
	relation->rd_refcnt = 0;
	relation->rd_isnailed = false;
	relation->rd_createSubid = InvalidSubTransactionId;
	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
	relation->rd_istemp = isTempOrToastNamespace(relation->rd_rel->relnamespace);
	relation->rd_issyscat = (strncmp(relation->rd_rel->relname.data, "pg_", 3) == 0);

    /*
     * CDB: On QEs, temp relations must use shared buffer cache so data
     * will be visible to all segmates.  On QD, sequence objects must
     * use shared buffer cache so data will be visible to sequence server.
     */
    if (relation->rd_istemp &&
        relation->rd_rel->relkind != RELKIND_SEQUENCE &&
        Gp_role != GP_ROLE_EXECUTE)
        relation->rd_isLocalBuf = true;
    else
        relation->rd_isLocalBuf = false;

	/*
	 * initialize the tuple descriptor (relation->rd_att).
	 */
	RelationBuildTupleDesc(relation);

	/*
	 * Fetch rules and triggers that affect this relation
	 */
	if (relation->rd_rel->relhasrules)
		RelationBuildRuleLock(relation);
	else
	{
		relation->rd_rules = NULL;
		relation->rd_rulescxt = NULL;
	}

	if (relation->rd_rel->reltriggers > 0)
		RelationBuildTriggers(relation);
	else
		relation->trigdesc = NULL;

	/*
	 * if it's an index, initialize index-related information
	 */
	if (OidIsValid(relation->rd_rel->relam))
		RelationInitIndexAccessInfo(relation);

	/* extract reloptions if any */
	RelationParseRelOptions(relation, pg_class_tuple);

	/*
	 * initialize the relation lock manager information
	 */
	RelationInitLockInfo(relation);		/* see lmgr.c */

	/*
	 * initialize physical addressing information for the relation
	 */
	RelationInitPhysicalAddr(relation);

	/* make sure relation is marked as having no open file yet */
	relation->rd_smgr = NULL;

    /*
     * initialize Greenplum Database distribution policy info (GpPolicy)
     */
    if (relation->rd_rel->relkind == RELKIND_RELATION &&
        !IsSystemRelation(relation))
        relation->rd_cdbpolicy = GpPolicyFetch(CacheMemoryContext, targetRelId);

    relation->rd_cdbDefaultStatsWarningIssued = false;

	/*
	 * now we can free the memory allocated for pg_class_tuple
	 */
	heap_freetuple(pg_class_tuple);

	/*
	 * Insert newly created relation into relcache hash table, if requested.
	 */
	if (insertIt)
		RelationCacheInsert(relation);

	/* It's fully valid */
	relation->rd_isvalid = true;

	return relation;
}

/*
 * Initialize the physical addressing info (RelFileNode) for a relcache entry
 */
static void
RelationInitPhysicalAddr(Relation relation)
{
	if (relation->rd_rel->reltablespace)
		relation->rd_node.spcNode = relation->rd_rel->reltablespace;
	else
		relation->rd_node.spcNode = MyDatabaseTableSpace;
	if (relation->rd_rel->relisshared)
		relation->rd_node.dbNode = InvalidOid;
	else
		relation->rd_node.dbNode = MyDatabaseId;
	relation->rd_node.relNode = relation->rd_rel->relfilenode;
}
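
/*
 * Worked example (illustrative; the numbers are made up): for a non-shared
 * relation with reltablespace = 0 and relfilenode = 16384 in database 10899,
 * the assignments above produce
 *
 *		rd_node.spcNode = MyDatabaseTableSpace
 *		rd_node.dbNode  = MyDatabaseId		(10899)
 *		rd_node.relNode = 16384
 *
 * which is the triple smgr/bufmgr use to locate the relation's files
 * (roughly base/10899/16384 under the default tablespace).
 */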

/*
 * Initialize index-access-method support data for an index relation
 */
void
RelationInitIndexAccessInfo(Relation relation)
{
	HeapTuple	tuple;
	Form_pg_am	aform;
	Datum		indclassDatum;
	Datum		indoptionDatum;
	bool		isnull;
	oidvector  *indclass;
	int2vector *indoption;
	MemoryContext indexcxt;
	MemoryContext oldcontext;
	int			natts;
	uint16		amstrategies;
	uint16		amsupport;

	/*
	 * Make a copy of the pg_index entry for the index.  Since pg_index
	 * contains variable-length and possibly-null fields, we have to do this
	 * honestly rather than just treating it as a Form_pg_index struct.
	 */
	tuple = SearchSysCache(INDEXRELID,
						   ObjectIdGetDatum(RelationGetRelid(relation)),
						   0, 0, 0);
	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "cache lookup failed for index %u",
			 RelationGetRelid(relation));
	oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indextuple = heap_copytuple(tuple);
	relation->rd_index = (Form_pg_index) GETSTRUCT(relation->rd_indextuple);
	MemoryContextSwitchTo(oldcontext);
	ReleaseSysCache(tuple);

	/*
	 * Make a copy of the pg_am entry for the index's access method
	 */
	tuple = SearchSysCache(AMOID,
						   ObjectIdGetDatum(relation->rd_rel->relam),
						   0, 0, 0);
	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "cache lookup failed for access method %u",
			 relation->rd_rel->relam);
	aform = (Form_pg_am) MemoryContextAlloc(CacheMemoryContext, sizeof *aform);
	memcpy(aform, GETSTRUCT(tuple), sizeof *aform);
	ReleaseSysCache(tuple);
	relation->rd_am = aform;

	natts = relation->rd_rel->relnatts;
	if (natts != relation->rd_index->indnatts)
		elog(ERROR, "relnatts disagrees with indnatts for index %u",
			 RelationGetRelid(relation));
	amstrategies = aform->amstrategies;
	amsupport = aform->amsupport;

	/*
	 * Make the private context to hold index access info.	The reason we need
	 * a context, and not just a couple of pallocs, is so that we won't leak
	 * any subsidiary info attached to fmgr lookup records.
	 *
	 * Context parameters are set on the assumption that it'll probably not
	 * contain much data.
	 */
	indexcxt = AllocSetContextCreate(CacheMemoryContext,
									 RelationGetRelationName(relation),
									 ALLOCSET_SMALL_MINSIZE,
									 ALLOCSET_SMALL_INITSIZE,
									 ALLOCSET_SMALL_MAXSIZE);
	relation->rd_indexcxt = indexcxt;

	/*
	 * Allocate arrays to hold data
	 */
	relation->rd_aminfo = (RelationAmInfo *)
		MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));

	relation->rd_opfamily = (Oid *)
		MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
	relation->rd_opcintype = (Oid *)
		MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));

	if (amstrategies > 0)
		relation->rd_operator = (Oid *)
			MemoryContextAllocZero(indexcxt,
								   natts * amstrategies * sizeof(Oid));
	else
		relation->rd_operator = NULL;

	if (amsupport > 0)
	{
		int			nsupport = natts * amsupport;

		relation->rd_support = (RegProcedure *)
			MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
		relation->rd_supportinfo = (FmgrInfo *)
			MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
	}
	else
	{
		relation->rd_support = NULL;
		relation->rd_supportinfo = NULL;
	}

	relation->rd_indoption = (int16 *)
		MemoryContextAllocZero(indexcxt, natts * sizeof(int16));

	/*
	 * indclass cannot be referenced directly through the C struct, because it
	 * comes after the variable-width indkey field.  Must extract the datum
	 * the hard way...
	 */
	indclassDatum = fastgetattr(relation->rd_indextuple,
								Anum_pg_index_indclass,
								GetPgIndexDescriptor(),
								&isnull);
	Assert(!isnull);
	indclass = (oidvector *) DatumGetPointer(indclassDatum);

	/*
	 * Fill the operator and support procedure OID arrays, as well as the info
	 * about opfamilies and opclass input types.  (aminfo and supportinfo are
	 * left as zeroes, and are filled on-the-fly when used)
	 */
	IndexSupportInitialize(indclass,
						   relation->rd_operator, relation->rd_support,
						   relation->rd_opfamily, relation->rd_opcintype,
						   amstrategies, amsupport, natts);

	/*
	 * Similarly extract indoption and copy it to the cache entry
	 */
	indoptionDatum = fastgetattr(relation->rd_indextuple,
								 Anum_pg_index_indoption,
								 GetPgIndexDescriptor(),
								 &isnull);
	Assert(!isnull);
	indoption = (int2vector *) DatumGetPointer(indoptionDatum);
	memcpy(relation->rd_indoption, indoption->values, natts * sizeof(int16));

	/*
	 * expressions and predicate cache will be filled later
	 */
	relation->rd_indexprs = NIL;
	relation->rd_indpred = NIL;
	relation->rd_amcache = NULL;
}

/*
 * IndexSupportInitialize
 *		Initializes an index's cached opclass information,
 *		given the index's pg_index.indclass entry.
 *
 * Data is returned into *indexOperator, *indexSupport, *opFamily, and
 * *opcInType, which are arrays allocated by the caller.
 *
 * The caller also passes maxStrategyNumber, maxSupportNumber, and
 * maxAttributeNumber, since these indicate the size of the arrays
 * it has allocated --- but in practice these numbers must always match
 * those obtainable from the system catalog entries for the index and
 * access method.
 */
void
IndexSupportInitialize(oidvector *indclass,
					   Oid *indexOperator,
					   RegProcedure *indexSupport,
					   Oid *opFamily,
					   Oid *opcInType,
					   StrategyNumber maxStrategyNumber,
					   StrategyNumber maxSupportNumber,
					   AttrNumber maxAttributeNumber)
{
	int			attIndex;

	for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
	{
		OpClassCacheEnt *opcentry;

		if (!OidIsValid(indclass->values[attIndex]))
			elog(ERROR, "bogus pg_index tuple");

		/* look up the info for this opclass, using a cache */
		opcentry = LookupOpclassInfo(indclass->values[attIndex],
									 maxStrategyNumber,
									 maxSupportNumber);

		/* copy cached data into relcache entry */
		opFamily[attIndex] = opcentry->opcfamily;
		opcInType[attIndex] = opcentry->opcintype;
		if (maxStrategyNumber > 0)
			memcpy(&indexOperator[attIndex * maxStrategyNumber],
				   opcentry->operatorOids,
				   maxStrategyNumber * sizeof(Oid));
		if (maxSupportNumber > 0)
			memcpy(&indexSupport[attIndex * maxSupportNumber],
				   opcentry->supportProcs,
				   maxSupportNumber * sizeof(RegProcedure));
	}
}

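/*
 * Layout note with a small example (illustrative): the operator and support
 * arrays filled above are flat, one stride per index column.  For a
 * two-column btree index (amstrategies = 5), the "less than" operator of
 * column 2 ends up at
 *
 *		indexOperator[1 * maxStrategyNumber + (BTLessStrategyNumber - 1)]
 *
 * which is what the memcpy calls copy in, column by column, from the
 * per-opclass cache entries.
 */
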
/*
 * LookupOpclassInfo
 *
 * This routine maintains a per-opclass cache of the information needed
 * by IndexSupportInitialize().  This is more efficient than relying on
 * the catalog cache, because we can load all the info about a particular
 * opclass in a single indexscan of pg_amproc or pg_amop.
 *
 * The information from pg_am about expected range of strategy and support
 * numbers is passed in, rather than being looked up, mainly because the
 * caller will have it already.
 *
 * Note there is no provision for flushing the cache.  This is OK at the
 * moment because there is no way to ALTER any interesting properties of an
 * existing opclass --- all you can do is drop it, which will result in
 * a useless but harmless dead entry in the cache.  To support altering
 * opclass membership (not the same as opfamily membership!), we'd need to
 * be able to flush this cache as well as the contents of relcache entries
 * for indexes.
 */
static OpClassCacheEnt *
LookupOpclassInfo(Oid operatorClassOid,
				  StrategyNumber numStrats,
				  StrategyNumber numSupport)
{
	OpClassCacheEnt *opcentry;
	bool		found;
	Relation	rel;
	SysScanDesc scan;
	ScanKeyData skey[3];
	HeapTuple	htup;
	bool		indexOK;

	if (OpClassCache == NULL)
	{
		/* First time through: initialize the opclass cache */
		HASHCTL		ctl;

		if (!CacheMemoryContext)
			CreateCacheMemoryContext();

		MemSet(&ctl, 0, sizeof(ctl));
		ctl.keysize = sizeof(Oid);
		ctl.entrysize = sizeof(OpClassCacheEnt);
		ctl.hash = oid_hash;
		OpClassCache = hash_create("Operator class cache", 64,
								   &ctl, HASH_ELEM | HASH_FUNCTION);
	}

	opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
											   (void *) &operatorClassOid,
											   HASH_ENTER, &found);

	if (!found)
	{
		/* Need to allocate memory for new entry */
		opcentry->valid = false;	/* until known OK */
		opcentry->numStrats = numStrats;
		opcentry->numSupport = numSupport;

		if (numStrats > 0)
			opcentry->operatorOids = (Oid *)
				MemoryContextAllocZero(CacheMemoryContext,
									   numStrats * sizeof(Oid));
		else
			opcentry->operatorOids = NULL;

		if (numSupport > 0)
			opcentry->supportProcs = (RegProcedure *)
				MemoryContextAllocZero(CacheMemoryContext,
									   numSupport * sizeof(RegProcedure));
		else
			opcentry->supportProcs = NULL;
	}
	else
	{
		Assert(numStrats == opcentry->numStrats);
		Assert(numSupport == opcentry->numSupport);
	}

	/*
	 * When testing for cache-flush hazards, we intentionally disable the
	 * operator class cache and force reloading of the info on each call.
	 * This is helpful because we want to test the case where a cache flush
	 * occurs while we are loading the info, and it's very hard to provoke
	 * that if this happens only once per opclass per backend.
	 */
#if defined(CLOBBER_CACHE_ALWAYS)
	opcentry->valid = false;
#endif

	if (opcentry->valid)
		return opcentry;

	/*
	 * Need to fill in new entry.
	 *
	 * To avoid infinite recursion during startup, force heap scans if we're
	 * looking up info for the opclasses used by the indexes we would like to
	 * reference here.
	 */
	indexOK = criticalRelcachesBuilt ||
		(operatorClassOid != OID_BTREE_OPS_OID &&
		 operatorClassOid != INT2_BTREE_OPS_OID);

	/*
	 * We have to fetch the pg_opclass row to determine its opfamily and
	 * opcintype, which are needed to look up the operators and functions.
	 * It'd be convenient to use the syscache here, but that probably doesn't
	 * work while bootstrapping.
	 */
	ScanKeyInit(&skey[0],
				ObjectIdAttributeNumber,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(operatorClassOid));
	rel = heap_open(OperatorClassRelationId, AccessShareLock);
	scan = systable_beginscan(rel, OpclassOidIndexId, indexOK,
							  SnapshotNow, 1, skey);

	if (HeapTupleIsValid(htup = systable_getnext(scan)))
	{
		Form_pg_opclass opclassform = (Form_pg_opclass) GETSTRUCT(htup);

		opcentry->opcfamily = opclassform->opcfamily;
		opcentry->opcintype = opclassform->opcintype;
	}
	else
		elog(ERROR, "could not find tuple for opclass %u", operatorClassOid);

	systable_endscan(scan);
	heap_close(rel, AccessShareLock);


	/*
	 * Scan pg_amop to obtain operators for the opclass.  We only fetch the
	 * default ones (those with lefttype = righttype = opcintype).
	 */
	if (numStrats > 0)
	{
		ScanKeyInit(&skey[0],
					Anum_pg_amop_amopfamily,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcfamily));
		ScanKeyInit(&skey[1],
					Anum_pg_amop_amoplefttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
		ScanKeyInit(&skey[2],
					Anum_pg_amop_amoprighttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
		rel = heap_open(AccessMethodOperatorRelationId, AccessShareLock);
		scan = systable_beginscan(rel, AccessMethodStrategyIndexId, indexOK,
								  SnapshotNow, 3, skey);

		while (HeapTupleIsValid(htup = systable_getnext(scan)))
		{
			Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(htup);

			if (amopform->amopstrategy <= 0 ||
				(StrategyNumber) amopform->amopstrategy > numStrats)
				elog(ERROR, "invalid amopstrategy number %d for opclass %u",
					 amopform->amopstrategy, operatorClassOid);
			opcentry->operatorOids[amopform->amopstrategy - 1] =
				amopform->amopopr;
		}

		systable_endscan(scan);
		heap_close(rel, AccessShareLock);
	}

	/*
	 * Scan pg_amproc to obtain support procs for the opclass.	We only fetch
	 * the default ones (those with lefttype = righttype = opcintype).
	 */
	if (numSupport > 0)
	{
		ScanKeyInit(&skey[0],
					Anum_pg_amproc_amprocfamily,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcfamily));
		ScanKeyInit(&skey[1],
					Anum_pg_amproc_amproclefttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
		ScanKeyInit(&skey[2],
					Anum_pg_amproc_amprocrighttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
		rel = heap_open(AccessMethodProcedureRelationId, AccessShareLock);
		scan = systable_beginscan(rel, AccessMethodProcedureIndexId, indexOK,
								  SnapshotNow, 3, skey);

		while (HeapTupleIsValid(htup = systable_getnext(scan)))
		{
			Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);

			if (amprocform->amprocnum <= 0 ||
				(StrategyNumber) amprocform->amprocnum > numSupport)
				elog(ERROR, "invalid amproc number %d for opclass %u",
					 amprocform->amprocnum, operatorClassOid);

			opcentry->supportProcs[amprocform->amprocnum - 1] =
				amprocform->amproc;
		}

		systable_endscan(scan);
		heap_close(rel, AccessShareLock);
	}

	opcentry->valid = true;
	return opcentry;
}


/*
 *		formrdesc
 *
 *		This is a special cut-down version of RelationBuildDesc(),
 *		used while initializing the relcache.
 *		The relation descriptor is built just from the supplied parameters,
 *		without actually looking at any system table entries.  We cheat
 *		quite a lot since we only need to work for a few basic system
 *		catalogs.
 *
 * formrdesc is currently used for: pg_database, pg_authid, pg_auth_members,
 * pg_class, pg_attribute, pg_proc, and pg_type
 * (see RelationCacheInitializePhase2/3).
 *
 * Note that these catalogs can't have constraints (except attnotnull),
 * default values, rules, or triggers, since we don't cope with any of that.
 * (Well, actually, this only matters for properties that need to be valid
 * during bootstrap or before RelationCacheInitializePhase3 runs, and none of
 * these properties matter then...)
 *
 * NOTE: we assume we are already switched into CacheMemoryContext.
 */
static void
formrdesc(const char *relationName, Oid relationReltype,
		  bool isshared, bool hasoids,
		  int natts, const FormData_pg_attribute *attrs)
{
	Relation	relation;
	int			i;
	bool		has_not_null;

	/*
	 * allocate new relation desc, clear all fields of reldesc
	 */
	relation = (Relation) palloc0(sizeof(RelationData));
	relation->rd_targblock = InvalidBlockNumber;

	/* make sure relation is marked as having no open file yet */
	relation->rd_smgr = NULL;

	/*
	 * initialize reference count: 1 because it is nailed in cache
	 */
	relation->rd_refcnt = 1;

	/*
	 * all entries built with this routine are nailed-in-cache; none are for
	 * new or temp relations.
	 */
	relation->rd_isnailed = true;
	relation->rd_createSubid = InvalidSubTransactionId;
	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
	relation->rd_istemp = false;
	relation->rd_issyscat = (strncmp(relationName, "pg_", 3) == 0);	/* GP */
    relation->rd_isLocalBuf = false;    /*CDB*/

	/*
	 * initialize relation tuple form
	 *
	 * The data we insert here is pretty incomplete/bogus, but it'll serve to
	 * get us launched.  RelationCacheInitializePhase2() will read the real
	 * data from pg_class and replace what we've done here.  Note in particular
	 * that relowner is left as zero; this cues RelationCacheInitializePhase2
	 * that the real data isn't there yet.
	 */
	relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);

	namestrcpy(&relation->rd_rel->relname, relationName);
	relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
	relation->rd_rel->reltype = relationReltype;

	/*
	 * It's important to distinguish between shared and non-shared relations,
	 * even at bootstrap time, to make sure we know where they are stored.
	 */
	relation->rd_rel->relisshared = isshared;
	if (isshared)
		relation->rd_rel->reltablespace = GLOBALTABLESPACE_OID;

	relation->rd_rel->relpages = 0;
	relation->rd_rel->reltuples = 0;
	relation->rd_rel->relkind = RELKIND_RELATION;
	relation->rd_rel->relstorage = RELSTORAGE_HEAP;
	relation->rd_rel->relhasoids = hasoids;
	relation->rd_rel->relnatts = (int16) natts;

	/*
	 * Physical file-system information.
	 */
	relation->rd_segfile0_relationnodeinfo.isPresent = false;
	relation->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = false;
	
	/*
	 * initialize attribute tuple form
	 *
	 * Unlike the case with the relation tuple, this data had better be right
	 * because it will never be replaced.  The input values must be correctly
	 * defined by macros in src/include/catalog/ headers.
	 */
	relation->rd_att = CreateTemplateTupleDesc(natts, hasoids);
	relation->rd_att->tdrefcount = 1;	/* mark as refcounted */

	relation->rd_att->tdtypeid = relationReltype;
	relation->rd_att->tdtypmod = -1;	/* unnecessary, but... */

	/*
	 * initialize tuple desc info
	 */
	has_not_null = false;
	for (i = 0; i < natts; i++)
	{
		memcpy(relation->rd_att->attrs[i],
			   &attrs[i],
			   ATTRIBUTE_FIXED_PART_SIZE);
		has_not_null |= attrs[i].attnotnull;
		/* make sure attcacheoff is valid */
		relation->rd_att->attrs[i]->attcacheoff = -1;
	}

	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
	relation->rd_att->attrs[0]->attcacheoff = 0;

	/* mark not-null status */
	if (has_not_null)
	{
		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

		constr->has_not_null = true;
		relation->rd_att->constr = constr;
	}

	/*
	 * initialize relation id from info in att array (my, this is ugly)
	 */
	RelationGetRelid(relation) = relation->rd_att->attrs[0]->attrelid;
	relation->rd_rel->relfilenode = RelationGetRelid(relation);

	/*
	 * initialize the relation lock manager information
	 */
	RelationInitLockInfo(relation);		/* see lmgr.c */

	/*
	 * initialize physical addressing information for the relation
	 */
	RelationInitPhysicalAddr(relation);

	/*
	 * initialize the rel-has-index flag, using hardwired knowledge
	 */
	if (IsBootstrapProcessingMode())
	{
		/* In bootstrap mode, we have no indexes */
		relation->rd_rel->relhasindex = false;
	}
	else
	{
		/* Otherwise, all the rels formrdesc is used for have indexes */
		relation->rd_rel->relhasindex = true;
	}

	/*
	 * add new reldesc to relcache
	 */
	RelationCacheInsert(relation);

	/* It's fully valid */
	relation->rd_isvalid = true;
}


/* ----------------------------------------------------------------
 *				 Relation Descriptor Lookup Interface
 * ----------------------------------------------------------------
 */

/*
 *		RelationIdGetRelation
 *
 *		Lookup a reldesc by OID; make one if not already in cache.
 *
 *		Returns NULL if no pg_class row could be found for the given relid
 *		(suggesting we are trying to access a just-deleted relation).
 *		Any other error is reported via elog.
 *
 *		NB: caller should already have at least AccessShareLock on the
 *		relation ID, else there are nasty race conditions.
 *
 *		NB: relation ref count is incremented, or set to 1 if new entry.
 *		Caller should eventually decrement count.  (Usually,
 *		that happens by calling RelationClose().)
 */
Relation
RelationIdGetRelation(Oid relationId)
{
	Relation	rd;

	/*
	 * first try to find reldesc in the cache
	 */
	RelationIdCacheLookup(relationId, rd);

	if (RelationIsValid(rd))
	{
		RelationIncrementReferenceCount(rd);
		/* revalidate cache entry if necessary */
		if (!rd->rd_isvalid)
		{
			/*
			 * Indexes only have a limited number of possible schema changes,
			 * and we don't want to use the full-blown procedure because it's
			 * a headache for indexes that the reload operation itself depends on.
			 */
			if (rd->rd_rel->relkind == RELKIND_INDEX)
				RelationReloadIndexInfo(rd);
			else
				RelationClearRelation(rd, true);
		}
		return rd;
	}

	/*
	 * no reldesc in the cache, so have RelationBuildDesc() build one and add
	 * it.
	 */
	rd = RelationBuildDesc(relationId, true);
	if (RelationIsValid(rd))
		RelationIncrementReferenceCount(rd);

	return rd;
}
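
/*
 * Illustrative usage only (a sketch; not copied from any particular caller).
 * The caller is assumed to already hold at least AccessShareLock on the
 * relation:
 *
 *		rd = RelationIdGetRelation(relid);
 *		if (RelationIsValid(rd))
 *		{
 *			... use the relcache entry ...
 *			RelationClose(rd);
 *		}
 */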

/* ----------------------------------------------------------------
 *				cache invalidation support routines
 * ----------------------------------------------------------------
 */

/*
 * RelationIncrementReferenceCount
 *		Increments relation reference count.
 *
 * Note: bootstrap mode has its own weird ideas about relation refcount
 * behavior; we ought to fix it someday, but for now, just disable
 * reference count ownership tracking in bootstrap mode.
 */
void
RelationIncrementReferenceCount(Relation rel)
{
	ResourceOwnerEnlargeRelationRefs(CurrentResourceOwner);
	rel->rd_refcnt += 1;
	if (!IsBootstrapProcessingMode())
		ResourceOwnerRememberRelationRef(CurrentResourceOwner, rel);
}

/*
 * RelationDecrementReferenceCount
 *		Decrements relation reference count.
 */
void
RelationDecrementReferenceCount(Relation rel)
{
	if (rel->rd_refcnt <= 0)
	{
		elog(ERROR,
			 "Relation decrement reference count found relation %u/%u/%u with bad count (reference count %d)",
			 rel->rd_node.spcNode,
			 rel->rd_node.dbNode,
			 rel->rd_node.relNode,
			 rel->rd_refcnt);
	}
	
	rel->rd_refcnt -= 1;
	if (!IsBootstrapProcessingMode())
		ResourceOwnerForgetRelationRef(CurrentResourceOwner, rel);
}

/*
 * RelationClose - close an open relation
 *
 *	Actually, we just decrement the refcount.
 *
 *	NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
 *	will be freed as soon as their refcount goes to zero.  In combination
 *	with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
 *	to catch references to already-released relcache entries.  It slows
 *	things down quite a bit, however.
 */
void
RelationClose(Relation relation)
{
	/* Note: no locking manipulations needed */
	RelationDecrementReferenceCount(relation);

#ifdef RELCACHE_FORCE_RELEASE
	if (RelationHasReferenceCountZero(relation) &&
		relation->rd_createSubid == InvalidSubTransactionId &&
		relation->rd_newRelfilenodeSubid == InvalidSubTransactionId)
		RelationClearRelation(relation, false);
#endif
}

/*
 * RelationReloadIndexInfo - reload minimal information for an open index
 *
 *	This function is used only for indexes.  A relcache inval on an index
 *	can mean that its pg_class or pg_index row changed.  There are only
 *	very limited changes that are allowed to an existing index's schema,
 *	so we can update the relcache entry without a complete rebuild; which
 *	is fortunate because we can't rebuild an index entry that is "nailed"
 *	and/or in active use.  We support full replacement of the pg_class row,
 *	as well as updates of a few simple fields of the pg_index row.
 *
 *	We can't necessarily reread the catalog rows right away; we might be
 *	in a failed transaction when we receive the SI notification.  If so,
 *	RelationClearRelation just marks the entry as invalid by setting
 *	rd_isvalid to false.  This routine is called to fix the entry when it
 *	is next needed.
 *
 *	We assume that at the time we are called, we have at least AccessShareLock
 *	on the target index.  (Note: in the calls from RelationClearRelation,
 *	this is legitimate because we know the rel has positive refcount.)
 *
 *	If the target index is an index on pg_class or pg_index, we'd better have
 *	previously gotten at least AccessShareLock on its underlying catalog,
 *	else we are at risk of deadlock against someone trying to exclusive-lock
 *	the heap and index in that order.  This is ensured in current usage by
 *	only applying this to indexes being opened or having positive refcount.
 */
static void
RelationReloadIndexInfo(Relation relation)
{
	bool		indexOK;
	HeapTuple	pg_class_tuple;
	Form_pg_class relp;

	/* Should be called only for invalidated indexes */
	Assert(relation->rd_rel->relkind == RELKIND_INDEX &&
		   !relation->rd_isvalid);
	/* Should be closed at smgr level */
	Assert(relation->rd_smgr == NULL);

	/* Make sure targblock is reset in case rel was truncated */
	relation->rd_targblock = InvalidBlockNumber;
	/* Must free any AM cached data, too */
	if (relation->rd_amcache)
		pfree(relation->rd_amcache);
	relation->rd_amcache = NULL;

	/*
	 * If it's a shared index, we might be called before backend startup has
	 * finished selecting a database, in which case we have no way to read
	 * pg_class yet.  However, a shared index can never have any significant
	 * schema updates, so it's okay to ignore the invalidation signal.  Just
	 * mark it valid and return without doing anything more.
	 */
	if (relation->rd_rel->relisshared && !criticalRelcachesBuilt)
	{
		relation->rd_isvalid = true;
		return;
	}

	/*
	 * Read the pg_class row
	 *
	 * Don't try to use an indexscan of pg_class_oid_index to reload the info
	 * for pg_class_oid_index ...
	 */
	indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
	pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK, NULL);
	if (!HeapTupleIsValid(pg_class_tuple))
		elog(ERROR, "could not find pg_class tuple for index %u",
			 RelationGetRelid(relation));
	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
	memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
	/* Reload reloptions in case they changed */
	if (relation->rd_options)
		pfree(relation->rd_options);
	RelationParseRelOptions(relation, pg_class_tuple);
	/* done with pg_class tuple */
	heap_freetuple(pg_class_tuple);
	/* We must recalculate physical address in case it changed */
	RelationInitPhysicalAddr(relation);

	/* Forget gp_relation_node information -- it may have changed. */
	MemSet(&relation->rd_segfile0_relationnodeinfo, 0, sizeof(RelationNodeInfo));

	/*
	 * For a non-system index, there are fields of the pg_index row that are
	 * allowed to change, so re-read that row and update the relcache entry.
	 * Most of the info derived from pg_index (such as support function lookup
	 * info) cannot change, and indeed the whole point of this routine is to
	 * update the relcache entry without clobbering that data; so wholesale
	 * replacement is not appropriate.
	 */
	if (!IsSystemRelation(relation))
	{
		HeapTuple	tuple;
		Form_pg_index index;

		tuple = SearchSysCache(INDEXRELID,
							   ObjectIdGetDatum(RelationGetRelid(relation)),
							   0, 0, 0);
		if (!HeapTupleIsValid(tuple))
			elog(ERROR, "cache lookup failed for index %u",
				 RelationGetRelid(relation));
		index = (Form_pg_index) GETSTRUCT(tuple);

		/*
		 * Basically, let's just copy all the bool fields.  There are one or
		 * two of these that can't actually change in the current code, but
		 * it's not worth it to track exactly which ones they are.  None of
		 * the array fields are allowed to change, though.
		 */
		relation->rd_index->indisunique = index->indisunique;
		relation->rd_index->indisprimary = index->indisprimary;
		relation->rd_index->indisclustered = index->indisclustered;
		relation->rd_index->indisvalid = index->indisvalid;
		relation->rd_index->indcheckxmin = index->indcheckxmin;
		relation->rd_index->indisready = index->indisready;

		/* Copy xmin too, as that is needed to make sense of indcheckxmin */
		HeapTupleHeaderSetXmin(relation->rd_indextuple->t_data,
							   HeapTupleHeaderGetXmin(tuple->t_data));

		ReleaseSysCache(tuple);
	}

	/* Okay, now it's valid again */
	relation->rd_isvalid = true;
}

/*
 * RelationDestroyRelation
 *
 *	Physically delete a relation cache entry and all subsidiary data.
 *	Caller must already have unhooked the entry from the hash table.
 */
static void
RelationDestroyRelation(Relation relation)
{
	Assert(RelationHasReferenceCountZero(relation));

	/*
	 * Make sure smgr and lower levels close the relation's files, if they
	 * weren't closed already.  (This was probably done by caller, but let's
	 * just be real sure.)
	 */
	RelationCloseSmgr(relation);

	/*
	 * Free all the subsidiary data structures of the relcache entry,
	 * then the entry itself.
	 */
	if (relation->rd_rel)
		pfree(relation->rd_rel);
	/* can't use DecrTupleDescRefCount here */
	Assert(relation->rd_att->tdrefcount > 0);
	if (--relation->rd_att->tdrefcount == 0)
		FreeTupleDesc(relation->rd_att);
	list_free(relation->rd_indexlist);
	bms_free(relation->rd_indexattr);
	FreeTriggerDesc(relation->trigdesc);
	if (relation->rd_options)
		pfree(relation->rd_options);
	if (relation->rd_indextuple)
		pfree(relation->rd_indextuple);
	if (relation->rd_am)
		pfree(relation->rd_am);
	if (relation->rd_indexcxt)
		MemoryContextDelete(relation->rd_indexcxt);
	if (relation->rd_rulescxt)
		MemoryContextDelete(relation->rd_rulescxt);
	if (relation->rd_cdbpolicy)
		pfree(relation->rd_cdbpolicy);

	pfree(relation);
}

/*
 * RelationClearRelation
 *
 *	 Physically blow away a relation cache entry, or reset it and rebuild
 *	 it from scratch (that is, from catalog entries).  The latter path is
 *	 used when we are notified of a change to an open relation (one with
 *	 refcount > 0).
 *
 *	 NB: when rebuilding, we'd better hold some lock on the relation,
 *	 else the catalog data we need to read could be changing under us.
 *	 Also, a rel to be rebuilt had better have refcnt > 0.  This is because
 *	 an sinval reset could happen while we're accessing the catalogs, and
 *	 the rel would get blown away underneath us by RelationCacheInvalidate
 *	 if it has zero refcnt.
 *
 *	 The "rebuild" parameter is redundant in current usage because it has
 *	 to match the relation's refcnt status, but we keep it as a crosscheck
 *	 that we're doing what the caller expects.
 */
static void
RelationClearRelation(Relation relation, bool rebuild)
{
	/*
	 * As per notes above, a rel to be rebuilt MUST have refcnt > 0; while
	 * of course it would be a bad idea to blow away one with nonzero refcnt.
	 */
	Assert(rebuild ?
		   !RelationHasReferenceCountZero(relation) :
		   RelationHasReferenceCountZero(relation));

	/*
	 * Make sure smgr and lower levels close the relation's files, if they
	 * weren't closed already.  If the relation is not getting deleted, the
	 * next smgr access should reopen the files automatically.	This ensures
	 * that the low-level file access state is updated after, say, a vacuum
	 * truncation.
	 */
	RelationCloseSmgr(relation);

	/*
	 * Never, never ever blow away a nailed-in system relation, because we'd
	 * be unable to recover.  However, we must reset rd_targblock, in case we
	 * got called because of a relation cache flush that was triggered by
	 * VACUUM.  Likewise reset the fsm and vm size info.
	 *
	 * If it's a nailed index, then we need to re-read the pg_class row to see
	 * if its relfilenode changed.	We can't necessarily do that here, because
	 * we might be in a failed transaction.  We assume it's okay to do it if
	 * there are open references to the relcache entry (cf notes for
	 * AtEOXact_RelationCache).  Otherwise just mark the entry as possibly
	 * invalid, and it'll be fixed when next opened.
	 */
	if (relation->rd_isnailed)
	{
		relation->rd_targblock = InvalidBlockNumber;
		if (relation->rd_rel->relkind == RELKIND_INDEX)
		{
			relation->rd_isvalid = false;		/* needs to be revalidated */
			if (relation->rd_refcnt > 1)
				RelationReloadIndexInfo(relation);
		}
		return;
	}

	/*
	 * Even non-system indexes should not be blown away if they are open and
	 * have valid index support information.  This avoids problems with active
	 * use of the index support information.  As with nailed indexes, we
	 * re-read the pg_class row to handle possible physical relocation of the
	 * index, and we check for pg_index updates too.
	 */
	if (relation->rd_rel->relkind == RELKIND_INDEX &&
		relation->rd_refcnt > 0 &&
		relation->rd_indexcxt != NULL)
	{
		relation->rd_isvalid = false;	/* needs to be revalidated */
		RelationReloadIndexInfo(relation);
		return;
	}

	/* Mark it invalid until we've finished rebuild */
	relation->rd_isvalid = false;

	/*
	 * If we're really done with the relcache entry, blow it away. But if
	 * someone is still using it, reconstruct the whole deal without moving
	 * the physical RelationData record (so that the someone's pointer is
	 * still valid).
	 */
	if (!rebuild)
	{
		/* Remove it from the hash table */
		RelationCacheDelete(relation);

		/* And release storage */
		RelationDestroyRelation(relation);
	}
	else
	{
		/*
		 * Our strategy for rebuilding an open relcache entry is to build
		 * a new entry from scratch, swap its contents with the old entry,
		 * and finally delete the new entry (along with any infrastructure
		 * swapped over from the old entry).  This is to avoid trouble in case
		 * an error causes us to lose control partway through.  The old entry
		 * will still be marked !rd_isvalid, so we'll try to rebuild it again
		 * on next access.  Meanwhile it's not any less valid than it was
		 * before, so any code that might expect to continue accessing it
		 * isn't hurt by the rebuild failure.  (Consider for example a
		 * subtransaction that ALTERs a table and then gets cancelled partway
		 * through the cache entry rebuild.  The outer transaction should
		 * still see the not-modified cache entry as valid.)  The worst
		 * consequence of an error is leaking the necessarily-unreferenced
		 * new entry, and this shouldn't happen often enough for that to be
		 * a big problem.
		 *
		 * When rebuilding an open relcache entry, we must preserve ref count
		 * and rd_createSubid/rd_newRelfilenodeSubid state.  Also attempt to
		 * preserve the pg_class entry (rd_rel), tupledesc, and rewrite-rule
		 * substructures in place, because various places assume that these
		 * structures won't move while they are working with an open relcache
		 * entry.  (Note: the refcount mechanism for tupledescs might someday
		 * allow us to remove this hack for the tupledesc.)
 		 *
 		 * Note that this process does not touch CurrentResourceOwner; which
 		 * is good because whatever ref counts the entry may have do not
 		 * necessarily belong to that resource owner.
 		 */
		Relation	newrel;
 		Oid			save_relid = RelationGetRelid(relation);
		bool		keep_tupdesc;
		bool		keep_rules;

		/* Build temporary entry, but don't link it into hashtable */
		newrel = RelationBuildDesc(save_relid, false);
		if (newrel == NULL)
 		{
 			/* Should only get here if relation was deleted */
			RelationCacheDelete(relation);
			RelationDestroyRelation(relation);
 			elog(ERROR, "relation %u deleted while still in use", save_relid);
 		}
 
		keep_tupdesc = equalTupleDescs(relation->rd_att, newrel->rd_att, true);
		keep_rules = equalRuleLocks(relation->rd_rules, newrel->rd_rules);

		/*
		 * Perform swapping of the relcache entry contents.  Within this
		 * process the old entry is momentarily invalid, so there *must*
		 * be no possibility of CHECK_FOR_INTERRUPTS within this sequence.
		 * Do it in all-in-line code for safety.
		 *
		 * Since the vast majority of fields should be swapped, our method
		 * is to swap the whole structures and then re-swap those few fields
		 * we didn't want swapped.
		 */
#define SWAPFIELD(fldtype, fldname) \
		do { \
			fldtype _tmp = newrel->fldname; \
			newrel->fldname = relation->fldname; \
			relation->fldname = _tmp; \
		} while (0)
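
		/*
		 * For example, SWAPFIELD(int, rd_refcnt) exchanges newrel->rd_refcnt
		 * and relation->rd_refcnt through a temporary variable.
		 */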

		/* swap all Relation struct fields */
 		{
			RelationData tmpstruct;

			memcpy(&tmpstruct, newrel, sizeof(RelationData));
			memcpy(newrel, relation, sizeof(RelationData));
			memcpy(relation, &tmpstruct, sizeof(RelationData));
		}

		/* rd_smgr must not be swapped, due to back-links from smgr level */
		SWAPFIELD(SMgrRelation, rd_smgr);
		/* rd_refcnt must be preserved */
		SWAPFIELD(int, rd_refcnt);
		/* isnailed shouldn't change */
		Assert(newrel->rd_isnailed == relation->rd_isnailed);
		/* creation sub-XIDs must be preserved */
		SWAPFIELD(SubTransactionId, rd_createSubid);
2474
		SWAPFIELD(SubTransactionId, rd_newRelfilenodeSubid);
2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496
		/* un-swap rd_rel pointers, swap contents instead */
		SWAPFIELD(Form_pg_class, rd_rel);
		/* ... but actually, we don't have to update newrel->rd_rel */
		memcpy(relation->rd_rel, newrel->rd_rel, CLASS_TUPLE_SIZE);
		/* preserve old tupledesc and rules if no logical change */
		if (keep_tupdesc)
			SWAPFIELD(TupleDesc, rd_att);
		if (keep_rules)
 		{
			SWAPFIELD(RuleLock *, rd_rules);
			SWAPFIELD(MemoryContext, rd_rulescxt);
 		}
		/* pgstat_info must be preserved */
		SWAPFIELD(struct PgStat_TableStatus *, pgstat_info);

		/* preserve persistent table information for the relation  */
		SWAPFIELD(struct RelationNodeInfo, rd_segfile0_relationnodeinfo);

#undef SWAPFIELD

		/* And now we can throw away the temporary entry */
		RelationDestroyRelation(newrel);
	}
}

/*
 * RelationFlushRelation
 *
 *	 Rebuild the relation if it is open (refcount > 0), else blow it away.
 */
static void
RelationFlushRelation(Relation relation)
{
	if (relation->rd_createSubid != InvalidSubTransactionId ||
		relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
	{
		/*
		 * New relcache entries are always rebuilt, not flushed; else we'd
		 * forget the "new" status of the relation, which is a useful
		 * optimization to have.  Ditto for the new-relfilenode status.
		 *
		 * The rel could have zero refcnt here, so temporarily increment
		 * the refcnt to ensure it's safe to rebuild it.  We can assume that
		 * the current transaction has some lock on the rel already.
		 */
		RelationIncrementReferenceCount(relation);
		RelationClearRelation(relation, true);
		RelationDecrementReferenceCount(relation);
	}
	else
	{
		/*
		 * Pre-existing rels can be dropped from the relcache if not open.
		 */
		bool	rebuild = !RelationHasReferenceCountZero(relation);

		RelationClearRelation(relation, rebuild);
	}
}

/*
 * RelationForgetRelation - unconditionally remove a relcache entry
 *
 *		   External interface for destroying a relcache entry when we
 *		   drop the relation.
 */
void
RelationForgetRelation(Oid rid)
{
	Relation	relation;

	RelationIdCacheLookup(rid, relation);

	if (!PointerIsValid(relation))
		return;					/* not in cache, nothing to do */

	if (!RelationHasReferenceCountZero(relation))
		elog(ERROR, "relation %u is still open", rid);

	/* Unconditionally destroy the relcache entry */
	RelationClearRelation(relation, false);
}

/*
 *		RelationCacheInvalidateEntry
 *
 *		This routine is invoked for SI cache flush messages.
 *
2563 2564
 * Any relcache entry matching the relid must be flushed.  (Note: caller has
 * already determined that the relid belongs to our database or is a shared
2565
 * relation.)
2566 2567 2568 2569 2570 2571
 *
 * We used to skip local relations, on the grounds that they could
 * not be targets of cross-backend SI update messages; but it seems
 * safer to process them, so that our *own* SI update messages will
 * have the same effects during CommandCounterIncrement for both
 * local and nonlocal relations.
2572 2573
 */
void
2574
RelationCacheInvalidateEntry(Oid relationId)
2575
{
2576
	Relation	relation;
2577 2578 2579

	RelationIdCacheLookup(relationId, relation);

2580
	if (PointerIsValid(relation))
2581
	{
2582
		relcacheInvalsReceived++;
2583
		RelationFlushRelation(relation);
2584
	}
2585 2586 2587 2588
}

/*
 * RelationCacheInvalidate
2589
 *	 Blow away cached relation descriptors that have zero reference counts,
B
2590
 *	 and rebuild those with positive reference counts.	Also reset the smgr
2591
 *	 relation cache.
2592
 *
2593
 *	 This is currently used only to recover from SI message buffer overflow,
2594
 *	 so we do not touch new-in-transaction relations; they cannot be targets
2595 2596
 *	 of cross-backend SI updates (and our own updates now go through a
 *	 separate linked list that isn't limited by the SI message buffer size).
2597 2598 2599
 *	 Likewise, we need not discard new-relfilenode-in-transaction hints,
 *	 since any invalidation of those would be a local event.
 *
2600 2601
 *	 We do this in two phases: the first pass deletes deletable items, and
 *	 the second one rebuilds the rebuildable items.  This is essential for
2602
 *	 safety, because hash_seq_search only copes with concurrent deletion of
B
2603
 *	 the element it is currently visiting.	If a second SI overflow were to
2604 2605 2606 2607
 *	 occur while we are walking the table, resulting in recursive entry to
 *	 this routine, we could crash because the inner invocation blows away
 *	 the entry next to be visited by the outer scan.  But this way is OK,
 *	 because (a) during the first pass we won't process any more SI messages,
2608
 *	 so hash_seq_search will complete safely; (b) during the second pass we
2609
 *	 only hold onto pointers to nondeletable entries.
2610 2611 2612 2613 2614 2615
 *
 *	 The two-phase approach also makes it easy to ensure that we process
 *	 nailed-in-cache indexes before other nondeletable items, and that we
 *	 process pg_class_oid_index first of all.  In scenarios where a nailed
 *	 index has been given a new relfilenode, we have to detect that update
 *	 before the nailed index is used in reloading any other relcache entry.
2616 2617
 */
void
2618
RelationCacheInvalidate(void)
2619
{
2620
	HASH_SEQ_STATUS status;
2621
	RelIdCacheEnt *idhentry;
2622
	Relation	relation;
2623
	List	   *rebuildFirstList = NIL;
B
2624
	List	   *rebuildList = NIL;
2625
	ListCell   *l;
2626 2627

	/* Phase 1 */
2628
	hash_seq_init(&status, RelationIdCache);
2629

2630
	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2631
	{
2632
		relation = idhentry->reldesc;
2633

2634
		/* Must close all smgr references to avoid leaving dangling ptrs */
2635
		RelationCloseSmgr(relation);
2636

2637
		/* Ignore new relations, since they are never SI targets */
2638
		if (relation->rd_createSubid != InvalidSubTransactionId)
2639
			continue;
2640

2641 2642
		relcacheInvalsReceived++;

2643
		if (RelationHasReferenceCountZero(relation))
2644 2645
		{
			/* Delete this entry immediately */
2646
			Assert(!relation->rd_isnailed);
2647 2648 2649 2650
			RelationClearRelation(relation, false);
		}
		else
		{
2651 2652
			/*
			 * Add this entry to list of stuff to rebuild in second pass.
B
2653 2654
			 * pg_class_oid_index goes on the front of rebuildFirstList, other
			 * nailed indexes on the back, and everything else into
2655 2656 2657 2658 2659
			 * rebuildList (in no particular order).
			 */
			if (relation->rd_isnailed &&
				relation->rd_rel->relkind == RELKIND_INDEX)
			{
2660
				if (RelationGetRelid(relation) == ClassOidIndexId)
2661 2662 2663 2664 2665 2666
					rebuildFirstList = lcons(relation, rebuildFirstList);
				else
					rebuildFirstList = lappend(rebuildFirstList, relation);
			}
			else
				rebuildList = lcons(relation, rebuildList);
2667
		}
2668
	}
2669

2670
	/*
B
2671 2672 2673
	 * Now zap any remaining smgr cache entries.  This must happen before we
	 * start to rebuild entries, since that may involve catalog fetches which
	 * will re-open catalog files.
2674 2675 2676
	 */
	smgrcloseall();

2677
	/* Phase 2: rebuild the items found to need rebuild in phase 1 */
2678 2679 2680 2681 2682 2683
	foreach(l, rebuildFirstList)
	{
		relation = (Relation) lfirst(l);
		RelationClearRelation(relation, true);
	}
	list_free(rebuildFirstList);
2684
	foreach(l, rebuildList)
2685
	{
2686 2687
		relation = (Relation) lfirst(l);
		RelationClearRelation(relation, true);
2688
	}
2689
	list_free(rebuildList);
2690
}
2691

2692
/*
2693
 * AtEOXact_RelationCache
2694
 *
2695
 *	Clean up the relcache at main-transaction commit or abort.
2696 2697 2698 2699 2700
 *
 * Note: this must be called *before* processing invalidation messages.
 * In the case of abort, we don't want to try to rebuild any invalidated
 * cache entries (since we can't safely do database accesses).  Therefore
 * we must reset refcnts before handling pending invalidations.
2701 2702 2703 2704 2705 2706
 *
 * As of PostgreSQL 8.1, relcache refcnts should get released by the
 * ResourceOwner mechanism.  This routine just does a debugging
 * cross-check that no pins remain.  However, we also need to do special
 * cleanup when the current transaction created any relations or made use
 * of forced index lists.
2707 2708
 */
void
2709
AtEOXact_RelationCache(bool isCommit)
2710
{
2711
	HASH_SEQ_STATUS status;
2712
	RelIdCacheEnt *idhentry;
2713

2714 2715
	/*
	 * To speed up transaction exit, we want to avoid scanning the relcache
B
2716 2717 2718 2719
	 * unless there is actually something for this routine to do.  Other than
	 * the debug-only Assert checks, most transactions don't create any work
	 * for us to do here, so we keep a static flag that gets set if there is
	 * anything to do.	(Currently, this means either a relation is created in
2720 2721 2722 2723
	 * the current xact, or one is given a new relfilenode, or an index list
	 * is forced.)  For simplicity, the flag remains set till end of top-level
	 * transaction, even though we could clear it at subtransaction end in
	 * some cases.
2724 2725 2726
	 *
	 * MPP-3333: READERS need to *always* scan, otherwise they will not be able
	 * to maintain a coherent view of the storage layer.
2727 2728
	 */
	if (!need_eoxact_work
2729
		&& DistributedTransactionContext != DTX_CONTEXT_QE_READER
2730 2731 2732 2733 2734 2735
#ifdef USE_ASSERT_CHECKING
		&& !assert_enabled
#endif
		)
		return;

2736
	hash_seq_init(&status, RelationIdCache);
2737

2738
	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2739
	{
2740
		Relation	relation = idhentry->reldesc;
2741 2742 2743 2744 2745

		/*
		 * The relcache entry's ref count should be back to its normal
		 * not-in-a-transaction state: 0 unless it's nailed in cache.
		 *
B
2746 2747 2748
		 * In bootstrap mode, this is NOT true, so don't check it --- the
		 * bootstrap code expects relations to stay open across start/commit
		 * transaction calls.  (That seems bogus, but it's not worth fixing.)
2749 2750 2751 2752 2753 2754 2755 2756 2757 2758
		 */
#ifdef USE_ASSERT_CHECKING
		if (!IsBootstrapProcessingMode())
		{
			int			expected_refcnt;

			expected_refcnt = relation->rd_isnailed ? 1 : 0;
			Assert(relation->rd_refcnt == expected_refcnt);
		}
#endif
2759

2760 2761 2762 2763 2764 2765 2766 2767 2768
		/*
		 * QE-readers aren't properly enrolled in transactions, they
		 * just get the snapshot which corresponds -- so here, where
		 * we are maintaining their relcache, we want to just clean
		 * up (almost as if we had aborted). (MPP-3338)
		 */
		if (DistributedTransactionContext == DTX_CONTEXT_QE_ENTRY_DB_SINGLETON ||
			DistributedTransactionContext == DTX_CONTEXT_QE_READER)
		{
2769
			RelationClearRelation(relation, relation->rd_isnailed ? true : false);
2770 2771 2772
			continue;
		}

2773 2774 2775
		/*
		 * Is it a relation created in the current transaction?
		 *
B
2776 2777 2778 2779 2780 2781
		 * During commit, reset the flag to zero, since we are now out of the
		 * creating transaction.  During abort, simply delete the relcache
		 * entry --- it isn't interesting any longer.  (NOTE: if we have
		 * forgotten the new-ness of a new relation due to a forced cache
		 * flush, the entry will get deleted anyway by shared-cache-inval
		 * processing of the aborted pg_class insertion.)
2782
		 */
2783
		if (relation->rd_createSubid != InvalidSubTransactionId)
2784
		{
2785
			if (isCommit)
2786
				relation->rd_createSubid = InvalidSubTransactionId;
2787 2788
			else
			{
2789 2790 2791 2792 2793
				/*
				 * In abort, delete the error log file before forgetting
				 * this relation.
				 */
				ErrorLogDelete(MyDatabaseId, RelationGetRelid(relation));
2794 2795 2796 2797 2798
				RelationClearRelation(relation, false);
				continue;
			}
		}

2799 2800 2801
		/*
		 * Likewise, reset the hint about the relfilenode being new.
		 */
2802
		relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
2803

2804 2805 2806 2807 2808
		/*
		 * Flush any temporary index list.
		 */
		if (relation->rd_indexvalid == 2)
		{
2809
			list_free(relation->rd_indexlist);
2810
			relation->rd_indexlist = NIL;
2811
			relation->rd_oidindex = InvalidOid;
2812 2813
			relation->rd_indexvalid = 0;
		}
2814
	}
2815

2816 2817
	/* Once done with the transaction, we can reset need_eoxact_work */
	need_eoxact_work = false;
2818
}
2819

2820 2821 2822 2823 2824 2825 2826 2827
/*
 * AtEOSubXact_RelationCache
 *
 *	Clean up the relcache at sub-transaction commit or abort.
 *
 * Note: this must be called *before* processing invalidation messages.
 */
void
2828 2829
AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
						  SubTransactionId parentSubid)
2830 2831 2832 2833
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;

2834
	/*
2835 2836
	 * Skip the relcache scan if nothing to do --- see notes for
	 * AtEOXact_RelationCache.
2837
	 */
2838 2839
	if (!need_eoxact_work &&
		DistributedTransactionContext != DTX_CONTEXT_QE_READER)
2840 2841
		return;

2842 2843 2844 2845 2846 2847
	hash_seq_init(&status, RelationIdCache);

	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
	{
		Relation	relation = idhentry->reldesc;

2848 2849 2850 2851 2852 2853 2854 2855
		/*
		 * As opposed to AtEOXact_RelationCache, subtransactions
		 * in readers are only caused by internal commands, and
		 * there shouldn't be interaction with global transactions,
		 * (reader gangs commit their transaction independently)
		 * we must not clear the relcache here.
		 */

2856 2857 2858
		/*
		 * Is it a relation created in the current subtransaction?
		 *
2859 2860
		 * During subcommit, mark it as belonging to the parent, instead.
		 * During subabort, simply delete the relcache entry.
2861
		 */
2862
		if (relation->rd_createSubid == mySubid)
2863 2864
		{
			if (isCommit)
2865
				relation->rd_createSubid = parentSubid;
2866
			else if (RelationHasReferenceCountZero(relation))
2867
			{
2868 2869 2870 2871 2872
				/*
				 * In abort, delete the error log file before forgetting
				 * this relation.
				 */
				ErrorLogDelete(MyDatabaseId, RelationGetRelid(relation));
2873

2874 2875 2876
				RelationClearRelation(relation, false);
				continue;
			}
2877 2878 2879 2880 2881 2882 2883 2884 2885 2886 2887 2888 2889 2890
			else
			{
				/*
				 * Hmm, somewhere there's a (leaked?) reference to the
				 * relation.  We daren't remove the entry for fear of
				 * dereferencing a dangling pointer later.  Bleat, and mark it
				 * as not belonging to the current transaction.  Hopefully
				 * it'll get cleaned up eventually.  This must be just a
				 * WARNING to avoid error-during-error-recovery loops.
				 */
				relation->rd_createSubid = InvalidSubTransactionId;
				elog(WARNING, "cannot remove relcache entry for \"%s\" because it has nonzero refcount",
					 RelationGetRelationName(relation));
			}
2891 2892
		}

2893
		/*
B
2894 2895
		 * Likewise, update or drop any new-relfilenode-in-subtransaction
		 * hint.
2896
		 */
2897 2898 2899 2900 2901
		if (relation->rd_newRelfilenodeSubid == mySubid)
		{
			if (isCommit)
				relation->rd_newRelfilenodeSubid = parentSubid;
			else
2902
				relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
2903
		}
2904 2905 2906 2907 2908 2909 2910 2911

		/*
		 * Flush any temporary index list.
		 */
		if (relation->rd_indexvalid == 2)
		{
			list_free(relation->rd_indexlist);
			relation->rd_indexlist = NIL;
2912
			relation->rd_oidindex = InvalidOid;
2913 2914 2915 2916 2917
			relation->rd_indexvalid = 0;
		}
	}
}

/*
 * RelationCacheMarkNewRelfilenode
 *
 *	Mark the rel as having been given a new relfilenode in the current
B
2922
 *	(sub) transaction.	This is a hint that can be used to optimize
2923 2924 2925 2926 2927 2928 2929 2930 2931 2932 2933 2934
 *	later operations on the rel in the same transaction.
 */
void
RelationCacheMarkNewRelfilenode(Relation rel)
{
	/* Mark it... */
	rel->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
	/* ... and now we have eoxact cleanup work to do */
	need_eoxact_work = true;
}


/*
 *		RelationBuildLocalRelation
 *			Build a relcache entry for an about-to-be-created relation,
 *			and enter it into the relcache.
 */
Relation
RelationBuildLocalRelation(const char *relname,
						   Oid relnamespace,
						   TupleDesc tupDesc,
						   Oid relid,
						   Oid reltablespace,
			               char relkind,            /*CDB*/
						   bool shared_relation)
{
	Relation	rel;
	MemoryContext oldcxt;
	int			natts = tupDesc->natts;
	int			i;
	bool		has_not_null;
	bool		nailit;

	AssertArg(natts >= 0);

	/*
	 * check for creation of a rel that must be nailed in cache.
	 *
	 * XXX this list had better match the relations specially handled in
	 * RelationCacheInitializePhase2/3.
	 */
	switch (relid)
	{
		case DatabaseRelationId:
		case AuthIdRelationId:
		case AuthMemRelationId:
		case RelationRelationId:
		case AttributeRelationId:
		case ProcedureRelationId:
		case TypeRelationId:
			nailit = true;
			break;
		default:
			nailit = false;
			break;
	}

	/*
	 * check that hardwired list of shared rels matches what's in the
	 * bootstrap .bki file.  If you get a failure here during initdb, you
	 * probably need to fix IsSharedRelation() to match whatever you've done
	 * to the set of shared relations.
	 */
	if (shared_relation != IsSharedRelation(relid))
		elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
			 relname, relid);

	/*
	 * switch to the cache context to create the relcache entry.
	 */
	if (!CacheMemoryContext)
		CreateCacheMemoryContext();

	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
	 * allocate a new relation descriptor and fill in basic state fields.
	 */
	rel = (Relation) palloc0(sizeof(RelationData));

	rel->rd_targblock = InvalidBlockNumber;

	/* make sure relation is marked as having no open file yet */
	rel->rd_smgr = NULL;

	/* mark it nailed if appropriate */
	rel->rd_isnailed = nailit;

	rel->rd_refcnt = nailit ? 1 : 0;

	/* it's being created in this transaction */
	rel->rd_createSubid = GetCurrentSubTransactionId();
	rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;

	/* must flag that we have rels created in this transaction */
	need_eoxact_work = true;

	/* is it a temporary relation? */
	rel->rd_istemp = isTempOrToastNamespace(relnamespace);

	/* is it a system catalog? */
	rel->rd_issyscat = (strncmp(relname, "pg_", 3) == 0);

    /*
     * CDB: On QEs, temp relations must use shared buffer cache so data
     * will be visible to all segmates.  On QD, sequence objects must
     * use shared buffer cache so data will be visible to sequence server.
     */
    if (rel->rd_istemp &&
        relkind != RELKIND_SEQUENCE &&
        Gp_role != GP_ROLE_EXECUTE)
        rel->rd_isLocalBuf = true;
    else
        rel->rd_isLocalBuf = false;

	/*
	 * create a new tuple descriptor from the one passed in.  We do this
	 * partly to copy it into the cache context, and partly because the new
	 * relation can't have any defaults or constraints yet; they have to be
	 * added in later steps, because they require additions to multiple system
	 * catalogs.  We can copy attnotnull constraints here, however.
	 */
	rel->rd_att = CreateTupleDescCopy(tupDesc);
	rel->rd_att->tdrefcount = 1;	/* mark as refcounted */
	has_not_null = false;
	for (i = 0; i < natts; i++)
	{
		rel->rd_att->attrs[i]->attnotnull = tupDesc->attrs[i]->attnotnull;
		has_not_null |= tupDesc->attrs[i]->attnotnull;
	}

	if (has_not_null)
	{
		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

		constr->has_not_null = true;
		rel->rd_att->constr = constr;
	}

	/*
	 * initialize relation tuple form (caller may add/override data later)
	 */
	rel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);

	namestrcpy(&rel->rd_rel->relname, relname);
	rel->rd_rel->relnamespace = relnamespace;

	rel->rd_rel->relkind = RELKIND_UNCATALOGED;
	rel->rd_rel->relstorage = RELSTORAGE_HEAP;
	rel->rd_rel->relhasoids = rel->rd_att->tdhasoid;
	rel->rd_rel->relnatts = natts;
	rel->rd_rel->reltype = InvalidOid;
	/* needed when bootstrapping: */
	rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;

	/*
	 * Create zeroed-out gp_relation_node data.  It will be filled in when the
	 * disk file is created.
	 */
	rel->rd_segfile0_relationnodeinfo.isPresent = false;
	rel->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = false;

	/*
	 * Insert relation physical and logical identifiers (OIDs) into the right
	 * places.	Note that the physical ID (relfilenode) is initially the same
	 * as the logical ID (OID).
	 */
	rel->rd_rel->relisshared = shared_relation;

	RelationGetRelid(rel) = relid;

	for (i = 0; i < natts; i++)
		rel->rd_att->attrs[i]->attrelid = relid;

	rel->rd_rel->relfilenode = relid;
	rel->rd_rel->reltablespace = reltablespace;

	RelationInitLockInfo(rel);	/* see lmgr.c */

	RelationInitPhysicalAddr(rel);

	/*
	 * Okay to insert into the relcache hash tables.
	 */
	RelationCacheInsert(rel);

	/*
	 * done building relcache entry.
	 */
	MemoryContextSwitchTo(oldcxt);

	/* It's fully valid */
	rel->rd_isvalid = true;

	/*
	 * Caller expects us to pin the returned entry.
	 */
	RelationIncrementReferenceCount(rel);

	return rel;
}
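
/*
 * Illustrative call only (argument values are hypothetical): a caller that
 * is about to create an ordinary heap table might invoke this roughly as
 *
 *		rel = RelationBuildLocalRelation(relname, relnamespace, tupdesc,
 *										 relid, reltablespace,
 *										 RELKIND_RELATION, false);
 *
 * before inserting the corresponding pg_class row.
 */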

/*
 *		RelationCacheInitialize
 *
 *		This initializes the relation descriptor cache.  At the time
 *		that this is invoked, we can't do database access yet (mainly
 *		because the transaction subsystem is not up); all we are doing
 *		is making an empty cache hashtable.  This must be done before
 *		starting the initialization transaction, because otherwise
 *		AtEOXact_RelationCache would crash if that transaction aborts
 *		before we can get the relcache set up.
 */

#define INITRELCACHESIZE		400

void
RelationCacheInitialize(void)
{
	MemoryContext oldcxt;
	HASHCTL		ctl;

	/*
	 * make sure cache memory context exists
	 */
	if (!CacheMemoryContext)
		CreateCacheMemoryContext();

    /*
	 * switch to cache memory context
	 */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
	 * create hashtable that indexes the relcache
	 */
	MemSet(&ctl, 0, sizeof(ctl));
	ctl.keysize = sizeof(Oid);
	ctl.entrysize = sizeof(RelIdCacheEnt);
	ctl.hash = oid_hash;
	RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
								  &ctl, HASH_ELEM | HASH_FUNCTION);

	MemoryContextSwitchTo(oldcxt);
}

/*
 *		RelationCacheInitializePhase2
 *
 *		This is called to prepare for access to shared catalogs during startup.
 *		We must at least set up nailed reldescs for pg_database, pg_authid,
 *		and pg_auth_members.  Ideally we'd like to have reldescs for their
 *		indexes, too.  We attempt to load this information from the shared
 *		relcache init file.  If that's missing or broken, just make phony
 *		entries for the catalogs themselves.  RelationCacheInitializePhase3
 *		will clean up as needed.
 */
void
RelationCacheInitializePhase2(void)
{
	MemoryContext oldcxt;

	/*
	 * In bootstrap mode, the shared catalogs aren't there yet anyway, so do
	 * nothing.
	 */
	if (IsBootstrapProcessingMode())
		return;

	/*
	 * switch to cache memory context
	 */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
	 * Try to load the shared relcache cache file.	If unsuccessful, bootstrap
	 * the cache with pre-made descriptors for the critical shared catalogs.
	 */
	if (!load_relcache_init_file(true))
	{
		formrdesc("pg_database", PG_DATABASE_RELTYPE_OID, true,
				  true, Natts_pg_database, Desc_pg_database);
		formrdesc("pg_authid", PG_AUTHID_RELTYPE_OID, true,
				  true, Natts_pg_authid, Desc_pg_authid);
		formrdesc("pg_auth_members", PG_AUTH_MEMBERS_RELTYPE_OID, true,
				  false, Natts_pg_auth_members, Desc_pg_auth_members);

#define NUM_CRITICAL_SHARED_RELS	3	/* fix if you change list above */
	}

	MemoryContextSwitchTo(oldcxt);
}

/*
 *		RelationCacheInitializePhase3
 *
 *		This is called as soon as the catcache and transaction system
 *		are functional and we have determined MyDatabaseId.  At this point
 *		we can actually read data from the database's system catalogs.
 *		We first try to read pre-computed relcache entries from the local
 *		relcache init file.  If that's missing or broken, make phony entries
 *		for the minimum set of nailed-in-cache relations.  Then (unless
 *		bootstrapping) make sure we have entries for the critical system
 *		indexes.  Once we've done all this, we have enough infrastructure to
 *		open any system catalog or use any catcache.  The last step is to
 *		rewrite the cache files if needed.
 */
void
RelationCacheInitializePhase3(void)
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;
	MemoryContext oldcxt;
	bool		needNewCacheFile = !criticalSharedRelcachesBuilt;

	/*
	 * switch to cache memory context
	 */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
	 * Try to load the local relcache cache file.  If unsuccessful, bootstrap
	 * the cache with pre-made descriptors for the critical "nailed-in" system
	 * catalogs.
	 */
	if (IsBootstrapProcessingMode() ||
		!load_relcache_init_file(false))
	{
		needNewCacheFile = true;

		formrdesc("pg_class", PG_CLASS_RELTYPE_OID, false,
				  true, Natts_pg_class, Desc_pg_class);
		formrdesc("pg_attribute", PG_ATTRIBUTE_RELTYPE_OID, false,
				  false, Natts_pg_attribute, Desc_pg_attribute);
		formrdesc("pg_proc", PG_PROC_RELTYPE_OID, false,
				  true, Natts_pg_proc, Desc_pg_proc);
		formrdesc("pg_type", PG_TYPE_RELTYPE_OID, false,
				  true, Natts_pg_type, Desc_pg_type);

#define NUM_CRITICAL_LOCAL_RELS 4		/* fix if you change list above */
	}

	MemoryContextSwitchTo(oldcxt);

	/* In bootstrap mode, the faked-up formrdesc info is all we'll have */
	if (IsBootstrapProcessingMode())
		return;

	/*
	 * If we didn't get the critical system indexes loaded into relcache, do
	 * so now.	These are critical because the catcache and/or opclass cache
	 * depend on them for fetches done during relcache load.  Thus, we have an
	 * infinite-recursion problem.	We can break the recursion by doing
	 * heapscans instead of indexscans at certain key spots. To avoid hobbling
	 * performance, we only want to do that until we have the critical indexes
	 * loaded into relcache.  Thus, the flag criticalRelcachesBuilt is used to
	 * decide whether to do heapscan or indexscan at the key spots, and we set
	 * it true after we've loaded the critical indexes.
	 *
	 * The critical indexes are marked as "nailed in cache", partly to make it
	 * easy for load_relcache_init_file to count them, but mainly because we
	 * cannot flush and rebuild them once we've set criticalRelcachesBuilt to
	 * true.  (NOTE: perhaps it would be possible to reload them by
	 * temporarily setting criticalRelcachesBuilt to false again.  For now,
	 * though, we just nail 'em in.)
	 *
	 * RewriteRelRulenameIndexId and TriggerRelidNameIndexId are not critical
	 * in the same way as the others, because the critical catalogs don't
	 * (currently) have any rules or triggers, and so these indexes can be
	 * rebuilt without inducing recursion.	However they are used during
	 * relcache load when a rel does have rules or triggers, so we choose to
	 * nail them for performance reasons.
	 */
	if (!criticalRelcachesBuilt)
	{
		load_critical_index(ClassOidIndexId,
							RelationRelationId);
		load_critical_index(AttributeRelidNumIndexId,
							AttributeRelationId);
		load_critical_index(IndexRelidIndexId,
							IndexRelationId);
		load_critical_index(OpclassOidIndexId,
							OperatorClassRelationId);
		load_critical_index(AccessMethodStrategyIndexId,
							AccessMethodOperatorRelationId);
		load_critical_index(OperatorOidIndexId,
							OperatorRelationId);
		load_critical_index(AccessMethodProcedureIndexId,
							AccessMethodProcedureRelationId);
		load_critical_index(RewriteRelRulenameIndexId,
							RewriteRelationId);
		load_critical_index(TriggerRelidNameIndexId,
							TriggerRelationId);

#define NUM_CRITICAL_LOCAL_INDEXES	9	/* fix if you change list above */

		criticalRelcachesBuilt = true;
	}

	/*
	 * Process critical shared indexes too.
	 *
	 * DatabaseNameIndexId isn't critical for relcache loading, but rather for
	 * initial lookup of MyDatabaseId, without which we'll never find any
	 * non-shared catalogs at all.	Autovacuum calls InitPostgres with a
	 * database OID, so it instead depends on DatabaseOidIndexId.  We also
	 * need to nail up some indexes on pg_authid and pg_auth_members for use
	 * during client authentication.
	 */
	if (!criticalSharedRelcachesBuilt)
	{
		load_critical_index(DatabaseNameIndexId,
							DatabaseRelationId);
		load_critical_index(DatabaseOidIndexId,
							DatabaseRelationId);
		load_critical_index(AuthIdRolnameIndexId,
							AuthIdRelationId);
		load_critical_index(AuthIdOidIndexId,
							AuthIdRelationId);
		load_critical_index(AuthMemMemRoleIndexId,
							AuthMemRelationId);

#define NUM_CRITICAL_SHARED_INDEXES 5	/* fix if you change list above */

		criticalSharedRelcachesBuilt = true;
	}

	/*
	 * Now, scan all the relcache entries and update anything that might be
	 * wrong in the results from formrdesc or the relcache cache file. If we
	 * faked up relcache entries using formrdesc, then read the real pg_class
	 * rows and replace the fake entries with them. Also, if any of the
	 * relcache entries have rules or triggers, load that info the hard way
	 * since it isn't recorded in the cache file.
	 *
	 * Whenever we access the catalogs to read data, there is a possibility
	 * of a shared-inval cache flush causing relcache entries to be removed.
	 * Since hash_seq_search only guarantees to still work after the *current*
	 * entry is removed, it's unsafe to continue the hashtable scan afterward.
	 * We handle this by restarting the scan from scratch after each access.
	 * This is theoretically O(N^2), but the number of entries that actually
	 * need to be fixed is small enough that it doesn't matter.
	 */
	hash_seq_init(&status, RelationIdCache);

	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
	{
		Relation	relation = idhentry->reldesc;
		bool		restart = false;

		/*
		 * Make sure *this* entry doesn't get flushed while we work with it.
		 */
		RelationIncrementReferenceCount(relation);

		/*
		 * If it's a faked-up entry, read the real pg_class tuple.
		 */
		if (relation->rd_rel->relowner == InvalidOid)
		{
			HeapTuple	htup;
			Form_pg_class relp;

			htup = SearchSysCache(RELOID,
							   ObjectIdGetDatum(RelationGetRelid(relation)),
								  0, 0, 0);
			if (!HeapTupleIsValid(htup))
				elog(FATAL, "cache lookup failed for relation %u",
					 RelationGetRelid(relation));
			relp = (Form_pg_class) GETSTRUCT(htup);

			/*
			 * Copy tuple to relation->rd_rel. (See notes in
			 * AllocateRelationDesc())
			 */
			memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);

			/* Update rd_options while we have the tuple */
			if (relation->rd_options)
				pfree(relation->rd_options);
			RelationParseRelOptions(relation, htup);

			/*
			 * Check the values in rd_att were set up correctly.  (We cannot
			 * just copy them over now: formrdesc must have set up the
			 * rd_att data correctly to start with, because it may already
			 * have been copied into one or more catcache entries.)
			 */
			Assert(relation->rd_att->tdtypeid == relp->reltype);
			Assert(relation->rd_att->tdtypmod == -1);
			Assert(relation->rd_att->tdhasoid == relp->relhasoids);

			ReleaseSysCache(htup);

			/* relowner had better be OK now, else we'll loop forever */
			if (relation->rd_rel->relowner == InvalidOid)
				elog(ERROR, "invalid relowner in pg_class entry for \"%s\"",
					 RelationGetRelationName(relation));

			restart = true;
		}

		/*
		 * Fix data that isn't saved in relcache cache file.
		 *
		 * relhasrules or relhastriggers could possibly be wrong or out of
		 * date.  If we don't actually find any rules or triggers, clear the
		 * local copy of the flag so that we don't get into an infinite loop
		 * here.  We don't make any attempt to fix the pg_class entry, though.
		 */
		if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
		{
			RelationBuildRuleLock(relation);
			if (relation->rd_rules == NULL)
				relation->rd_rel->relhasrules = false;
			restart = true;
		}
		if (relation->rd_rel->reltriggers > 0 && relation->trigdesc == NULL)
		{
			RelationBuildTriggers(relation);
			if (relation->trigdesc == NULL)
				relation->rd_rel->reltriggers = 0;
			restart = true;
		}

		/* Release hold on the relation */
		RelationDecrementReferenceCount(relation);

		/* Now, restart the hashtable scan if needed */
		if (restart)
		{
			hash_seq_term(&status);
			hash_seq_init(&status, RelationIdCache);
		}
	}

	/*
	 * Lastly, write out new relcache cache files if needed.  We don't bother
	 * to distinguish cases where only one of the two needs an update.
	 */
	if (needNewCacheFile)
	{
		/*
		 * Force all the catcaches to finish initializing and thereby open the
		 * catalogs and indexes they use.  This will preload the relcache with
		 * entries for all the most important system catalogs and indexes, so
		 * that the init files will be most useful for future backends.
		 */
		InitCatalogCachePhase2();

		/* reset initFileRelationIds list; we'll fill it during write */
		initFileRelationIds = NIL;

		/* now write the files */
		write_relcache_init_file(true);
		write_relcache_init_file(false);
	}
}

/*
 * Load one critical system index into the relcache
 *
 * indexoid is the OID of the target index, heapoid is the OID of the catalog
 * it belongs to.
 */
static void
load_critical_index(Oid indexoid, Oid heapoid)
{
	Relation	ird;

	/*
	 * We must lock the underlying catalog before locking the index to avoid
	 * deadlock, since RelationBuildDesc might well need to read the catalog,
	 * and if anyone else is exclusive-locking this catalog and index they'll
	 * be doing it in that order.
	 */
	LockRelationOid(heapoid, AccessShareLock);
	LockRelationOid(indexoid, AccessShareLock);
	ird = RelationBuildDesc(indexoid, true);
	if (ird == NULL)
		elog(PANIC, "could not open critical system index %u", indexoid);
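	/* Nail the index into cache and pin it permanently; critical indexes must never be flushed */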
	ird->rd_isnailed = true;
	ird->rd_refcnt = 1;
	UnlockRelationOid(indexoid, AccessShareLock);
	UnlockRelationOid(heapoid, AccessShareLock);
}

/*
 * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
 * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
 *
 * We need this kluge because we have to be able to access non-fixed-width
 * fields of pg_class and pg_index before we have the standard catalog caches
 * available.  We use predefined data that's set up in just the same way as
 * the bootstrapped reldescs used by formrdesc().  The resulting tupdesc is
 * not 100% kosher: it does not have the correct rowtype OID in tdtypeid, nor
 * does it have a TupleConstr field.  But it's good enough for the purpose of
 * extracting fields.
 */
static TupleDesc
BuildHardcodedDescriptor(int natts, const FormData_pg_attribute *attrs,
						 bool hasoids)
{
	TupleDesc	result;
	MemoryContext oldcxt;
	int			i;

	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	result = CreateTemplateTupleDesc(natts, hasoids);
	result->tdtypeid = RECORDOID;		/* not right, but we don't care */
	result->tdtypmod = -1;

	for (i = 0; i < natts; i++)
	{
		memcpy(result->attrs[i], &attrs[i], ATTRIBUTE_FIXED_PART_SIZE);
		/* make sure attcacheoff is valid */
		result->attrs[i]->attcacheoff = -1;
	}

	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
	result->attrs[0]->attcacheoff = 0;

	/* Note: we don't bother to set up a TupleConstr entry */

	MemoryContextSwitchTo(oldcxt);

	return result;
}

static TupleDesc
GetPgClassDescriptor(void)
{
	static TupleDesc pgclassdesc = NULL;

	/* Already done? */
	if (pgclassdesc == NULL)
		pgclassdesc = BuildHardcodedDescriptor(Natts_pg_class,
											   Desc_pg_class,
											   true);

	return pgclassdesc;
}

static TupleDesc
GetPgIndexDescriptor(void)
{
	static TupleDesc pgindexdesc = NULL;

	/* Already done? */
	if (pgindexdesc == NULL)
		pgindexdesc = BuildHardcodedDescriptor(Natts_pg_index,
											   Desc_pg_index,
											   false);

	return pgindexdesc;
}

/*
 * Load any default attribute value definitions for the relation.
 */
static void
AttrDefaultFetch(Relation relation)
{
	AttrDefault *attrdef = relation->rd_att->constr->defval;
	int			ndef = relation->rd_att->constr->num_defval;
	Relation	adrel;
	HeapTuple	htup;
	cqContext	cqc;
	cqContext  *pcqCtx;
	Datum		val;
	bool		isnull;
	int			found;
	int			i;

	adrel = heap_open(AttrDefaultRelationId, AccessShareLock);
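	/* Scan pg_attrdef (via caql) for the default-value entries belonging to this relation */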
	pcqCtx = caql_beginscan(
			caql_syscache(
					caql_indexOK(caql_addrel(cqclr(&cqc), adrel), 
								 true),
					false),
			cql("SELECT * FROM pg_attrdef "
				" WHERE adrelid = :1 ",
				ObjectIdGetDatum(RelationGetRelid(relation))));

	found = 0;

	while (HeapTupleIsValid(htup = caql_getnext(pcqCtx)))
	{
		Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);

		for (i = 0; i < ndef; i++)
		{
			if (adform->adnum != attrdef[i].adnum)
				continue;
			if (attrdef[i].adbin != NULL)
				elog(WARNING, "multiple attrdef records found for attr %s of rel %s",
				NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
					 RelationGetRelationName(relation));
			else
				found++;

			val = fastgetattr(htup,
							  Anum_pg_attrdef_adbin,
							  adrel->rd_att, &isnull);
			if (isnull)
				elog(WARNING, "null adbin for attr %s of rel %s",
				NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
					 RelationGetRelationName(relation));
			else
				attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext,
												   TextDatumGetCString(val));
			break;
		}

		if (i >= ndef)
			elog(WARNING, "unexpected attrdef record found for attr %d of rel %s",
				 adform->adnum, RelationGetRelationName(relation));
	}

	caql_endscan(pcqCtx);
	heap_close(adrel, AccessShareLock);

	if (found != ndef)
		elog(WARNING, "%d attrdef record(s) missing for rel %s",
			 ndef - found, RelationGetRelationName(relation));
}

/*
 * Load any check constraints for the relation.
 */
static void
CheckConstraintFetch(Relation relation)
{
	ConstrCheck *check = relation->rd_att->constr->check;
	int			ncheck = relation->rd_att->constr->num_check;
	Relation	conrel;
	HeapTuple	htup;
	cqContext	cqc;
	cqContext  *pcqCtx;
	Datum		val;
	bool		isnull;
	int			found = 0;

	conrel = heap_open(ConstraintRelationId, AccessShareLock);

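	/* Scan pg_constraint (via caql) for the constraint rows belonging to this relation */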
	pcqCtx = caql_beginscan(
			caql_syscache(
					caql_indexOK(caql_addrel(cqclr(&cqc), conrel), 
								 true),
						false),
			cql("SELECT * FROM pg_constraint "
				" WHERE conrelid = :1 ",
				ObjectIdGetDatum(RelationGetRelid(relation))));

	while (HeapTupleIsValid(htup = caql_getnext(pcqCtx)))
	{
		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);

		/* We want check constraints only */
		if (conform->contype != CONSTRAINT_CHECK)
			continue;

		if (found >= ncheck)
			elog(ERROR,
			     "pg_class reports %d constraint record(s) for rel %s, but found extra in pg_constraint",
			     ncheck, RelationGetRelationName(relation));

		check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
												  NameStr(conform->conname));

		/* Grab and test conbin is actually set */
		val = fastgetattr(htup,
						  Anum_pg_constraint_conbin,
						  conrel->rd_att, &isnull);
		if (isnull)
			elog(ERROR, "null conbin for rel %s",
				 RelationGetRelationName(relation));

		check[found].ccbin = MemoryContextStrdup(CacheMemoryContext,
												 TextDatumGetCString(val));
		found++;
	}

	caql_endscan(pcqCtx);
	heap_close(conrel, AccessShareLock);

	if (found != ncheck)
		elog(ERROR,
		     "found %d in pg_constraint, but pg_class reports %d constraint record(s) for rel %s",
		     found, ncheck, RelationGetRelationName(relation));
}


/*
 * RelationGetPartitioningKey -- get GpPolicy struct for distributed relation
 *
 * Returns a copy of the relation's GpPolicy object, palloc'd in
 * the caller's context.  Caller should pfree() it.  If NULL is
 * returned, relation should be accessed locally.
 */
GpPolicy*
RelationGetPartitioningKey(Relation relation)
{
    return GpPolicyCopy(CurrentMemoryContext, relation->rd_cdbpolicy);
}                                       /* RelationGetPartitioningKey */


/*
 * RelationGetIndexList -- get a list of OIDs of indexes on this relation
 *
 * The index list is created only if someone requests it.  We scan pg_index
 * to find relevant indexes, and add the list to the relcache entry so that
 * we won't have to compute it again.  Note that shared cache inval of a
 * relcache entry will delete the old list and set rd_indexvalid to 0,
 * so that we must recompute the index list on next request.  This handles
 * creation or deletion of an index.
 *
 * The returned list is guaranteed to be sorted in order by OID.  This is
 * needed by the executor, since for index types that we obtain exclusive
 * locks on when updating the index, all backends must lock the indexes in
 * the same order or we will get deadlocks (see ExecOpenIndices()).  Any
 * consistent ordering would do, but ordering by OID is easy.
 *
 * Since shared cache inval causes the relcache's copy of the list to go away,
 * we return a copy of the list palloc'd in the caller's context.  The caller
 * may list_free() the returned list after scanning it. This is necessary
 * since the caller will typically be doing syscache lookups on the relevant
 * indexes, and syscache lookup could cause SI messages to be processed!
 *
 * We also update rd_oidindex, which this module treats as effectively part
 * of the index list.  rd_oidindex is valid when rd_indexvalid isn't zero;
 * it is the pg_class OID of a unique index on OID when the relation has one,
 * and InvalidOid if there is no such index.
 */
List *
RelationGetIndexList(Relation relation)
{
	Relation	indrel;
	HeapTuple	htup;
	cqContext	cqc;
	cqContext  *pcqCtx;
	List	   *result;
	Oid			oidIndex;
	MemoryContext oldcxt;

	/* Quick exit if we already computed the list. */
	if (relation->rd_indexvalid != 0)
		return list_copy(relation->rd_indexlist);

	/*
	 * We build the list we intend to return (in the caller's context) while
	 * doing the scan.	After successfully completing the scan, we copy that
	 * list into the relcache entry.  This avoids cache-context memory leakage
	 * if we get some sort of error partway through.
	 */
	result = NIL;
	oidIndex = InvalidOid;

	/* Prepare to scan pg_index for entries having indrelid = this rel. */

	indrel = heap_open(IndexRelationId, AccessShareLock);

	pcqCtx = caql_beginscan(
			caql_syscache(
					caql_indexOK(caql_addrel(cqclr(&cqc), indrel), 
								 true),
						false),
			cql("SELECT * FROM pg_index "
				" WHERE indrelid = :1 ",
				ObjectIdGetDatum(RelationGetRelid(relation))));

	while (HeapTupleIsValid(htup = caql_getnext(pcqCtx)))
	{
		Form_pg_index index = (Form_pg_index) GETSTRUCT(htup);

		/* Add index's OID to result list in the proper order */
		result = insert_ordered_oid(result, index->indexrelid);

		/* Check to see if it is a unique, non-partial btree index on OID */
		if (IndexIsValid(index) &&
			index->indnatts == 1 &&
			index->indisunique &&
			index->indkey.values[0] == ObjectIdAttributeNumber &&
			index->indclass.values[0] == OID_BTREE_OPS_OID &&
			heap_attisnull(htup, Anum_pg_index_indpred))
			oidIndex = index->indexrelid;
	}

	caql_endscan(pcqCtx);
	heap_close(indrel, AccessShareLock);

	/* Now save a copy of the completed list in the relcache entry. */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indexlist = list_copy(result);
	relation->rd_oidindex = oidIndex;
	relation->rd_indexvalid = 1;
	MemoryContextSwitchTo(oldcxt);

	return result;
}

/*
 * insert_ordered_oid
 *		Insert a new Oid into a sorted list of Oids, preserving ordering
 *
 * Building the ordered list this way is O(N^2), but with a pretty small
 * constant, so for the number of entries we expect it will probably be
 * faster than trying to apply qsort().  Most tables don't have very many
 * indexes...
 */
static List *
insert_ordered_oid(List *list, Oid datum)
{
	ListCell   *prev;

	/* Does the datum belong at the front? */
	if (list == NIL || datum < linitial_oid(list))
		return lcons_oid(datum, list);
	/* No, so find the entry it belongs after */
	prev = list_head(list);
	for (;;)
	{
		ListCell   *curr = lnext(prev);

		if (curr == NULL || datum < lfirst_oid(curr))
			break;				/* it belongs after 'prev', before 'curr' */

		prev = curr;
	}
	/* Insert datum into list after 'prev' */
	lappend_cell_oid(list, prev, datum);
	return list;
}

/*
 * RelationSetIndexList -- externally force the index list contents
 *
 * This is used to temporarily override what we think the set of valid
 * indexes is (including the presence or absence of an OID index).
 * The forcing will be valid only until transaction commit or abort.
 *
 * This should only be applied to nailed relations, because in a non-nailed
 * relation the hacked index list could be lost at any time due to SI
 * messages.  In practice it is only used on pg_class (see REINDEX).
 *
 * It is up to the caller to make sure the given list is correctly ordered.
 *
 * We deliberately do not change rd_indexattr here: even when operating
 * with a temporary partial index list, HOT-update decisions must be made
 * correctly with respect to the full index set.  It is up to the caller
 * to ensure that a correct rd_indexattr set has been cached before first
 * calling RelationSetIndexList; else a subsequent inquiry might cause a
 * wrong rd_indexattr set to get computed and cached.
 */
void
RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex)
{
	MemoryContext oldcxt;

	Assert(relation->rd_isnailed);
	/* Copy the list into the cache context (could fail for lack of mem) */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
	indexIds = list_copy(indexIds);
	MemoryContextSwitchTo(oldcxt);
	/* Okay to replace old list */
	list_free(relation->rd_indexlist);
	relation->rd_indexlist = indexIds;
	relation->rd_oidindex = oidIndex;
	relation->rd_indexvalid = 2;	/* mark list as forced */
	/* must flag that we have a forced index list */
	need_eoxact_work = true;
}

/*
 * RelationGetOidIndex -- get the pg_class OID of the relation's OID index
 *
 * Returns InvalidOid if there is no such index.
 */
Oid
RelationGetOidIndex(Relation relation)
{
	List	   *ilist;

	/*
	 * If relation doesn't have OIDs at all, caller is probably confused. (We
	 * could just silently return InvalidOid, but it seems better to throw an
	 * assertion.)
	 */
	Assert(relation->rd_rel->relhasoids);

	if (relation->rd_indexvalid == 0)
	{
		/* RelationGetIndexList does the heavy lifting. */
		ilist = RelationGetIndexList(relation);
		list_free(ilist);
		Assert(relation->rd_indexvalid != 0);
	}

	return relation->rd_oidindex;
}

/*
 * RelationGetIndexExpressions -- get the index expressions for an index
 *
 * We cache the result of transforming pg_index.indexprs into a node tree.
 * If the rel is not an index or has no expressional columns, we return NIL.
 * Otherwise, the returned tree is copied into the caller's memory context.
 * (We don't want to return a pointer to the relcache copy, since it could
 * disappear due to relcache invalidation.)
 */
List *
RelationGetIndexExpressions(Relation relation)
{
	List	   *result;
	Datum		exprsDatum;
	bool		isnull;
	char	   *exprsString;
	MemoryContext oldcxt;

	/* Quick exit if we already computed the result. */
	if (relation->rd_indexprs)
		return (List *) copyObject(relation->rd_indexprs);

	/* Quick exit if there is nothing to do. */
	if (relation->rd_indextuple == NULL ||
		heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs))
		return NIL;

	/*
	 * We build the tree we intend to return in the caller's context. After
	 * successfully completing the work, we copy it into the relcache entry.
	 * This avoids problems if we get some sort of error partway through.
	 */
	exprsDatum = heap_getattr(relation->rd_indextuple,
							  Anum_pg_index_indexprs,
							  GetPgIndexDescriptor(),
							  &isnull);
	Assert(!isnull);
	exprsString = TextDatumGetCString(exprsDatum);
	result = (List *) stringToNode(exprsString);
	pfree(exprsString);

	/*
	 * Run the expressions through eval_const_expressions. This is not just an
	 * optimization, but is necessary, because the planner will be comparing
	 * them to similarly-processed qual clauses, and may fail to detect valid
	 * matches without this.  We don't bother with canonicalize_qual, however.
	 */
	result = (List *) eval_const_expressions(NULL, (Node *) result);

	/*
	 * Also mark any coercion format fields as "don't care", so that the
	 * planner can match to both explicit and implicit coercions.
	 */
	set_coercionform_dontcare((Node *) result);

	/* May as well fix opfuncids too */
	fix_opfuncids((Node *) result);

	/* Now save a copy of the completed tree in the relcache entry. */
	oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
	relation->rd_indexprs = (List *) copyObject(result);
	MemoryContextSwitchTo(oldcxt);

	return result;
}

/*
 * RelationGetIndexPredicate -- get the index predicate for an index
 *
 * We cache the result of transforming pg_index.indpred into an implicit-AND
 * node tree (suitable for ExecQual).
 * If the rel is not an index or has no predicate, we return NIL.
 * Otherwise, the returned tree is copied into the caller's memory context.
 * (We don't want to return a pointer to the relcache copy, since it could
 * disappear due to relcache invalidation.)
 */
List *
RelationGetIndexPredicate(Relation relation)
{
	List	   *result;
	Datum		predDatum;
	bool		isnull;
	char	   *predString;
	MemoryContext oldcxt;

	/* Quick exit if we already computed the result. */
	if (relation->rd_indpred)
		return (List *) copyObject(relation->rd_indpred);

	/* Quick exit if there is nothing to do. */
	if (relation->rd_indextuple == NULL ||
		heap_attisnull(relation->rd_indextuple, Anum_pg_index_indpred))
		return NIL;

	/*
	 * We build the tree we intend to return in the caller's context. After
	 * successfully completing the work, we copy it into the relcache entry.
	 * This avoids problems if we get some sort of error partway through.
	 */
	predDatum = heap_getattr(relation->rd_indextuple,
							 Anum_pg_index_indpred,
							 GetPgIndexDescriptor(),
							 &isnull);
	Assert(!isnull);
	predString = TextDatumGetCString(predDatum);
	result = (List *) stringToNode(predString);
	pfree(predString);

	/*
	 * Run the expression through const-simplification and canonicalization.
	 * This is not just an optimization, but is necessary, because the planner
	 * will be comparing it to similarly-processed qual clauses, and may fail
	 * to detect valid matches without this.  This must match the processing
	 * done to qual clauses in preprocess_expression()!  (We can skip the
	 * stuff involving subqueries, however, since we don't allow any in index
	 * predicates.)
	 */
	result = (List *) eval_const_expressions(NULL, (Node *) result);

	result = (List *) canonicalize_qual((Expr *) result);

	/*
	 * Also mark any coercion format fields as "don't care", so that the
	 * planner can match to both explicit and implicit coercions.
	 */
	set_coercionform_dontcare((Node *) result);

	/* Also convert to implicit-AND format */
	result = make_ands_implicit((Expr *) result);

	/* May as well fix opfuncids too */
	fix_opfuncids((Node *) result);

	/* Now save a copy of the completed tree in the relcache entry. */
	oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
	relation->rd_indpred = (List *) copyObject(result);
	MemoryContextSwitchTo(oldcxt);

	return result;
}

/*
 * RelationGetIndexAttrBitmap -- get a bitmap of index attribute numbers
 *
 * The result has a bit set for each attribute used anywhere in the index
 * definitions of all the indexes on this relation.  (This includes not only
 * simple index keys, but attributes used in expressions and partial-index
 * predicates.)
 *
 * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
 * we can include system attributes (e.g., OID) in the bitmap representation.
 *
 * Caller had better hold at least RowExclusiveLock on the target relation
 * to ensure that it has a stable set of indexes.  This also makes it safe
 * (deadlock-free) for us to take locks on the relation's indexes.
 *
 * The returned result is palloc'd in the caller's memory context and should
 * be bms_free'd when not needed anymore.
 */
Bitmapset *
RelationGetIndexAttrBitmap(Relation relation)
{
	Bitmapset  *indexattrs;
	List	   *indexoidlist;
	ListCell   *l;
	MemoryContext oldcxt;

	/* Quick exit if we already computed the result. */
	if (relation->rd_indexattr != NULL)
		return bms_copy(relation->rd_indexattr);

	/* Fast path if definitely no indexes */
	if (!RelationGetForm(relation)->relhasindex)
		return NULL;

	/*
	 * Get cached list of index OIDs
	 */
	indexoidlist = RelationGetIndexList(relation);

	/* Fall out if no indexes (but relhasindex was set) */
	if (indexoidlist == NIL)
		return NULL;

	/*
	 * For each index, add referenced attributes to indexattrs.
	 *
	 * Note: we consider all indexes returned by RelationGetIndexList, even if
	 * they are not indisready or indisvalid.  This is important because an
	 * index for which CREATE INDEX CONCURRENTLY has just started must be
	 * included in HOT-safety decisions (see README.HOT).
	 */
	indexattrs = NULL;
	foreach(l, indexoidlist)
	{
		Oid			indexOid = lfirst_oid(l);
		Relation	indexDesc;
		IndexInfo  *indexInfo;
		int			i;

		indexDesc = index_open(indexOid, AccessShareLock);

		/* Extract index key information from the index's pg_index row */
		indexInfo = BuildIndexInfo(indexDesc);

		/* Collect simple attribute references */
		for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
		{
			int			attrnum = indexInfo->ii_KeyAttrNumbers[i];

			if (attrnum != 0)
				indexattrs = bms_add_member(indexattrs,
							   attrnum - FirstLowInvalidHeapAttributeNumber);
		}

		/* Collect all attributes used in expressions, too */
		pull_varattnos((Node *) indexInfo->ii_Expressions, &indexattrs);

		/* Collect all attributes in the index predicate, too */
		pull_varattnos((Node *) indexInfo->ii_Predicate, &indexattrs);

		index_close(indexDesc, AccessShareLock);
	}

	list_free(indexoidlist);

	/* Now save a copy of the bitmap in the relcache entry. */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indexattr = bms_copy(indexattrs);
	MemoryContextSwitchTo(oldcxt);

	/* We return our original working copy for caller to play with */
	return indexattrs;
}


/*
 *	load_relcache_init_file, write_relcache_init_file
 *
 *		In late 1992, we started regularly having databases with more than
 *		a thousand classes in them.  With this number of classes, it became
 *		critical to do indexed lookups on the system catalogs.
 *
 *		Bootstrapping these lookups is very hard.  We want to be able to
 *		use an index on pg_attribute, for example, but in order to do so,
 *		we must have read pg_attribute for the attributes in the index,
 *		which implies that we need to use the index.
 *
 *		In order to get around the problem, we do the following:
 *
 *		   +  When the database system is initialized (at initdb time), we
 *			  don't use indexes.  We do sequential scans.
 *
 *		   +  When the backend is started up in normal mode, we load an image
 *			  of the appropriate relation descriptors, in internal format,
 *			  from an initialization file in the data/base/... directory.
 *
 *		   +  If the initialization file isn't there, then we create the
 *			  relation descriptors using sequential scans and write 'em to
 *			  the initialization file for use by subsequent backends.
 *
 *		We could dispense with the initialization files and just build the
 *		critical reldescs the hard way on every backend startup, but that
 *		slows down backend startup noticeably.
 *
 *		We can in fact go further, and save more relcache entries than
 *		just the ones that are absolutely critical; this allows us to speed
 *		up backend startup by not having to build such entries the hard way.
 *		Presently, all the catalog and index entries that are referred to
 *		by catcaches are stored in the initialization files.
 *
 *		The same mechanism that detects when catcache and relcache entries
 *		need to be invalidated (due to catalog updates) also arranges to
 *		unlink the initialization files when the contents may be out of date.
 *		The files will then be rebuilt during the next backend startup.
 */

/*
 * load_relcache_init_file -- attempt to load cache from the init file
 *
 * If successful, return TRUE and set criticalRelcachesBuilt to true.
 * If not successful, return FALSE.
 *
 * NOTE: we assume we are already switched into CacheMemoryContext.
 */
static bool
load_relcache_init_file(bool shared)
{
	FILE	   *fp;
	char		initfilename[MAXPGPATH];
	Relation   *rels;
	int			relno,
				num_rels,
				max_rels,
				nailed_rels,
				nailed_indexes,
				magic;
	int			i;

	if (shared)
		snprintf(initfilename, sizeof(initfilename), "global/%s",
				 RELCACHE_INIT_FILENAME);
	else
		snprintf(initfilename, sizeof(initfilename), "%s/%s",
				 DatabasePath, RELCACHE_INIT_FILENAME);

	fp = AllocateFile(initfilename, PG_BINARY_R);
	if (fp == NULL)
		return false;

	/*
	 * Read the index relcache entries from the file.  Note we will not enter
	 * any of them into the cache if the read fails partway through; this
	 * helps to guard against broken init files.
	 */
	max_rels = 100;
	rels = (Relation *) palloc(max_rels * sizeof(Relation));
	num_rels = 0;
	nailed_rels = nailed_indexes = 0;
	initFileRelationIds = NIL;

	/* check for correct magic number (compatible version) */
	if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
		goto read_failed;
	if (magic != RELCACHE_INIT_FILEMAGIC)
		goto read_failed;

	for (relno = 0;; relno++)
	{
		Size		len;
		size_t		nread;
		Relation	rel;
		Form_pg_class relform;
		bool		has_not_null;

		/* first read the relation descriptor length */
		if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
		{
			if (nread == 0)
				break;			/* end of file */
			goto read_failed;
		}

		/* safety check for incompatible relcache layout */
		if (len != sizeof(RelationData))
			goto read_failed;

		/* allocate another relcache header */
		if (num_rels >= max_rels)
		{
			max_rels *= 2;
			rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
		}

		rel = rels[num_rels++] = (Relation) palloc(len);

		/* then, read the Relation structure */
		if ((nread = fread(rel, 1, len, fp)) != len)
			goto read_failed;

		/* next read the relation tuple form */
		if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
			goto read_failed;

		relform = (Form_pg_class) palloc(len);
		if ((nread = fread(relform, 1, len, fp)) != len)
			goto read_failed;

		rel->rd_rel = relform;

		/* initialize attribute tuple forms */
		rel->rd_att = CreateTemplateTupleDesc(relform->relnatts,
											  relform->relhasoids);
		rel->rd_att->tdrefcount = 1;	/* mark as refcounted */

		rel->rd_att->tdtypeid = relform->reltype;
		rel->rd_att->tdtypmod = -1;		/* unnecessary, but... */

		/* next read all the attribute tuple form data entries */
		has_not_null = false;
		for (i = 0; i < relform->relnatts; i++)
		{
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;
			if (len != ATTRIBUTE_FIXED_PART_SIZE)
				goto read_failed;
			if ((nread = fread(rel->rd_att->attrs[i], 1, len, fp)) != len)
				goto read_failed;

			has_not_null |= rel->rd_att->attrs[i]->attnotnull;
		}

		/* next read the access method specific field */
		if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
			goto read_failed;
		if (len > 0)
		{
			rel->rd_options = palloc(len);
			if ((nread = fread(rel->rd_options, 1, len, fp)) != len)
				goto read_failed;
			if (len != VARSIZE(rel->rd_options))
				goto read_failed;		/* sanity check */
		}
		else
		{
			rel->rd_options = NULL;
		}

		/* mark not-null status */
		if (has_not_null)
		{
			TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

			constr->has_not_null = true;
			rel->rd_att->constr = constr;
		}

		/* If it's an index, there's more to do */
		if (rel->rd_rel->relkind == RELKIND_INDEX)
		{
			Form_pg_am	am;
			MemoryContext indexcxt;
			Oid		   *opfamily;
			Oid		   *opcintype;
			Oid		   *operator;
			RegProcedure *support;
			int			nsupport;
			int16	   *indoption;

			/* Count nailed indexes to ensure we have 'em all */
			if (rel->rd_isnailed)
				nailed_indexes++;

			/* next, read the pg_index tuple */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			rel->rd_indextuple = (HeapTuple) palloc(len);
			if ((nread = fread(rel->rd_indextuple, 1, len, fp)) != len)
				goto read_failed;

			/* Fix up internal pointers in the tuple -- see heap_copytuple */
			rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
			rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);

			/* next, read the access method tuple form */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			am = (Form_pg_am) palloc(len);
			if ((nread = fread(am, 1, len, fp)) != len)
				goto read_failed;
			rel->rd_am = am;

			/*
			 * prepare index info context --- parameters should match
			 * RelationInitIndexAccessInfo
			 */
			indexcxt = AllocSetContextCreate(CacheMemoryContext,
											 RelationGetRelationName(rel),
											 ALLOCSET_SMALL_MINSIZE,
											 ALLOCSET_SMALL_INITSIZE,
											 ALLOCSET_SMALL_MAXSIZE);
			rel->rd_indexcxt = indexcxt;

			/* next, read the vector of opfamily OIDs */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			opfamily = (Oid *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(opfamily, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_opfamily = opfamily;

			/* next, read the vector of opcintype OIDs */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			opcintype = (Oid *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(opcintype, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_opcintype = opcintype;

			/* next, read the vector of operator OIDs */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			operator = (Oid *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(operator, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_operator = operator;

			/* next, read the vector of support procedures */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;
			support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(support, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_support = support;

			/* finally, read the vector of indoption values */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			indoption = (int16 *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(indoption, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_indoption = indoption;

			/* set up zeroed fmgr-info vectors */
			rel->rd_aminfo = (RelationAmInfo *)
				MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));
			nsupport = relform->relnatts * am->amsupport;
			rel->rd_supportinfo = (FmgrInfo *)
				MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
		}
		else
		{
			/* Count nailed rels to ensure we have 'em all */
			if (rel->rd_isnailed)
				nailed_rels++;

			Assert(rel->rd_index == NULL);
			Assert(rel->rd_indextuple == NULL);
			Assert(rel->rd_am == NULL);
			Assert(rel->rd_indexcxt == NULL);
			Assert(rel->rd_aminfo == NULL);
			Assert(rel->rd_opfamily == NULL);
			Assert(rel->rd_opcintype == NULL);
			Assert(rel->rd_operator == NULL);
			Assert(rel->rd_support == NULL);
			Assert(rel->rd_supportinfo == NULL);
			Assert(rel->rd_indoption == NULL);
		}

		/*
		 * Rules and triggers are not saved (mainly because the internal
		 * format is complex and subject to change).  They must be rebuilt if
		 * needed by RelationCacheInitializePhase3.  This is not expected to
		 * be a big performance hit since few system catalogs have such. Ditto
		 * for index expressions and predicates.
		 */
		rel->rd_rules = NULL;
		rel->rd_rulescxt = NULL;
		rel->trigdesc = NULL;
		rel->rd_indexprs = NIL;
		rel->rd_indpred = NIL;

		/*
		 * Reset transient-state fields in the relcache entry
		 */
		rel->rd_smgr = NULL;
		rel->rd_targblock = InvalidBlockNumber;
		if (rel->rd_isnailed)
			rel->rd_refcnt = 1;
		else
			rel->rd_refcnt = 0;
		rel->rd_indexvalid = 0;
		rel->rd_indexlist = NIL;
		rel->rd_indexattr = NULL;
		rel->rd_oidindex = InvalidOid;
		rel->rd_createSubid = InvalidSubTransactionId;
		rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
		rel->rd_amcache = NULL;
		MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
        rel->rd_cdbpolicy = NULL;
        rel->rd_cdbDefaultStatsWarningIssued = false;

		/*
		 * Recompute lock and physical addressing info.  This is needed in
		 * case the pg_internal.init file was copied from some other database
		 * by CREATE DATABASE.
		 */
		RelationInitLockInfo(rel);
		RelationInitPhysicalAddr(rel);
	}

	/*
	 * We reached the end of the init file without apparent problem. Did we
	 * get the right number of nailed items?  (This is a useful crosscheck in
	 * case the set of critical rels or indexes changes.)
	 */
	if (shared)
	{
		if (nailed_rels != NUM_CRITICAL_SHARED_RELS ||
			nailed_indexes != NUM_CRITICAL_SHARED_INDEXES)
			goto read_failed;
	}
	else
	{
		if (nailed_rels != NUM_CRITICAL_LOCAL_RELS ||
			nailed_indexes != NUM_CRITICAL_LOCAL_INDEXES)
			goto read_failed;
	}


	/*
	 * OK, all appears well.
	 *
	 * Now insert all the new relcache entries into the cache.
	 */
	for (relno = 0; relno < num_rels; relno++)
	{
		RelationCacheInsert(rels[relno]);
		/* also make a list of their OIDs, for RelationIdIsInInitFile */
		if (!shared)
			initFileRelationIds = lcons_oid(RelationGetRelid(rels[relno]),
											initFileRelationIds);
	}

	pfree(rels);
	FreeFile(fp);

	if (shared)
		criticalSharedRelcachesBuilt = true;
	else
		criticalRelcachesBuilt = true;
	return true;

	/*
	 * init file is broken, so do it the hard way.	We don't bother trying to
	 * free the clutter we just allocated; it's not in the relcache so it
	 * won't hurt.
	 */
read_failed:
	pfree(rels);
	FreeFile(fp);

	return false;
}

/*
 * Write out a new initialization file with the current contents
 * of the relcache.
 */
static void
write_relcache_init_file(bool shared)
{
	FILE	   *fp;
	char		tempfilename[MAXPGPATH];
	char		finalfilename[MAXPGPATH];
	int			magic;
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;
	MemoryContext oldcxt;
	int			i;

	/*
	 * We must write a temporary file and rename it into place. Otherwise,
	 * another backend starting at about the same time might crash trying to
	 * read the partially-complete file.
	 */
	if (shared)
	{
		snprintf(tempfilename, sizeof(tempfilename), "global/%s.%d",
				 RELCACHE_INIT_FILENAME, MyProcPid);
		snprintf(finalfilename, sizeof(finalfilename), "global/%s",
				 RELCACHE_INIT_FILENAME);
	}
	else
	{
		snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
				 DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
		snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
				 DatabasePath, RELCACHE_INIT_FILENAME);
	}

	unlink(tempfilename);		/* in case it exists w/wrong permissions */

	fp = AllocateFile(tempfilename, PG_BINARY_W);
	if (fp == NULL)
	{
		/*
		 * We used to consider this a fatal error, but we might as well
		 * continue with backend startup ...
		 */
		ereport(WARNING,
				(errcode_for_file_access(),
				 errmsg("could not create relation-cache initialization file \"%s\": %m",
						tempfilename),
			  errdetail("Continuing anyway, but there's something wrong.")));
		return;
	}

	/*
	 * Write a magic number to serve as a file version identifier.	We can
	 * change the magic number whenever the relcache layout changes.
	 */
	magic = RELCACHE_INIT_FILEMAGIC;
	if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic))
		elog(FATAL, "could not write init file");

	/*
	 * Write all the reldescs (in no particular order).
	 */
	hash_seq_init(&status, RelationIdCache);

	initFileRelationIds = NIL;

	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
	{
		Relation	rel = idhentry->reldesc;
		Form_pg_class relform = rel->rd_rel;

		/* ignore if not correct group */
		if (relform->relisshared != shared)
			continue;

		/* first write the relcache entry proper */
		write_item(rel, sizeof(RelationData), fp);

		/* next write the relation tuple form */
		write_item(relform, CLASS_TUPLE_SIZE, fp);

		/* next, do all the attribute tuple form data entries */
		for (i = 0; i < relform->relnatts; i++)
		{
			write_item(rel->rd_att->attrs[i], ATTRIBUTE_FIXED_PART_SIZE, fp);
		}

		/* next, do the access method specific field */
		write_item(rel->rd_options,
				   (rel->rd_options ? VARSIZE(rel->rd_options) : 0),
				   fp);

		/* If it's an index, there's more to do */
		if (rel->rd_rel->relkind == RELKIND_INDEX)
		{
			Form_pg_am	am = rel->rd_am;

			/* write the pg_index tuple */
			/* we assume this was created by heap_copytuple! */
			write_item(rel->rd_indextuple,
					   HEAPTUPLESIZE + rel->rd_indextuple->t_len,
					   fp);

			/* next, write the access method tuple form */
			write_item(am, sizeof(FormData_pg_am), fp);

			/* next, write the vector of opfamily OIDs */
			write_item(rel->rd_opfamily,
					   relform->relnatts * sizeof(Oid),
					   fp);

			/* next, write the vector of opcintype OIDs */
			write_item(rel->rd_opcintype,
					   relform->relnatts * sizeof(Oid),
					   fp);

			/* next, write the vector of operator OIDs */
			write_item(rel->rd_operator,
					   relform->relnatts * (am->amstrategies * sizeof(Oid)),
					   fp);

			/* next, write the vector of support procedures */
			write_item(rel->rd_support,
				  relform->relnatts * (am->amsupport * sizeof(RegProcedure)),
					   fp);

			/* finally, write the vector of indoption values */
			write_item(rel->rd_indoption,
					   relform->relnatts * sizeof(int16),
					   fp);
		}

		/* also make a list of their OIDs, for RelationIdIsInInitFile */
		if (!shared)
		{
			oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
			initFileRelationIds = lcons_oid(RelationGetRelid(rel),
											initFileRelationIds);
			MemoryContextSwitchTo(oldcxt);
		}
	}

	if (FreeFile(fp))
		elog(FATAL, "could not write init file");

	/*
	 * Now we have to check whether the data we've so painstakingly
	 * accumulated is already obsolete due to someone else's just-committed
	 * catalog changes.  If so, we just delete the temp file and leave it to
	 * the next backend to try again.  (Our own relcache entries will be
	 * updated by SI message processing, but we can't be sure whether what we
	 * wrote out was up-to-date.)
	 *
	 * This mustn't run concurrently with the code that unlinks an init file
	 * and sends SI messages, so grab a serialization lock for the duration.
	 */
	LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);

	/* Make sure we have seen all incoming SI messages */
	AcceptInvalidationMessages();

	/*
	 * If we have received any SI relcache invals since backend start, assume
	 * we may have written out-of-date data.
4727 4728
	 */
	if (relcacheInvalsReceived == 0L)
4729 4730
	{
		/*
4731 4732
		 * OK, rename the temp file to its final name, deleting any
		 * previously-existing init file.
4733
		 *
4734 4735 4736 4737
		 * Note: a failure here is possible under Cygwin, if some other
		 * backend is holding open an unlinked-but-not-yet-gone init file. So
		 * treat this as a noncritical failure; just remove the useless temp
		 * file on failure.
4738
		 */
4739 4740
		if (rename(tempfilename, finalfilename) < 0)
			unlink(tempfilename);
4741 4742 4743 4744
	}
	else
	{
		/* Delete the already-obsolete temp file */
4745 4746
		unlink(tempfilename);
	}
4747 4748

	LWLockRelease(RelCacheInitLock);
4749 4750
}

/* write a chunk of data preceded by its length */
static void
write_item(const void *data, Size len, FILE *fp)
{
	if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
		elog(FATAL, "could not write init file");
	if (fwrite(data, 1, len, fp) != len)
		elog(FATAL, "could not write init file");
}
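
/*
 * Rough sketch of the init file layout implied by the writer above and by
 * its reader, load_relcache_init_file: after the RELCACHE_INIT_FILEMAGIC
 * header comes a sequence of length-prefixed items -- a Size value holding
 * the payload length, followed by that many bytes of payload.  Note that a
 * NULL rd_options pointer comes out as a zero-length item.
 */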

/*
 * Detect whether a given relation (identified by OID) is one of the ones
 * we store in the init file.
 *
 * Note that we effectively assume that all backends running in a database
 * would choose to store the same set of relations in the init file;
 * otherwise there are cases where we'd fail to detect the need for an init
 * file invalidation.  This does not seem likely to be a problem in practice.
 */
bool
RelationIdIsInInitFile(Oid relationId)
{
	return list_member_oid(initFileRelationIds, relationId);
}

/*
 * Invalidate (remove) the init file during commit of a transaction that
 * changed one or more of the relation cache entries that are kept in the
 * local init file.
 *
 * To be safe against concurrent inspection or rewriting of the init file,
 * we must take RelCacheInitLock, then remove the old init file, then send
 * the SI messages that include relcache inval for such relations, and then
 * release RelCacheInitLock.  This serializes the whole affair against
 * write_relcache_init_file, so that we can be sure that any other process
 * that's concurrently trying to create a new init file won't move an
 * already-stale version into place after we unlink.  Also, because we unlink
 * before sending the SI messages, a backend that's currently starting cannot
 * read the now-obsolete init file and then miss the SI messages that will
 * force it to update its relcache entries.  (This works because the backend
 * startup sequence gets into the sinval array before trying to load the init
 * file.)
 *
 * We take the lock and do the unlink in RelationCacheInitFilePreInvalidate,
 * then release the lock in RelationCacheInitFilePostInvalidate.  Caller must
 * send any pending SI messages between those calls.
 *
 * Notice this deals only with the local init file, not the shared init file.
 * The reason is that there can never be a "significant" change to the
 * relcache entry of a shared relation; the most that could happen is
 * updates of noncritical fields such as relpages/reltuples.  So, while
 * it's worth updating the shared init file from time to time, it can never
 * be invalid enough to make it necessary to remove it.
 */
void
RelationCacheInitFilePreInvalidate(void)
{
	char		initfilename[MAXPGPATH];

	snprintf(initfilename, sizeof(initfilename), "%s/%s",
			 DatabasePath, RELCACHE_INIT_FILENAME);

	LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);

	if (unlink(initfilename) < 0)
	{
		/*
		 * The file might not be there if no backend has been started since
		 * the last removal.  But complain about failures other than ENOENT.
		 * Fortunately, it's not too late to abort the transaction if we
		 * can't get rid of the would-be-obsolete init file.
		 */
		if (errno != ENOENT)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not remove cache file \"%s\": %m",
							initfilename)));
	}
}

void
RelationCacheInitFilePostInvalidate(void)
{
	LWLockRelease(RelCacheInitLock);
}
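
/*
 * A minimal sketch of the calling pattern expected of the transaction-commit
 * invalidation code:
 *
 *		RelationCacheInitFilePreInvalidate();
 *		... send the queued relcache SI messages ...
 *		RelationCacheInitFilePostInvalidate();
 *
 * Any other ordering would reopen the races described above.
 */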

/*
 * Remove the init files during postmaster startup.
 *
 * We used to keep the init files across restarts, but that is unsafe even in
 * simple crash-recovery cases, since there are windows for the init files to
 * become out of sync with the database.  So now we just remove them during
 * startup and expect the first backend launch to rebuild them.  Of course,
 * this has to happen in each database of the cluster.
 */
void
RelationCacheInitFileRemove(void)
{
	char		path[MAXPGPATH];

	/*
	 * We zap the shared cache file too.  In theory it can't get out of sync
	 * enough to be a problem, but in data-corruption cases, who knows ...
	 */
	snprintf(path, sizeof(path), "global/%s",
			 RELCACHE_INIT_FILENAME);
	unlink_initfile(path);

	/* Scan everything in the default tablespace */
	RelationCacheInitFileRemoveInDir("base");
}

/* Process one per-tablespace directory for RelationCacheInitFileRemove */
static void
RelationCacheInitFileRemoveInDir(const char *tblspcpath)
{
	DIR		   *dir;
	struct dirent *de;
	char		initfilename[MAXPGPATH];

	/* Scan the tablespace directory to find per-database directories */
	dir = AllocateDir(tblspcpath);
	if (dir == NULL)
	{
		elog(LOG, "could not open tablespace directory \"%s\": %m",
			 tblspcpath);
		return;
	}

	while ((de = ReadDir(dir, tblspcpath)) != NULL)
	{
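		/*
		 * Per-database subdirectories are named by database OID, so an
		 * all-digits entry should be such a directory; skip anything else.
		 */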
		if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
		{
			/* Try to remove the init file in each database */
			snprintf(initfilename, sizeof(initfilename), "%s/%s/%s",
					 tblspcpath, de->d_name, RELCACHE_INIT_FILENAME);
			unlink_initfile(initfilename);
		}
	}

	FreeDir(dir);
}

static void
unlink_initfile(const char *initfilename)
{
	if (unlink(initfilename) < 0)
	{
		/* It might not be there, but log any error other than ENOENT */
		if (errno != ENOENT)
			elog(LOG, "could not remove cache file \"%s\": %m", initfilename);
	}
}