/*-------------------------------------------------------------------------
 *
 * relcache.c
 *	  POSTGRES relation descriptor cache code
 *
 * Portions Copyright (c) 2005-2009, Greenplum inc.
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/utils/cache/relcache.c,v 1.259 2007/03/29 00:15:38 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
/*
 * INTERFACE ROUTINES
 *		RelationCacheInitialize			- initialize relcache (to empty)
 *		RelationCacheInitializePhase2	- initialize shared-catalog entries
 *		RelationCacheInitializePhase3	- finish initializing relcache
 *		RelationIdGetRelation			- get a reldesc by relation id
 *		RelationClose					- close an open relation
 *
 * NOTES
 *		The following code contains many undocumented hacks.  Please be
 *		careful....
 */
#include "postgres.h"

#include <sys/file.h>
#include <fcntl.h>
#include <unistd.h>

#include "access/genam.h"
#include "access/heapam.h"
#include "access/reloptions.h"
#include "access/sysattr.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/catquery.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_amop.h"
#include "catalog/pg_amproc.h"
#include "catalog/pg_attrdef.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_auth_members.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_database.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_rewrite.h"
#include "catalog/pg_tablespace.h"
#include "catalog/pg_trigger.h"
#include "catalog/pg_type.h"
#include "commands/trigger.h"
#include "miscadmin.h"
#include "optimizer/clauses.h"
#include "optimizer/planmain.h"
#include "optimizer/prep.h"
#include "rewrite/rewriteDefine.h"
#include "storage/fd.h"
#include "storage/smgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/relcache.h"
#include "utils/relationnode.h"
#include "utils/resowner.h"
#include "utils/syscache.h"
#include "utils/typcache.h"

#include "catalog/gp_policy.h"         /* GpPolicy */
#include "cdb/cdbtm.h"
#include "cdb/cdbvars.h"        /* Gp_role */
#include "cdb/cdbmirroredflatfile.h"
#include "cdb/cdbpersistentfilesysobj.h"
#include "cdb/cdbsreh.h"


/*
 * name of relcache init file, used to speed up backend startup
 */
#define RELCACHE_INIT_FILENAME	"pg_internal.init"

#define RELCACHE_INIT_FILEMAGIC		0x773264	/* version ID value */

/*
 *		hardcoded tuple descriptors.  see include/catalog/pg_attribute.h
 */
static const FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
static const FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
static const FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
static const FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
static const FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};

static const FormData_pg_attribute Desc_pg_database[Natts_pg_database] = {Schema_pg_database};
static const FormData_pg_attribute Desc_pg_authid[Natts_pg_authid] = {Schema_pg_authid};
static const FormData_pg_attribute Desc_pg_auth_members[Natts_pg_auth_members] = {Schema_pg_auth_members};

/*
 *		Hash tables that index the relation cache
 *
 *		We used to index the cache by both name and OID, but now there
 *		is only an index by OID.
 */
typedef struct relidcacheent
{
	Oid			reloid;
	Relation	reldesc;
} RelIdCacheEnt;

static HTAB *RelationIdCache;

/*
 * This flag is false until we have prepared the critical relcache entries
 * that are needed to do indexscans on the tables read by relcache building.
 */
bool		criticalRelcachesBuilt = false;

/*
 * This flag is false until we have prepared the critical relcache entries
 * for shared catalogs (which are the tables needed for login).
 */
bool		criticalSharedRelcachesBuilt = false;

/*
 * This counter counts relcache inval events received since backend startup
 * (but only for rels that are actually in cache).	Presently, we use it only
 * to detect whether data about to be written by write_relcache_init_file()
 * might already be obsolete.
 */
static long relcacheInvalsReceived = 0L;

/*
 * This list remembers the OIDs of the non-shared relations cached in the
 * database's local relcache init file.  Note that there is no corresponding
 * list for the shared relcache init file, for reasons explained in the
 * comments for RelationCacheInitFileRemove.
 */
static List *initFileRelationIds = NIL;

/*
 * This flag lets us optimize away work in AtEO(Sub)Xact_RelationCache().
 */
static bool need_eoxact_work = false;


/*
 *		macros to manipulate the lookup hashtables
 */
#define RelationCacheInsert(RELATION)	\
do { \
	RelIdCacheEnt *idhentry; bool found; \
	idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
										   (void *) &(RELATION->rd_id), \
										   HASH_ENTER, \
										   &found); \
	/* used to give notice if found -- now just keep quiet */ \
	idhentry->reldesc = RELATION; \
} while(0)

#define RelationIdCacheLookup(ID, RELATION) \
do { \
	RelIdCacheEnt *hentry; \
	hentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
										 (void *) &(ID), HASH_FIND,NULL); \
	if (hentry) \
		RELATION = hentry->reldesc; \
	else \
		RELATION = NULL; \
} while(0)

#define RelationCacheDelete(RELATION) \
do { \
	RelIdCacheEnt *idhentry; \
	idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
										   (void *) &(RELATION->rd_id), \
										   HASH_REMOVE, NULL); \
	if (idhentry == NULL) \
		elog(WARNING, "trying to delete a rd_id reldesc that does not exist"); \
} while(0)


/*
 * Special cache for opclass-related information
 *
 * Note: only default operators and support procs get cached, ie, those with
 * lefttype = righttype = opcintype.
 */
typedef struct opclasscacheent
{
	Oid			opclassoid;		/* lookup key: OID of opclass */
	bool		valid;			/* set TRUE after successful fill-in */
	StrategyNumber numStrats;	/* max # of strategies (from pg_am) */
	StrategyNumber numSupport;	/* max # of support procs (from pg_am) */
	Oid			opcfamily;		/* OID of opclass's family */
	Oid			opcintype;		/* OID of opclass's declared input type */
	Oid		   *operatorOids;	/* strategy operators' OIDs */
	RegProcedure *supportProcs; /* support procs */
} OpClassCacheEnt;

static HTAB *OpClassCache = NULL;


/* non-export function prototypes */

static void RelationDestroyRelation(Relation relation);
static void RelationClearRelation(Relation relation, bool rebuild);

static void RelationReloadClassinfo(Relation relation);
static void RelationFlushRelation(Relation relation);
static bool load_relcache_init_file(bool shared);
static void write_relcache_init_file(bool shared);
static void write_item(const void *data, Size len, FILE *fp);

static void formrdesc(const char *relationName, Oid relationReltype,
		  bool isshared, bool hasoids,
		  int natts, const FormData_pg_attribute *att);

static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK, Relation *pg_class_relation);
static Relation AllocateRelationDesc(Form_pg_class relp);
static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
static void RelationBuildTupleDesc(Relation relation);
static Relation RelationBuildDesc(Oid targetRelId, bool insertIt);
static void RelationInitPhysicalAddr(Relation relation);
static void load_critical_index(Oid indexoid, Oid heapoid);
static TupleDesc GetPgClassDescriptor(void);
static TupleDesc GetPgIndexDescriptor(void);
static void AttrDefaultFetch(Relation relation);
static void CheckConstraintFetch(Relation relation);
static List *insert_ordered_oid(List *list, Oid datum);
static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
				  StrategyNumber numStrats,
				  StrategyNumber numSupport);
static void RelationCacheInitFileRemoveInDir(const char *tblspcpath);
static void unlink_initfile(const char *initfilename);


/*
 *		ScanPgRelation
 *
 *		This is used by RelationBuildDesc to find a pg_class
 *		tuple matching targetRelId.  The caller must hold at least
 *		AccessShareLock on the target relid to prevent concurrent-update
 *		scenarios --- else our SnapshotNow scan might fail to find any
 *		version that it thinks is live.
 *
 *		NB: the returned tuple has been copied into palloc'd storage
 *		and must eventually be freed with heap_freetuple.
 */
static HeapTuple
ScanPgRelation(Oid targetRelId, bool indexOK, Relation *pg_class_relation)
{
	HeapTuple	pg_class_tuple;
	Relation	pg_class_desc;
	cqContext	cqc;

	/*
	 * If something goes wrong during backend startup, we might find ourselves
	 * trying to read pg_class before we've selected a database.  That ain't
	 * gonna work, so bail out with a useful error message.  If this happens,
	 * it probably means a relcache entry that needs to be nailed isn't.
	 */
	if (!OidIsValid(MyDatabaseId))
		elog(FATAL, "cannot read pg_class without having selected a database");

	/*
	 * form a scan key
	 */

	/*
	 * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
	 * built the critical relcache entries (this includes initdb and startup
	 * without a pg_internal.init file).  The caller can also force a heap
	 * scan by setting indexOK == false.
	 */
	pg_class_desc = heap_open(RelationRelationId, AccessShareLock);

	pg_class_tuple = caql_getfirst(
			caql_syscache(
					caql_indexOK(caql_addrel(cqclr(&cqc), pg_class_desc), 
								 (indexOK && criticalRelcachesBuilt)),
					false),
			cql("SELECT * FROM pg_class "
				" WHERE oid = :1 ",
				ObjectIdGetDatum(targetRelId)));

	/*
	 * Must copy tuple before releasing buffer. -- already a copy
	 */

	/* all done */

	if (pg_class_relation == NULL)
		heap_close(pg_class_desc, AccessShareLock);
	else
		*pg_class_relation = pg_class_desc;

	return pg_class_tuple;
}

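/*
 * GpRelationNodeBeginScan
 *
 * Begin an index scan over gp_relation_node for every segment-file entry
 * of the given relfilenode.  Iterate with GpRelationNodeGetNext and finish
 * with GpRelationNodeEndScan.
 */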
void
GpRelationNodeBeginScan(
	Snapshot	snapshot,
	Relation 	gp_relation_node,
	Oid		relationId,
	Oid 		relfilenode,
	GpRelationNodeScan 	*gpRelationNodeScan)
{
	Assert (relfilenode != 0);

	MemSet(gpRelationNodeScan, 0, sizeof(GpRelationNodeScan));

	/*
	 * form a scan key
	 */
	/* XXX XXX: break this out -- find callers - jic 2011/12/09 */
	/* maybe it's ok - return a cql context ? */

	/* XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX */
	/* no json defs for persistent tables ? */
/*
	cqxx("SELECT * FROM gp_relation_node_relfilenode "
		 " WHERE oid = :1 ",
		 ObjectIdGetDatum(relfilenode));
*/
	/* XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX */

	ScanKeyInit(&gpRelationNodeScan->scankey[0],
				Anum_gp_relation_node_relfilenode_oid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(relfilenode));

	/*
	 * Begin the index scan on gp_relation_node, fetching all segment-file
	 * entries for the given relfilenode.
	 */
	gpRelationNodeScan->scan = \
		systable_beginscan(gp_relation_node, GpRelationNodeOidIndexId,
						   /* indexOK */ true,
						   snapshot,
						   /* nKeys */ 1, 
						   gpRelationNodeScan->scankey);

	gpRelationNodeScan->gp_relation_node = gp_relation_node;
	gpRelationNodeScan->relationId = relationId;
	gpRelationNodeScan->relfilenode = relfilenode;
}

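/*
 * GpRelationNodeGetNext
 *
 * Return the next gp_relation_node tuple of the scan, filling in the segment
 * file number, persistent TID, and persistent serial number.  Returns NULL
 * (and zeroes the output values) when the scan is exhausted.
 */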
HeapTuple
GpRelationNodeGetNext(
	GpRelationNodeScan 	*gpRelationNodeScan,
	int32				*segmentFileNum,
	ItemPointer			persistentTid,
	int64				*persistentSerialNum)
{
	HeapTuple tuple;
	
	bool			nulls[Natts_gp_relation_node];
	Datum			values[Natts_gp_relation_node];
	
	Oid actualRelationNode;
	
	int64 createMirrorDataLossTrackingSessionNum;

	tuple = systable_getnext((SysScanDesc)gpRelationNodeScan->scan);

	/*
	 * if no such tuple exists, return NULL
	 */
	if (!HeapTupleIsValid(tuple))
	{
		MemSet(persistentTid, 0, sizeof(ItemPointerData));
		*persistentSerialNum = 0;
		return tuple;
	}
	
	heap_deform_tuple(tuple, RelationGetDescr(gpRelationNodeScan->gp_relation_node), values, nulls);
		
	GpRelationNode_GetValues(
						values,
						&actualRelationNode,
						segmentFileNum,
						&createMirrorDataLossTrackingSessionNum,
						persistentTid,
						persistentSerialNum);
	if (actualRelationNode != gpRelationNodeScan->relfilenode)
		elog(FATAL, "Index on gp_relation_node broken. "
			   "Mismatch in node tuple for gp_relation_node for relation %u, relfilenode %u, relation node %u",
			 gpRelationNodeScan->relationId, 
			 gpRelationNodeScan->relfilenode,
			 actualRelationNode);

	return tuple;
}


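/*
 * GpRelationNodeEndScan
 *
 * Close a scan started with GpRelationNodeBeginScan.
 */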
void
GpRelationNodeEndScan(
	GpRelationNodeScan 	*gpRelationNodeScan)
{
	/* all done */
	systable_endscan((SysScanDesc)gpRelationNodeScan->scan);
}

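/*
 * ScanGpRelationNodeTuple
 *
 * Fetch the gp_relation_node tuple matching (relfilenode, segmentFileNum).
 * The tuple is returned as a palloc'd copy, or NULL if no such entry exists.
 */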
static HeapTuple
ScanGpRelationNodeTuple(
	Relation 	gp_relation_node,
	Oid 		relfilenode,
	int32		segmentFileNum)
{
	HeapTuple	tuple;
	SysScanDesc scan;
	ScanKeyData key[2];

	Assert (relfilenode != 0);

	/*
	 * form a scan key
	 */

	/* XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX */
/*
	cqxx("SELECT * FROM gp_relation_node "
		 " WHERE relfilenode_oid = :1 "
		 " AND segment_file_num = :2 ",
		 ObjectIdGetDatum(relfilenode),
		 Int32GetDatum(segmentFileNum));
*/
	/* XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX */

	ScanKeyInit(&key[0],
				Anum_gp_relation_node_relfilenode_oid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(relfilenode));
	ScanKeyInit(&key[1],
				Anum_gp_relation_node_segment_file_num,
				BTEqualStrategyNumber, F_INT4EQ,
				Int32GetDatum(segmentFileNum));

	/*
	 * Scan gp_relation_node via its index, matching both the relfilenode
	 * and the segment file number.
	 */
	scan = systable_beginscan(gp_relation_node, GpRelationNodeOidIndexId,
									   /* indexOK */ true,
									   SnapshotNow,
									   2, key);

	tuple = systable_getnext(scan);

	/*
	 * Must copy tuple before releasing buffer.
	 */
	if (HeapTupleIsValid(tuple))
		tuple = heap_copytuple(tuple);

	/* all done */
	systable_endscan(scan);

	return tuple;
}

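/*
 * FetchGpRelationNodeTuple
 *
 * Look up the gp_relation_node entry for (relfilenode, segmentFileNum) and
 * return its tuple along with the persistent TID and serial number stored in
 * it.  Returns NULL (with zeroed output values) if no entry exists.
 */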
HeapTuple
FetchGpRelationNodeTuple(
	Relation 		gp_relation_node,
	Oid 			relfilenode,
	int32			segmentFileNum,
	ItemPointer		persistentTid,
	int64			*persistentSerialNum)
{
	HeapTuple tuple;
	
	bool			nulls[Natts_gp_relation_node];
	Datum			values[Natts_gp_relation_node];
	
	Oid actualRelationNode;
	int32 actualSegmentFileNum;

	int64 createMirrorDataLossTrackingSessionNum;

	Assert (relfilenode != 0);
	
	tuple = ScanGpRelationNodeTuple(
					gp_relation_node,
					relfilenode,
					segmentFileNum);
	
	/*
	 * if no such tuple exists, return NULL
	 */
	if (!HeapTupleIsValid(tuple))
	{
		MemSet(persistentTid, 0, sizeof(ItemPointerData));
		*persistentSerialNum = 0;
		return tuple;
	}
	
	heap_deform_tuple(tuple, RelationGetDescr(gp_relation_node), values, nulls);
		
	GpRelationNode_GetValues(
						values,
						&actualRelationNode,
						&actualSegmentFileNum,
						&createMirrorDataLossTrackingSessionNum,
						persistentTid,
						persistentSerialNum);
	
	if (actualRelationNode != relfilenode)
	{
		elog(ERROR, "Index on gp_relation_node broken. "
			   "Mismatch in node tuple for gp_relation_node intended relfilenode %u, fetched relfilenode %u",
			 relfilenode,
			 actualRelationNode);
	}

	return tuple;
}

/*
 * Deletes the gp relation node entry for the
 * given segment file.
 */ 
void
DeleteGpRelationNodeTuple(
	Relation 	relation,
	int32		segmentFileNum)
{
	Relation	gp_relation_node;
	HeapTuple	tuple;
	ItemPointerData     persistentTid;
	int64               persistentSerialNum;

	gp_relation_node = heap_open(GpRelationNodeRelationId, RowExclusiveLock);

	tuple = FetchGpRelationNodeTuple(gp_relation_node,
				relation->rd_rel->relfilenode,
				segmentFileNum,
				&persistentTid,
				&persistentSerialNum);

	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "could not find node tuple for relation %u, relation file node %u, segment file #%d",
			 RelationGetRelid(relation),
			 relation->rd_rel->relfilenode,
			 segmentFileNum);

	/* delete the relation tuple from gp_relation_node, and finish up */
	simple_heap_delete(gp_relation_node, &tuple->t_self);
	heap_freetuple(tuple);

	heap_close(gp_relation_node, RowExclusiveLock);
}

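/*
 * ReadGpRelationNode
 *
 * Convenience wrapper around FetchGpRelationNodeTuple: opens gp_relation_node,
 * looks up the entry for (relfilenode, segmentFileNum), and returns true with
 * the persistent TID and serial number filled in, or false if no entry exists.
 */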
bool
ReadGpRelationNode(
	Oid 			relfilenode,
	
	int32			segmentFileNum,

	ItemPointer		persistentTid,

	int64			*persistentSerialNum)
{
	Relation gp_relation_node;
	HeapTuple tuple;
	bool found;

	MemSet(persistentTid, 0, sizeof(ItemPointerData));
	*persistentSerialNum = 0;

	gp_relation_node = heap_open(GpRelationNodeRelationId, AccessShareLock);

	tuple = FetchGpRelationNodeTuple(
						gp_relation_node,
						relfilenode,
						segmentFileNum,
						persistentTid,
						persistentSerialNum);

	/*
	 * if no such tuple exists, return NULL
	 */
	if (!HeapTupleIsValid(tuple))
	{
		found = false;
	}
	else
	{
		if (Debug_persistent_print)
		{
			TupleVisibilitySummary tupleVisibilitySummary;
			char *tupleVisibilitySummaryString;
			
			GetTupleVisibilitySummary(
									tuple,
									&tupleVisibilitySummary);
			tupleVisibilitySummaryString = GetTupleVisibilitySummaryString(&tupleVisibilitySummary);
			
			elog(Persistent_DebugPrintLevel(), 
				 "ReadGpRelationNode: For relfilenode %u, segment file #%d found persistent serial number " INT64_FORMAT ", TID %s (gp_relation_node tuple visibility: %s)",
				 relfilenode,
				 segmentFileNum,
				 *persistentSerialNum,
				 ItemPointerToString(persistentTid),
				 tupleVisibilitySummaryString);
			pfree(tupleVisibilitySummaryString);
		}

		found = true;
		heap_freetuple(tuple);
	}

	heap_close(gp_relation_node, AccessShareLock);

	return found;
}

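/*
 * RelationFetchSegFile0GpRelationNode
 *
 * Make sure the relcache entry carries the persistent TID and serial number
 * of its segment file #0, fetching them from gp_relation_node on first use.
 * Before persistence work is set up (or during recovery) the values are
 * simply zeroed.
 */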
void
RelationFetchSegFile0GpRelationNode(
	Relation relation)
{
	if (!relation->rd_segfile0_relationnodeinfo.isPresent)
	{
		if (Persistent_BeforePersistenceWork() || InRecovery)
		{
			MemSet(&relation->rd_segfile0_relationnodeinfo.persistentTid, 0, sizeof(ItemPointerData));
			relation->rd_segfile0_relationnodeinfo.persistentSerialNum = 0;
		
			relation->rd_segfile0_relationnodeinfo.isPresent = true;
			relation->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = true;
			
			return; // The initdb process will load the persistent table once we are out of bootstrap mode.
		}

		if (!ReadGpRelationNode(
					relation->rd_node.relNode,
					/* segmentFileNum */ 0,
					&relation->rd_segfile0_relationnodeinfo.persistentTid,
					&relation->rd_segfile0_relationnodeinfo.persistentSerialNum))
		{
			elog(ERROR, "Did not find gp_relation_node entry for relation name %s, relation id %u, relfilenode %u",
				 relation->rd_rel->relname.data,
				 relation->rd_id,
				 relation->rd_node.relNode);
		}

		Assert(!Persistent_BeforePersistenceWork());
		if (Debug_check_for_invalid_persistent_tid &&
			PersistentStore_IsZeroTid(&relation->rd_segfile0_relationnodeinfo.persistentTid))
		{	
			elog(ERROR, 
				 "RelationFetchSegFile0GpRelationNode has invalid TID (0,0) into relation %u/%u/%u '%s', serial number " INT64_FORMAT,
				 relation->rd_node.spcNode,
				 relation->rd_node.dbNode,
				 relation->rd_node.relNode,
				 NameStr(relation->rd_rel->relname),
				 relation->rd_segfile0_relationnodeinfo.persistentSerialNum);
		}

		relation->rd_segfile0_relationnodeinfo.isPresent = true;
		
	}

}

// UNDONE: Temporary
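/*
 * RelationFetchGpRelationNodeForXLog_Index
 *
 * Wrapper around RelationFetchSegFile0GpRelationNode used on index XLOG
 * paths; it also guards against recursive invocation (debugging aid for
 * MPP-16395).
 */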
void
RelationFetchGpRelationNodeForXLog_Index(
	Relation relation)
{
	static int countInThisBackend = 0;
	static int deep = 0;
	
	deep++;

	countInThisBackend++;

	if (deep >= 2)
	{
		int saveDeep;

		if (Debug_gp_relation_node_fetch_wait_for_debugging)
		{
			/* Code for investigating MPP-16395, will be removed as part of the fix */
			elog(WARNING, "RelationFetchGpRelationNodeForXLog_Index [%d] for non-heap %u/%u/%u (deep %d) -- waiting for debug attach...",
				 countInThisBackend,
				 relation->rd_node.spcNode,
				 relation->rd_node.dbNode,
				 relation->rd_node.relNode,
				 deep);

			for (int i=0; i < 24 * 60; i++)
			{
				pg_usleep(60000000L); /* 60 sec */
			}
		}

		/*
		 * Reset counter in case the user continues to use the session.
		 */
		saveDeep = deep;
		deep = 0;

		elog(ERROR, "RelationFetchGpRelationNodeForXLog_Index [%d] for non-heap %u/%u/%u (deep %d)",
			 countInThisBackend,
			 relation->rd_node.spcNode,
			 relation->rd_node.dbNode,
			 relation->rd_node.relNode,
			 saveDeep);
	}

	RelationFetchSegFile0GpRelationNode(relation);

	deep--;
}

/*
 *		AllocateRelationDesc
 *
 *		This is used to allocate memory for a new relation descriptor
 *		and initialize the rd_rel field from the given pg_class tuple.
 */
static Relation
AllocateRelationDesc(Form_pg_class relp)
{
	Relation	relation;
	MemoryContext oldcxt;
	Form_pg_class relationForm;

	/* Relcache entries must live in CacheMemoryContext */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
	 * allocate and zero space for new relation descriptor
	 */
	relation = (Relation) palloc0(sizeof(RelationData));

	/*
	 * clear fields of reldesc that should initialize to something non-zero
	 */
	relation->rd_targblock = InvalidBlockNumber;

	/* make sure relation is marked as having no open file yet */
	relation->rd_smgr = NULL;

	/*
	 * Copy the relation tuple form
	 *
	 * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE. The
	 * variable-length fields (relacl, reloptions) are NOT stored in the
	 * relcache --- there'd be little point in it, since we don't copy the
	 * tuple's nulls bitmap and hence wouldn't know if the values are valid.
	 * Bottom line is that relacl *cannot* be retrieved from the relcache. Get
	 * it from the syscache if you need it.  The same goes for the original
	 * form of reloptions (however, we do store the parsed form of reloptions
	 * in rd_options).
	 */
	relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);

	memcpy(relationForm, relp, CLASS_TUPLE_SIZE);

	/* initialize relation tuple form */
	relation->rd_rel = relationForm;

	/*
	 * This part MUST remain a fetch on demand; otherwise we would need it in
	 * order to open pg_class, and relation_open would then recurse infinitely...
	 */
	relation->rd_segfile0_relationnodeinfo.isPresent = false;
	relation->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = false;

	/* and allocate attribute tuple form storage */
	relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts,
											   relationForm->relhasoids);
	/* which we mark as a reference-counted tupdesc */
	relation->rd_att->tdrefcount = 1;

	MemoryContextSwitchTo(oldcxt);

	return relation;
}

/*
 * RelationParseRelOptions
 *		Convert pg_class.reloptions into pre-parsed rd_options
 *
 * tuple is the real pg_class tuple (not rd_rel!) for relation
 *
 * Note: rd_rel and (if an index) rd_am must be valid already
 */
static void
RelationParseRelOptions(Relation relation, HeapTuple tuple)
{
	Datum		datum;
	bool		isnull;
	bytea	   *options;

	relation->rd_options = NULL;

	/* Fall out if relkind should not have options */
	switch (relation->rd_rel->relkind)
	{
		case RELKIND_RELATION:
		case RELKIND_TOASTVALUE:
		case RELKIND_AOSEGMENTS:
		case RELKIND_AOBLOCKDIR:
		case RELKIND_AOVISIMAP:
		case RELKIND_INDEX:
			break;
		default:
			return;
	}

	/*
	 * Fetch reloptions from tuple; have to use a hardwired descriptor because
	 * we might not have any other for pg_class yet (consider executing this
	 * code for pg_class itself)
	 */
	datum = fastgetattr(tuple,
						Anum_pg_class_reloptions,
						GetPgClassDescriptor(),
						&isnull);
	if (isnull)
		return;

	/* Parse into appropriate format; don't error out here */
	switch (relation->rd_rel->relkind)
	{
		case RELKIND_RELATION:
		case RELKIND_TOASTVALUE:
		case RELKIND_AOSEGMENTS:
		case RELKIND_AOBLOCKDIR:
		case RELKIND_AOVISIMAP:
		case RELKIND_UNCATALOGED:
			options = heap_reloptions(relation->rd_rel->relkind, datum,
									  false);
			break;
		case RELKIND_INDEX:
			options = index_reloptions(relation->rd_am->amoptions, datum,
									   false);
			break;
		default:
			Assert(false);		/* can't get here */
			options = NULL;		/* keep compiler quiet */
			break;
	}

	/*
	 * Copy parsed data into CacheMemoryContext.  To guard against the
	 * possibility of leaks in the reloptions code, we want to do the actual
	 * parsing in the caller's memory context and copy the results into
	 * CacheMemoryContext after the fact.
	 */
	if (options)
	{
		relation->rd_options = MemoryContextAlloc(CacheMemoryContext,
												  VARSIZE(options));
		memcpy(relation->rd_options, options, VARSIZE(options));
		pfree(options);
	}
}

/*
 *		RelationBuildTupleDesc
 *
 *		Form the relation's tuple descriptor from information in
 *		the pg_attribute, pg_attrdef & pg_constraint system catalogs.
 */
static void
RelationBuildTupleDesc(Relation relation)
{
	HeapTuple	pg_attribute_tuple;
	Relation	pg_attribute_desc;
	cqContext	cqc;
	cqContext  *pcqCtx;
	int			need;
	TupleConstr *constr;
	AttrDefault *attrdef = NULL;
	int			ndef = 0;

	/* copy some fields from pg_class row to rd_att */
	relation->rd_att->tdtypeid = relation->rd_rel->reltype;
	relation->rd_att->tdtypmod = -1;	/* unnecessary, but... */
	relation->rd_att->tdhasoid = relation->rd_rel->relhasoids;

	constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
												sizeof(TupleConstr));
	constr->has_not_null = false;

	/*
	 * Form a scan key that selects only user attributes (attnum > 0).
	 * (Eliminating system attribute rows at the index level is lots faster
	 * than fetching them.)
	 */

	/*
	 * Open pg_attribute and begin a scan.	Force heap scan if we haven't yet
	 * built the critical relcache entries (this includes initdb and startup
	 * without a pg_internal.init file).
	 */
	pg_attribute_desc = heap_open(AttributeRelationId, AccessShareLock);

	pcqCtx = caql_beginscan(
			caql_syscache(
					caql_indexOK(caql_addrel(cqclr(&cqc), pg_attribute_desc), 
								 criticalRelcachesBuilt),
					false),
			cql("SELECT * FROM pg_attribute "
				" WHERE attrelid = :1 "
				" AND attnum > :2 ",
				ObjectIdGetDatum(RelationGetRelid(relation)),
				Int16GetDatum(0)));

	/*
	 * add attribute data to relation->rd_att
	 */
	need = relation->rd_rel->relnatts;

	while (HeapTupleIsValid(pg_attribute_tuple = caql_getnext(pcqCtx)))
	{
		Form_pg_attribute attp;

		attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);

		if (attp->attnum <= 0 ||
			attp->attnum > relation->rd_rel->relnatts)
			elog(ERROR, "invalid attribute number %d for %s",
				 attp->attnum, RelationGetRelationName(relation));

		memcpy(relation->rd_att->attrs[attp->attnum - 1],
			   attp,
			   ATTRIBUTE_FIXED_PART_SIZE);

		/* Update constraint/default info */
		if (attp->attnotnull)
			constr->has_not_null = true;

		if (attp->atthasdef)
		{
			if (attrdef == NULL)
				attrdef = (AttrDefault *)
					MemoryContextAllocZero(CacheMemoryContext,
										   relation->rd_rel->relnatts *
										   sizeof(AttrDefault));
			attrdef[ndef].adnum = attp->attnum;
			attrdef[ndef].adbin = NULL;
			ndef++;
		}
		need--;
		if (need == 0)
			break;
	}

	/*
	 * end the scan and close the attribute relation
	 */
	caql_endscan(pcqCtx);
	heap_close(pg_attribute_desc, AccessShareLock);

	if (need != 0)
		elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
			 need, RelationGetRelid(relation));

	/*
	 * The attcacheoff values we read from pg_attribute should all be -1
	 * ("unknown").  Verify this if assert checking is on.	They will be
	 * computed when and if needed during tuple access.
	 */
#ifdef USE_ASSERT_CHECKING
	{
		int			i;

		for (i = 0; i < relation->rd_rel->relnatts; i++)
			Assert(relation->rd_att->attrs[i]->attcacheoff == -1);
	}
#endif

	/*
	 * However, we can easily set the attcacheoff value for the first
	 * attribute: it must be zero.	This eliminates the need for special cases
	 * for attnum=1 that used to exist in fastgetattr() and index_getattr().
	 */
	if (relation->rd_rel->relnatts > 0)
		relation->rd_att->attrs[0]->attcacheoff = 0;

	/*
	 * Set up constraint/default info
	 */
	if (constr->has_not_null || ndef > 0 || relation->rd_rel->relchecks)
	{
		relation->rd_att->constr = constr;

		if (ndef > 0)			/* DEFAULTs */
		{
			if (ndef < relation->rd_rel->relnatts)
				constr->defval = (AttrDefault *)
					repalloc(attrdef, ndef * sizeof(AttrDefault));
			else
				constr->defval = attrdef;
			constr->num_defval = ndef;
			AttrDefaultFetch(relation);
		}
		else
			constr->num_defval = 0;

		if (relation->rd_rel->relchecks > 0)	/* CHECKs */
		{
			constr->num_check = relation->rd_rel->relchecks;
			constr->check = (ConstrCheck *)
				MemoryContextAllocZero(CacheMemoryContext,
									constr->num_check * sizeof(ConstrCheck));
			CheckConstraintFetch(relation);
		}
		else
			constr->num_check = 0;
	}
	else
	{
		pfree(constr);
		relation->rd_att->constr = NULL;
1030
	}
1031 1032
}

1033
/*
1034
 *		RelationBuildRuleLock
1035
 *
1036 1037
 *		Form the relation's rewrite rules from information in
 *		the pg_rewrite system catalog.
1038 1039 1040 1041 1042 1043 1044
 *
 * Note: The rule parsetrees are potentially very complex node structures.
 * To allow these trees to be freed when the relcache entry is flushed,
 * we make a private memory context to hold the RuleLock information for
 * each relcache entry that has associated rules.  The context is used
 * just for rule info, not for any other subsidiary data of the relcache
 * entry, because that keeps the update logic in RelationClearRelation()
B
1046
 * to be easy to free explicitly, anyway.
1047 1048 1049 1050
 */
static void
RelationBuildRuleLock(Relation relation)
{
	MemoryContext rulescxt;
	MemoryContext oldcxt;
	HeapTuple	rewrite_tuple;
	Relation	rewrite_desc;
	TupleDesc	rewrite_tupdesc;
	cqContext	cqc;
	cqContext  *pcqCtx;
	RuleLock   *rulelock;
	int			numlocks;
	RewriteRule **rules;
	int			maxlocks;

	/*
	 * Make the private context.  Parameters are set on the assumption that
	 * it'll probably not contain much data.
	 */
	rulescxt = AllocSetContextCreate(CacheMemoryContext,
									 RelationGetRelationName(relation),
									 ALLOCSET_SMALL_MINSIZE,
									 ALLOCSET_SMALL_INITSIZE,
									 ALLOCSET_SMALL_MAXSIZE);
	relation->rd_rulescxt = rulescxt;

	/*
	 * allocate an array to hold the rewrite rules (the array is extended if
	 * necessary)
	 */
	maxlocks = 4;
	rules = (RewriteRule **)
		MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
	numlocks = 0;

	/*
	 * open pg_rewrite and begin a scan
	 *
	 * Note: since we scan the rules using RewriteRelRulenameIndexId, we will
	 * be reading the rules in name order, except possibly during
	 * emergency-recovery operations (ie, IgnoreSystemIndexes). This in turn
	 * ensures that rules will be fired in name order.
	 */
	rewrite_desc = heap_open(RewriteRelationId, AccessShareLock);
	rewrite_tupdesc = RelationGetDescr(rewrite_desc);

	pcqCtx = caql_beginscan(
			caql_syscache(
					caql_indexOK(caql_addrel(cqclr(&cqc), rewrite_desc), 
								 true),
					false),
			cql("SELECT * FROM pg_rewrite "
				" WHERE ev_class = :1 ",
				ObjectIdGetDatum(RelationGetRelid(relation))));

	while (HeapTupleIsValid(rewrite_tuple = caql_getnext(pcqCtx)))
	{
		Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
		bool		isnull;
		Datum		rule_datum;
		char	   *rule_str;
		RewriteRule *rule;

		rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
												  sizeof(RewriteRule));

		rule->ruleId = HeapTupleGetOid(rewrite_tuple);

		rule->event = rewrite_form->ev_type - '0';
		rule->attrno = rewrite_form->ev_attr;
		rule->isInstead = rewrite_form->is_instead;

		/*
		 * Must use heap_getattr to fetch ev_action and ev_qual.  Also, the
		 * rule strings are often large enough to be toasted.  To avoid
		 * leaking memory in the caller's context, do the detoasting here so
		 * we can free the detoasted version.
		 */
		rule_datum = heap_getattr(rewrite_tuple,
								  Anum_pg_rewrite_ev_action,
								  rewrite_tupdesc,
								  &isnull);
		Assert(!isnull);
		rule_str = TextDatumGetCString(rule_datum);
		oldcxt = MemoryContextSwitchTo(rulescxt);
		rule->actions = (List *) stringToNode(rule_str);
		MemoryContextSwitchTo(oldcxt);
		pfree(rule_str);

		rule_datum = heap_getattr(rewrite_tuple,
								  Anum_pg_rewrite_ev_qual,
								  rewrite_tupdesc,
								  &isnull);
		Assert(!isnull);
		rule_str = TextDatumGetCString(rule_datum);
		oldcxt = MemoryContextSwitchTo(rulescxt);
		rule->qual = (Node *) stringToNode(rule_str);
		MemoryContextSwitchTo(oldcxt);
		pfree(rule_str);

		/*
		 * We want the rule's table references to be checked as though by the
		 * table owner, not the user referencing the rule.	Therefore, scan
		 * through the rule's actions and set the checkAsUser field on all
		 * rtable entries.	We have to look at the qual as well, in case it
		 * contains sublinks.
		 *
		 * The reason for doing this when the rule is loaded, rather than when
		 * it is stored, is that otherwise ALTER TABLE OWNER would have to
		 * grovel through stored rules to update checkAsUser fields. Scanning
		 * the rule tree during load is relatively cheap (compared to
		 * constructing it in the first place), so we do it here.
		 */
		setRuleCheckAsUser((Node *) rule->actions, relation->rd_rel->relowner);
		setRuleCheckAsUser(rule->qual, relation->rd_rel->relowner);

		if (numlocks >= maxlocks)
		{
			maxlocks *= 2;
			rules = (RewriteRule **)
				repalloc(rules, sizeof(RewriteRule *) * maxlocks);
		}
		rules[numlocks++] = rule;
	}

	/*
	 * end the scan and close the pg_rewrite relation
	 */
	caql_endscan(pcqCtx);
	heap_close(rewrite_desc, AccessShareLock);

	/*
	 * form a RuleLock and insert into relation
	 */
	rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
	rulelock->numLocks = numlocks;
	rulelock->rules = rules;

	relation->rd_rules = rulelock;
}

/*
 *		equalRuleLocks
 *
 *		Determine whether two RuleLocks are equivalent
 *
 *		Probably this should be in the rules code someplace...
 */
static bool
equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
{
	int			i;

	/*
	 * As of 7.3 we assume the rule ordering is repeatable, because
	 * RelationBuildRuleLock should read 'em in a consistent order.  So just
	 * compare corresponding slots.
	 */
	if (rlock1 != NULL)
	{
		if (rlock2 == NULL)
			return false;
		if (rlock1->numLocks != rlock2->numLocks)
			return false;
		for (i = 0; i < rlock1->numLocks; i++)
		{
			RewriteRule *rule1 = rlock1->rules[i];
			RewriteRule *rule2 = rlock2->rules[i];

			if (rule1->ruleId != rule2->ruleId)
				return false;
			if (rule1->event != rule2->event)
				return false;
			if (rule1->attrno != rule2->attrno)
				return false;
			if (rule1->isInstead != rule2->isInstead)
				return false;
			if (!equal(rule1->qual, rule2->qual))
				return false;
			if (!equal(rule1->actions, rule2->actions))
				return false;
		}
	}
	else if (rlock2 != NULL)
		return false;
	return true;
}

/*
 *		RelationBuildDesc
 *
 *		Build a relation descriptor.  The caller must hold at least
 *		AccessShareLock on the target relid.
 *
 *		The new descriptor is inserted into the hash table if insertIt is true.
 *
 *		Returns NULL if no pg_class row could be found for the given relid
 *		(suggesting we are trying to access a just-deleted relation).
 *		Any other error is reported via elog.
 */
static Relation
RelationBuildDesc(Oid targetRelId, bool insertIt)
{
	Relation	relation;
	Oid			relid;
	Relation    pg_class_relation;
	HeapTuple	pg_class_tuple;
	Form_pg_class relp;

	/*
	 * find the tuple in pg_class corresponding to the given relation id
	 */
	pg_class_tuple = ScanPgRelation(targetRelId, true, &pg_class_relation);

	/*
	 * if no such tuple exists, return NULL
	 */
	if (!HeapTupleIsValid(pg_class_tuple))
		return NULL;

	/*
	 * get information from the pg_class_tuple
	 */
	relid = HeapTupleGetOid(pg_class_tuple);
	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
	heap_close(pg_class_relation, AccessShareLock);

	/*
	 * allocate storage for the relation descriptor, and copy pg_class_tuple
	 * to relation->rd_rel.
	 */
	relation = AllocateRelationDesc(relp);

	/*
	 * initialize the relation's relation id (relation->rd_id)
	 */
	RelationGetRelid(relation) = relid;

	/*
	 * normal relations are not nailed into the cache; nor can a pre-existing
	 * relation be new.  It could be temp though.  (Actually, it could be new
	 * too, but it's okay to forget that fact if forced to flush the entry.)
	 */
	relation->rd_refcnt = 0;
	relation->rd_isnailed = false;
	relation->rd_createSubid = InvalidSubTransactionId;
	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
	relation->rd_istemp = isTempNamespace(relation->rd_rel->relnamespace);
	relation->rd_issyscat = (strncmp(relation->rd_rel->relname.data, "pg_", 3) == 0);

    /*
     * CDB: On QEs, temp relations must use shared buffer cache so data
     * will be visible to all segmates.  On QD, sequence objects must
     * use shared buffer cache so data will be visible to sequence server.
     */
    if (relation->rd_istemp &&
        relation->rd_rel->relkind != RELKIND_SEQUENCE &&
        Gp_role != GP_ROLE_EXECUTE)
        relation->rd_isLocalBuf = true;
    else
        relation->rd_isLocalBuf = false;

	/*
	 * initialize the tuple descriptor (relation->rd_att).
	 */
	RelationBuildTupleDesc(relation);

	/*
	 * Fetch rules and triggers that affect this relation
	 */
	if (relation->rd_rel->relhasrules)
		RelationBuildRuleLock(relation);
	else
	{
		relation->rd_rules = NULL;
		relation->rd_rulescxt = NULL;
	}

	if (relation->rd_rel->reltriggers > 0)
		RelationBuildTriggers(relation);
	else
		relation->trigdesc = NULL;

	/*
	 * if it's an index, initialize index-related information
	 */
	if (OidIsValid(relation->rd_rel->relam))
		RelationInitIndexAccessInfo(relation);

	/* extract reloptions if any */
	RelationParseRelOptions(relation, pg_class_tuple);

	/*
	 * initialize the relation lock manager information
	 */
	RelationInitLockInfo(relation);		/* see lmgr.c */

	/*
	 * initialize physical addressing information for the relation
	 */
	RelationInitPhysicalAddr(relation);

	/* make sure relation is marked as having no open file yet */
	relation->rd_smgr = NULL;

    /*
     * initialize Greenplum Database partitioning info
     */
    if (relation->rd_rel->relkind == RELKIND_RELATION &&
        !IsSystemRelation(relation))
        relation->rd_cdbpolicy = GpPolicyFetch(CacheMemoryContext, targetRelId);

    relation->rd_cdbDefaultStatsWarningIssued = false;

	/*
	 * now we can free the memory allocated for pg_class_tuple
	 */
	heap_freetuple(pg_class_tuple);

	/*
	 * Insert newly created relation into relcache hash table, if requested.
	 */
	if (insertIt)
		RelationCacheInsert(relation);

	/* It's fully valid */
	relation->rd_isvalid = true;

	return relation;
}

/*
 * Initialize the physical addressing info (RelFileNode) for a relcache entry
 */
static void
RelationInitPhysicalAddr(Relation relation)
{
	if (relation->rd_rel->reltablespace)
		relation->rd_node.spcNode = relation->rd_rel->reltablespace;
	else
		relation->rd_node.spcNode = MyDatabaseTableSpace;
	if (relation->rd_rel->relisshared)
		relation->rd_node.dbNode = InvalidOid;
	else
		relation->rd_node.dbNode = MyDatabaseId;
	relation->rd_node.relNode = relation->rd_rel->relfilenode;
}

/*
 * Initialize index-access-method support data for an index relation
 */
void
RelationInitIndexAccessInfo(Relation relation)
{
	HeapTuple	tuple;
	Form_pg_am	aform;
	Datum		indclassDatum;
	Datum		indoptionDatum;
	bool		isnull;
	oidvector  *indclass;
	int2vector  *indoption;
	MemoryContext indexcxt;
	MemoryContext oldcontext;
	int			natts;
	uint16		amstrategies;
	uint16		amsupport;

	/*
	 * Make a copy of the pg_index entry for the index.  Since pg_index
	 * contains variable-length and possibly-null fields, we have to do this
	 * honestly rather than just treating it as a Form_pg_index struct.
	 */
	tuple = SearchSysCache(INDEXRELID,
						   ObjectIdGetDatum(RelationGetRelid(relation)),
						   0, 0, 0);
	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "cache lookup failed for index %u",
			 RelationGetRelid(relation));
	oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indextuple = heap_copytuple(tuple);
	relation->rd_index = (Form_pg_index) GETSTRUCT(relation->rd_indextuple);
	MemoryContextSwitchTo(oldcontext);
	ReleaseSysCache(tuple);

	/*
	 * Make a copy of the pg_am entry for the index's access method
	 */
	tuple = SearchSysCache(AMOID,
						   ObjectIdGetDatum(relation->rd_rel->relam),
						   0, 0, 0);
	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "cache lookup failed for access method %u",
			 relation->rd_rel->relam);
	aform = (Form_pg_am) MemoryContextAlloc(CacheMemoryContext, sizeof *aform);
	memcpy(aform, GETSTRUCT(tuple), sizeof *aform);
	ReleaseSysCache(tuple);
	relation->rd_am = aform;

	natts = relation->rd_rel->relnatts;
	if (natts != relation->rd_index->indnatts)
		elog(ERROR, "relnatts disagrees with indnatts for index %u",
			 RelationGetRelid(relation));
	amstrategies = aform->amstrategies;
	amsupport = aform->amsupport;

	/*
	 * Make the private context to hold index access info.	The reason we need
	 * a context, and not just a couple of pallocs, is so that we won't leak
	 * any subsidiary info attached to fmgr lookup records.
	 *
	 * Context parameters are set on the assumption that it'll probably not
	 * contain much data.
	 */
	indexcxt = AllocSetContextCreate(CacheMemoryContext,
									 RelationGetRelationName(relation),
									 ALLOCSET_SMALL_MINSIZE,
									 ALLOCSET_SMALL_INITSIZE,
									 ALLOCSET_SMALL_MAXSIZE);
	relation->rd_indexcxt = indexcxt;

	/*
	 * Allocate arrays to hold data
	 */
	relation->rd_aminfo = (RelationAmInfo *)
		MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));

	relation->rd_opfamily = (Oid *)
		MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
	relation->rd_opcintype = (Oid *)
		MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));

	if (amstrategies > 0)
		relation->rd_operator = (Oid *)
			MemoryContextAllocZero(indexcxt,
								   natts * amstrategies * sizeof(Oid));
	else
		relation->rd_operator = NULL;

	if (amsupport > 0)
	{
		int			nsupport = natts * amsupport;

		relation->rd_support = (RegProcedure *)
			MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
		relation->rd_supportinfo = (FmgrInfo *)
			MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
	}
	else
	{
		relation->rd_support = NULL;
		relation->rd_supportinfo = NULL;
	}

	relation->rd_indoption = (int16 *)
		MemoryContextAllocZero(indexcxt, natts * sizeof(int16));

	/*
	 * indclass cannot be referenced directly through the C struct, because it
	 * comes after the variable-width indkey field.  Must extract the
	 * datum the hard way...
	 */
	indclassDatum = fastgetattr(relation->rd_indextuple,
								Anum_pg_index_indclass,
								GetPgIndexDescriptor(),
								&isnull);
	Assert(!isnull);
	indclass = (oidvector *) DatumGetPointer(indclassDatum);

	/*
	 * Fill the operator and support procedure OID arrays, as well as the
	 * info about opfamilies and opclass input types.  (aminfo and
	 * supportinfo are left as zeroes, and are filled on-the-fly when used)
	 */
	IndexSupportInitialize(indclass,
						   relation->rd_operator, relation->rd_support,
						   relation->rd_opfamily, relation->rd_opcintype,
						   amstrategies, amsupport, natts);

	/*
	 * Similarly extract indoption and copy it to the cache entry
	 */
	indoptionDatum = fastgetattr(relation->rd_indextuple,
								 Anum_pg_index_indoption,
								 GetPgIndexDescriptor(),
								 &isnull);
	Assert(!isnull);
	indoption = (int2vector *) DatumGetPointer(indoptionDatum);
	memcpy(relation->rd_indoption, indoption->values, natts * sizeof(int16));

	/*
	 * expressions and predicate cache will be filled later
	 */
	relation->rd_indexprs = NIL;
	relation->rd_indpred = NIL;
	relation->rd_amcache = NULL;
}

/*
 * IndexSupportInitialize
 *		Initializes an index's cached opclass information,
 *		given the index's pg_index.indclass entry.
 *
 * Data is returned into *indexOperator, *indexSupport, *opFamily, and
 * *opcInType, which are arrays allocated by the caller.
 *
 * The caller also passes maxStrategyNumber, maxSupportNumber, and
 * maxAttributeNumber, since these indicate the size of the arrays
 * it has allocated --- but in practice these numbers must always match
 * those obtainable from the system catalog entries for the index and
 * access method.
 */
void
IndexSupportInitialize(oidvector *indclass,
					   Oid *indexOperator,
					   RegProcedure *indexSupport,
					   Oid *opFamily,
					   Oid *opcInType,
					   StrategyNumber maxStrategyNumber,
					   StrategyNumber maxSupportNumber,
					   AttrNumber maxAttributeNumber)
{
	int			attIndex;

	for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
	{
		OpClassCacheEnt *opcentry;

		if (!OidIsValid(indclass->values[attIndex]))
			elog(ERROR, "bogus pg_index tuple");

		/* look up the info for this opclass, using a cache */
		opcentry = LookupOpclassInfo(indclass->values[attIndex],
									 maxStrategyNumber,
									 maxSupportNumber);

		/* copy cached data into relcache entry */
		opFamily[attIndex] = opcentry->opcfamily;
		opcInType[attIndex] = opcentry->opcintype;
		if (maxStrategyNumber > 0)
			memcpy(&indexOperator[attIndex * maxStrategyNumber],
				   opcentry->operatorOids,
				   maxStrategyNumber * sizeof(Oid));
		if (maxSupportNumber > 0)
			memcpy(&indexSupport[attIndex * maxSupportNumber],
				   opcentry->supportProcs,
				   maxSupportNumber * sizeof(RegProcedure));
	}
}

/*
 * LookupOpclassInfo
 *
 * This routine maintains a per-opclass cache of the information needed
 * by IndexSupportInitialize().  This is more efficient than relying on
 * the catalog cache, because we can load all the info about a particular
 * opclass in a single indexscan of pg_amproc or pg_amop.
 *
 * The information from pg_am about expected range of strategy and support
 * numbers is passed in, rather than being looked up, mainly because the
 * caller will have it already.
 *
 * Note there is no provision for flushing the cache.  This is OK at the
 * moment because there is no way to ALTER any interesting properties of an
 * existing opclass --- all you can do is drop it, which will result in
 * a useless but harmless dead entry in the cache.	To support altering
 * opclass membership (not the same as opfamily membership!), we'd need to
 * be able to flush this cache as well as the contents of relcache entries
 * for indexes.
 */
static OpClassCacheEnt *
LookupOpclassInfo(Oid operatorClassOid,
				  StrategyNumber numStrats,
				  StrategyNumber numSupport)
{
	OpClassCacheEnt *opcentry;
	bool		found;
	Relation	rel;
	SysScanDesc scan;
	ScanKeyData skey[3];
	HeapTuple	htup;
	bool		indexOK;

	if (OpClassCache == NULL)
	{
		/* First time through: initialize the opclass cache */
		HASHCTL		ctl;

		if (!CacheMemoryContext)
			CreateCacheMemoryContext();

		MemSet(&ctl, 0, sizeof(ctl));
		ctl.keysize = sizeof(Oid);
		ctl.entrysize = sizeof(OpClassCacheEnt);
		ctl.hash = oid_hash;
		OpClassCache = hash_create("Operator class cache", 64,
								   &ctl, HASH_ELEM | HASH_FUNCTION);
	}

	opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
											   (void *) &operatorClassOid,
											   HASH_ENTER, &found);

	if (found && opcentry->valid)
	{
		/* Already made an entry for it */
		Assert(numStrats == opcentry->numStrats);
		Assert(numSupport == opcentry->numSupport);
		return opcentry;
	}

	/* Need to fill in new entry */
	opcentry->valid = false;	/* until known OK */
	opcentry->numStrats = numStrats;
	opcentry->numSupport = numSupport;

	if (numStrats > 0)
		opcentry->operatorOids = (Oid *)
			MemoryContextAllocZero(CacheMemoryContext,
								   numStrats * sizeof(Oid));
	else
		opcentry->operatorOids = NULL;

	if (numSupport > 0)
		opcentry->supportProcs = (RegProcedure *)
1672 1673
			MemoryContextAllocZero(CacheMemoryContext,
								   numSupport * sizeof(RegProcedure));
1674 1675 1676 1677
	else
		opcentry->supportProcs = NULL;

	/*
	 * To avoid infinite recursion during startup, force heap scans if we're
	 * looking up info for the opclasses used by the indexes we would like to
	 * reference here.
	 */
	indexOK = criticalRelcachesBuilt ||
		(operatorClassOid != OID_BTREE_OPS_OID &&
		 operatorClassOid != INT2_BTREE_OPS_OID);

	/*
	 * We have to fetch the pg_opclass row to determine its opfamily and
	 * opcintype, which are needed to look up the operators and functions.
	 * It'd be convenient to use the syscache here, but that probably doesn't
	 * work while bootstrapping.
	 */
	ScanKeyInit(&skey[0],
				ObjectIdAttributeNumber,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(operatorClassOid));
	rel = heap_open(OperatorClassRelationId, AccessShareLock);
	scan = systable_beginscan(rel, OpclassOidIndexId, indexOK,
							  SnapshotNow, 1, skey);

	if (HeapTupleIsValid(htup = systable_getnext(scan)))
	{
		Form_pg_opclass opclassform = (Form_pg_opclass) GETSTRUCT(htup);

		opcentry->opcfamily = opclassform->opcfamily;
		opcentry->opcintype = opclassform->opcintype;
	}
	else
		elog(ERROR, "could not find tuple for opclass %u", operatorClassOid);

	systable_endscan(scan);
	heap_close(rel, AccessShareLock);


	/*
	 * Scan pg_amop to obtain operators for the opclass.  We only fetch the
	 * default ones (those with lefttype = righttype = opcintype).
	 */
	if (numStrats > 0)
	{
		ScanKeyInit(&skey[0],
					Anum_pg_amop_amopfamily,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcfamily));
		ScanKeyInit(&skey[1],
					Anum_pg_amop_amoplefttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
		ScanKeyInit(&skey[2],
					Anum_pg_amop_amoprighttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
		rel = heap_open(AccessMethodOperatorRelationId, AccessShareLock);
		scan = systable_beginscan(rel, AccessMethodStrategyIndexId, indexOK,
								  SnapshotNow, 3, skey);

		while (HeapTupleIsValid(htup = systable_getnext(scan)))
		{
			Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(htup);

			if (amopform->amopstrategy <= 0 ||
				(StrategyNumber) amopform->amopstrategy > numStrats)
				elog(ERROR, "invalid amopstrategy number %d for opclass %u",
					 amopform->amopstrategy, operatorClassOid);
			opcentry->operatorOids[amopform->amopstrategy - 1] =
				amopform->amopopr;
		}

		systable_endscan(scan);
		heap_close(rel, AccessShareLock);
	}

	/*
	 * Scan pg_amproc to obtain support procs for the opclass.  We only fetch
	 * the default ones (those with lefttype = righttype = opcintype).
	 */
	if (numSupport > 0)
	{
		ScanKeyInit(&skey[0],
					Anum_pg_amproc_amprocfamily,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcfamily));
		ScanKeyInit(&skey[1],
					Anum_pg_amproc_amproclefttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
		ScanKeyInit(&skey[2],
					Anum_pg_amproc_amprocrighttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
		rel = heap_open(AccessMethodProcedureRelationId, AccessShareLock);
		scan = systable_beginscan(rel, AccessMethodProcedureIndexId, indexOK,
								  SnapshotNow, 3, skey);

		while (HeapTupleIsValid(htup = systable_getnext(scan)))
		{
			Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);

			if (amprocform->amprocnum <= 0 ||
				(StrategyNumber) amprocform->amprocnum > numSupport)
				elog(ERROR, "invalid amproc number %d for opclass %u",
					 amprocform->amprocnum, operatorClassOid);

			opcentry->supportProcs[amprocform->amprocnum - 1] =
				amprocform->amproc;
		}

		systable_endscan(scan);
		heap_close(rel, AccessShareLock);
	}

	opcentry->valid = true;
	return opcentry;
}


/*
 *		formrdesc
 *
 *		This is a special cut-down version of RelationBuildDesc(),
 *		used while initializing the relcache.
 *		The relation descriptor is built just from the supplied parameters,
 *		without actually looking at any system table entries.  We cheat
 *		quite a lot since we only need to work for a few basic system
 *		catalogs.
 *
 * formrdesc is currently used for: pg_database, pg_authid, pg_auth_members,
 * pg_class, pg_attribute, pg_proc, and pg_type
 * (see RelationCacheInitializePhase2/3).
 *
 * Note that these catalogs can't have constraints (except attnotnull),
 * default values, rules, or triggers, since we don't cope with any of that.
 * (Well, actually, this only matters for properties that need to be valid
 * during bootstrap or before RelationCacheInitializePhase3 runs, and none of
 * these properties matter then...)
 *
 * NOTE: we assume we are already switched into CacheMemoryContext.
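 *
 * For example, RelationCacheInitializePhase3 below fakes up pg_class with
 *
 *		formrdesc("pg_class", PG_CLASS_RELTYPE_OID, false,
 *				  true, Natts_pg_class, Desc_pg_class);
 *
 * that is: relation name, the OID of its rowtype, the shared flag, the
 * has-OIDs flag, and the hardcoded attribute descriptors from the catalog
 * headers.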
 */
static void
formrdesc(const char *relationName, Oid relationReltype,
		  bool isshared, bool hasoids,
		  int natts, const FormData_pg_attribute *attrs)
{
	Relation	relation;
	int			i;
	bool		has_not_null;

	/*
	 * allocate new relation desc, clear all fields of reldesc
	 */
	relation = (Relation) palloc0(sizeof(RelationData));
	relation->rd_targblock = InvalidBlockNumber;

	/* make sure relation is marked as having no open file yet */
	relation->rd_smgr = NULL;

	/*
	 * initialize reference count: 1 because it is nailed in cache
	 */
	relation->rd_refcnt = 1;

	/*
	 * all entries built with this routine are nailed-in-cache; none are for
	 * new or temp relations.
	 */
	relation->rd_isnailed = true;
	relation->rd_createSubid = InvalidSubTransactionId;
	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
	relation->rd_istemp = false;
	relation->rd_issyscat = (strncmp(relationName, "pg_", 3) == 0);	/* GP */
	relation->rd_isLocalBuf = false;	/* CDB */

	/*
	 * initialize relation tuple form
	 *
	 * The data we insert here is pretty incomplete/bogus, but it'll serve to
	 * get us launched.  RelationCacheInitializePhase3() will read the real
	 * data from pg_class and replace what we've done here.  Note in
	 * particular that relowner is left as zero; this cues
	 * RelationCacheInitializePhase3 that the real data isn't there yet.
	 */
	relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);

	namestrcpy(&relation->rd_rel->relname, relationName);
	relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
	relation->rd_rel->reltype = relationReltype;

	/*
	 * It's important to distinguish between shared and non-shared relations,
	 * even at bootstrap time, to make sure we know where they are stored.
	 */
	relation->rd_rel->relisshared = isshared;
	if (isshared)
		relation->rd_rel->reltablespace = GLOBALTABLESPACE_OID;

	relation->rd_rel->relpages = 0;
	relation->rd_rel->reltuples = 0;
	relation->rd_rel->relkind = RELKIND_RELATION;
	relation->rd_rel->relstorage = RELSTORAGE_HEAP;
	relation->rd_rel->relhasoids = hasoids;
	relation->rd_rel->relnatts = (int16) natts;

	/*
	 * Physical file-system information.
	 */
	relation->rd_segfile0_relationnodeinfo.isPresent = false;
	relation->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = false;

	/*
	 * initialize attribute tuple form
	 *
	 * Unlike the case with the relation tuple, this data had better be right
	 * because it will never be replaced.  The input values must be correctly
	 * defined by macros in src/include/catalog/ headers.
	 */
	relation->rd_att = CreateTemplateTupleDesc(natts, hasoids);
	relation->rd_att->tdrefcount = 1;	/* mark as refcounted */

	relation->rd_att->tdtypeid = relationReltype;
	relation->rd_att->tdtypmod = -1;	/* unnecessary, but... */

	/*
	 * initialize tuple desc info
	 */
	has_not_null = false;
	for (i = 0; i < natts; i++)
	{
		memcpy(relation->rd_att->attrs[i],
			   &attrs[i],
			   ATTRIBUTE_FIXED_PART_SIZE);
		has_not_null |= attrs[i].attnotnull;
		/* make sure attcacheoff is valid */
		relation->rd_att->attrs[i]->attcacheoff = -1;
	}

	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
	relation->rd_att->attrs[0]->attcacheoff = 0;

	/* mark not-null status */
	if (has_not_null)
	{
		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

		constr->has_not_null = true;
		relation->rd_att->constr = constr;
	}

	/*
	 * initialize relation id from info in att array (my, this is ugly)
	 */
	RelationGetRelid(relation) = relation->rd_att->attrs[0]->attrelid;
	relation->rd_rel->relfilenode = RelationGetRelid(relation);

	/*
	 * initialize the relation lock manager information
	 */
	RelationInitLockInfo(relation);		/* see lmgr.c */

	/*
	 * initialize physical addressing information for the relation
	 */
	RelationInitPhysicalAddr(relation);

	/*
	 * initialize the rel-has-index flag, using hardwired knowledge
	 */
	if (IsBootstrapProcessingMode())
	{
		/* In bootstrap mode, we have no indexes */
		relation->rd_rel->relhasindex = false;
	}
	else
	{
		/* Otherwise, all the rels formrdesc is used for have indexes */
		relation->rd_rel->relhasindex = true;
	}

	/*
	 * add new reldesc to relcache
	 */
	RelationCacheInsert(relation);

	/* It's fully valid */
	relation->rd_isvalid = true;
}


/* ----------------------------------------------------------------
 *				 Relation Descriptor Lookup Interface
 * ----------------------------------------------------------------
 */

/*
 *		RelationIdGetRelation
 *
 *		Lookup a reldesc by OID; make one if not already in cache.
 *
 *		Returns NULL if no pg_class row could be found for the given relid
 *		(suggesting we are trying to access a just-deleted relation).
 *		Any other error is reported via elog.
 *
 *		NB: caller should already have at least AccessShareLock on the
 *		relation ID, else there are nasty race conditions.
 *
 *		NB: relation ref count is incremented, or set to 1 if new entry.
 *		Caller should eventually decrement count.  (Usually,
 *		that happens by calling RelationClose().)
 */
Relation
RelationIdGetRelation(Oid relationId)
{
	Relation	rd;

	/*
	 * first try to find reldesc in the cache
	 */
	RelationIdCacheLookup(relationId, rd);

	if (RelationIsValid(rd))
	{
		RelationIncrementReferenceCount(rd);
		/* revalidate cache entry if necessary */
		if (!rd->rd_isvalid)
		{
			/*
			 * Indexes only have a limited number of possible schema changes,
			 * and we don't want to use the full-blown rebuild procedure
			 * because it's a headache for indexes that the reload itself
			 * depends on.
			 */
			if (rd->rd_rel->relkind == RELKIND_INDEX)
				RelationReloadClassinfo(rd);
			else
				RelationClearRelation(rd, true);
		}

		return rd;
	}

	/*
	 * no reldesc in the cache, so have RelationBuildDesc() build one and add
	 * it.
	 */
	rd = RelationBuildDesc(relationId, true);
	if (RelationIsValid(rd))
		RelationIncrementReferenceCount(rd);

	return rd;
}
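
/*
 * Typical usage sketch (illustration only; most callers go through
 * relation_open()/heap_open(), which take the lock and do this for them):
 *
 *		LockRelationOid(relid, AccessShareLock);
 *		rel = RelationIdGetRelation(relid);
 *		if (!RelationIsValid(rel))
 *			elog(ERROR, "could not open relation with OID %u", relid);
 *		... work with rel->rd_att, rel->rd_rel, etc ...
 *		RelationClose(rel);
 */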

/* ----------------------------------------------------------------
 *				cache invalidation support routines
 * ----------------------------------------------------------------
 */

/*
 * RelationIncrementReferenceCount
 *		Increments relation reference count.
 *
 * Note: bootstrap mode has its own weird ideas about relation refcount
 * behavior; we ought to fix it someday, but for now, just disable
 * reference count ownership tracking in bootstrap mode.
 */
void
RelationIncrementReferenceCount(Relation rel)
{
	ResourceOwnerEnlargeRelationRefs(CurrentResourceOwner);
	rel->rd_refcnt += 1;
	if (!IsBootstrapProcessingMode())
		ResourceOwnerRememberRelationRef(CurrentResourceOwner, rel);
}

/*
 * RelationDecrementReferenceCount
 *		Decrements relation reference count.
 */
void
RelationDecrementReferenceCount(Relation rel)
{
	if (rel->rd_refcnt <= 0)
	{
		elog(ERROR,
			 "Relation decrement reference count found relation %u/%u/%u with bad count (reference count %d)",
			 rel->rd_node.spcNode,
			 rel->rd_node.dbNode,
			 rel->rd_node.relNode,
			 rel->rd_refcnt);
	}
	
	rel->rd_refcnt -= 1;
	if (!IsBootstrapProcessingMode())
		ResourceOwnerForgetRelationRef(CurrentResourceOwner, rel);
}

/*
 * RelationClose - close an open relation
 *
 *	Actually, we just decrement the refcount.
 *
 *	NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
 *	will be freed as soon as their refcount goes to zero.  In combination
 *	with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
 *	to catch references to already-released relcache entries.  It slows
 *	things down quite a bit, however.
 */
void
RelationClose(Relation relation)
{
	/* Note: no locking manipulations needed */
	RelationDecrementReferenceCount(relation);

#ifdef RELCACHE_FORCE_RELEASE
	if (RelationHasReferenceCountZero(relation) &&
		relation->rd_createSubid == InvalidSubTransactionId &&
		relation->rd_newRelfilenodeSubid == InvalidSubTransactionId)
		RelationClearRelation(relation, false);
#endif
}

/*
 * RelationReloadClassinfo - reload the pg_class row (only)
 *
 *	This function is used only for indexes.  We currently allow only the
 *	pg_class row of an existing index to change (to support changes of
 *	owner, tablespace, or relfilenode), not its pg_index row or other
 *	subsidiary index schema information.  Therefore it's sufficient to do
 *	this when we get an SI invalidation.  Furthermore, there are cases
 *	where it's necessary not to throw away the index information, especially
 *	for "nailed" indexes which we are unable to rebuild on-the-fly.
 *
 *	We can't necessarily reread the pg_class row right away; we might be
 *	in a failed transaction when we receive the SI notification.  If so,
 *	RelationClearRelation just marks the entry as invalid by setting
 *	rd_isvalid to false.  This routine is called to fix the entry when it
 *	is next needed.
 *
 *	We assume that at the time we are called, we have at least AccessShareLock
 *	on the target index.  (Note: in the calls from RelationClearRelation,
 *	this is legitimate because we know the rel has positive refcount.)
 */
static void
RelationReloadClassinfo(Relation relation)
{
	bool		indexOK;
	HeapTuple	pg_class_tuple;
	Form_pg_class relp;

	/* Should be called only for invalidated indexes */
	Assert(relation->rd_rel->relkind == RELKIND_INDEX &&
		   !relation->rd_isvalid);
	/* Should be closed at smgr level */
	Assert(relation->rd_smgr == NULL);

	/* Make sure targblock is reset in case rel was truncated */
	relation->rd_targblock = InvalidBlockNumber;
	/* Must free any AM cached data, too */
	if (relation->rd_amcache)
		pfree(relation->rd_amcache);
	relation->rd_amcache = NULL;

	/*
	 * If it's a shared index, we might be called before backend startup has
	 * finished selecting a database, in which case we have no way to read
	 * pg_class yet.  However, a shared index can never have any significant
	 * schema updates, so it's okay to ignore the invalidation signal.  Just
	 * mark it valid and return without doing anything more.
	 */
	if (relation->rd_rel->relisshared && !criticalRelcachesBuilt)
	{
		relation->rd_isvalid = true;
		return;
	}

	/*
	 * Read the pg_class row
	 *
	 * Don't try to use an indexscan of pg_class_oid_index to reload the info
	 * for pg_class_oid_index ...
	 */
	indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
	pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK, NULL);
	if (!HeapTupleIsValid(pg_class_tuple))
		elog(ERROR, "could not find pg_class tuple for index %u",
			 RelationGetRelid(relation));
	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
	memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);

	/* Reload reloptions in case they changed */
	if (relation->rd_options)
		pfree(relation->rd_options);
	RelationParseRelOptions(relation, pg_class_tuple);

	/* done with pg_class tuple */
	heap_freetuple(pg_class_tuple);

	/* We must recalculate physical address in case it changed */
	RelationInitPhysicalAddr(relation);

	/* Forget gp_relation_node information -- it may have changed. */
	MemSet(&relation->rd_segfile0_relationnodeinfo, 0, sizeof(RelationNodeInfo));

	/* Okay, now it's valid again */
	relation->rd_isvalid = true;
}

/*
 * RelationDestroyRelation
 *
 *	Physically delete a relation cache entry and all subsidiary data.
 *	Caller must already have unhooked the entry from the hash table.
 */
static void
RelationDestroyRelation(Relation relation)
{
	Assert(RelationHasReferenceCountZero(relation));

	/*
	 * Make sure smgr and lower levels close the relation's files, if they
	 * weren't closed already.  (This was probably done by caller, but let's
	 * just be real sure.)
	 */
	RelationCloseSmgr(relation);

	/*
	 * Free all the subsidiary data structures of the relcache entry,
	 * then the entry itself.
	 */
	if (relation->rd_rel)
		pfree(relation->rd_rel);
	/* can't use DecrTupleDescRefCount here */
	Assert(relation->rd_att->tdrefcount > 0);
	if (--relation->rd_att->tdrefcount == 0)
		FreeTupleDesc(relation->rd_att);
	list_free(relation->rd_indexlist);
	FreeTriggerDesc(relation->trigdesc);
	if (relation->rd_options)
		pfree(relation->rd_options);
	if (relation->rd_indextuple)
		pfree(relation->rd_indextuple);
	if (relation->rd_am)
		pfree(relation->rd_am);
	if (relation->rd_indexcxt)
		MemoryContextDelete(relation->rd_indexcxt);
	if (relation->rd_rulescxt)
		MemoryContextDelete(relation->rd_rulescxt);
	if (relation->rd_cdbpolicy)
		pfree(relation->rd_cdbpolicy);

	pfree(relation);
}

/*
 * RelationClearRelation
 *
 *	 Physically blow away a relation cache entry, or reset it and rebuild
 *	 it from scratch (that is, from catalog entries).  The latter path is
 *	 usually used when we are notified of a change to an open relation
 *	 (one with refcount > 0).  However, this routine just does whichever
 *	 it's told to do; callers must determine which they want.
 *
 *	 NB: when rebuilding, we'd better hold some lock on the relation.
 *	 In current usages this is presumed true because it has refcnt > 0.
 */
static void
RelationClearRelation(Relation relation, bool rebuild)
{
	Oid			old_reltype = relation->rd_rel->reltype;

	/*
	 * Make sure smgr and lower levels close the relation's files, if they
	 * weren't closed already.  If the relation is not getting deleted, the
	 * next smgr access should reopen the files automatically.  This ensures
	 * that the low-level file access state is updated after, say, a vacuum
	 * truncation.
	 */
	RelationCloseSmgr(relation);

	/*
	 * Never, never ever blow away a nailed-in system relation, because we'd
	 * be unable to recover.  However, we must reset rd_targblock, in case we
	 * got called because of a relation cache flush that was triggered by
	 * VACUUM.  Likewise reset the fsm and vm size info.
	 *
	 * If it's a nailed index, then we need to re-read the pg_class row to see
	 * if its relfilenode changed.  We can't necessarily do that here, because
	 * we might be in a failed transaction.  We assume it's okay to do it if
	 * there are open references to the relcache entry (cf notes for
	 * AtEOXact_RelationCache).  Otherwise just mark the entry as possibly
	 * invalid, and it'll be fixed when next opened.
	 */
	if (relation->rd_isnailed)
	{
		relation->rd_targblock = InvalidBlockNumber;
		if (relation->rd_rel->relkind == RELKIND_INDEX)
		{
			relation->rd_isvalid = false;		/* needs to be revalidated */
			if (relation->rd_refcnt > 1)
				RelationReloadClassinfo(relation);
		}
		return;
	}

	/*
	 * Even non-system indexes should not be blown away if they are open and
	 * have valid index support information.  This avoids problems with active
	 * use of the index support information.  As with nailed indexes, we
	 * re-read the pg_class row to handle possible physical relocation of the
	 * index.
	 */
	if (relation->rd_rel->relkind == RELKIND_INDEX &&
		relation->rd_refcnt > 0 &&
		relation->rd_indexcxt != NULL)
	{
		relation->rd_isvalid = false;	/* needs to be revalidated */
		RelationReloadClassinfo(relation);
		return;
	}

	/* Mark it invalid until we've finished rebuild */
	relation->rd_isvalid = false;

	/*
	 * If we're really done with the relcache entry, blow it away. But if
	 * someone is still using it, reconstruct the whole deal without moving
	 * the physical RelationData record (so that the someone's pointer is
	 * still valid).
	 */
	if (!rebuild)
	{
		/* Flush any rowtype cache entry */
		flush_rowtype_cache(old_reltype);

		/* Remove it from the hash table */
		RelationCacheDelete(relation);

		/* And release storage */
		RelationDestroyRelation(relation);
	}
	else
	{
		/*
		 * Our strategy for rebuilding an open relcache entry is to build
		 * a new entry from scratch, swap its contents with the old entry,
		 * and finally delete the new entry (along with any infrastructure
		 * swapped over from the old entry).  This is to avoid trouble in case
		 * an error causes us to lose control partway through.  The old entry
		 * will still be marked !rd_isvalid, so we'll try to rebuild it again
		 * on next access.  Meanwhile it's not any less valid than it was
		 * before, so any code that might expect to continue accessing it
		 * isn't hurt by the rebuild failure.  (Consider for example a
		 * subtransaction that ALTERs a table and then gets cancelled partway
		 * through the cache entry rebuild.  The outer transaction should
		 * still see the not-modified cache entry as valid.)  The worst
		 * consequence of an error is leaking the necessarily-unreferenced
		 * new entry, and this shouldn't happen often enough for that to be
		 * a big problem.
		 *
		 * When rebuilding an open relcache entry, we must preserve ref count
		 * and rd_createSubid/rd_newRelfilenodeSubid state.  Also attempt to
		 * preserve the pg_class entry (rd_rel), tupledesc, and rewrite-rule
		 * substructures in place, because various places assume that these
		 * structures won't move while they are working with an open relcache
		 * entry.  (Note: the refcount mechanism for tupledescs might someday
		 * allow us to remove this hack for the tupledesc.)
		 *
		 * Note that this process does not touch CurrentResourceOwner; which
		 * is good because whatever ref counts the entry may have do not
		 * necessarily belong to that resource owner.
		 */
		Relation	newrel;
		Oid			save_relid = RelationGetRelid(relation);
		bool		keep_tupdesc;
		bool		keep_rules;

		/* Build temporary entry, but don't link it into hashtable */
		newrel = RelationBuildDesc(save_relid, false);
		if (newrel == NULL)
		{
			/* Should only get here if relation was deleted */
			flush_rowtype_cache(old_reltype);
			RelationCacheDelete(relation);
			RelationDestroyRelation(relation);
			elog(ERROR, "relation %u deleted while still in use", save_relid);
		}

		keep_tupdesc = equalTupleDescs(relation->rd_att, newrel->rd_att, true);
		keep_rules = equalRuleLocks(relation->rd_rules, newrel->rd_rules);
		if (!keep_tupdesc)
			flush_rowtype_cache(old_reltype);

		/*
		 * Perform swapping of the relcache entry contents.  Within this
		 * process the old entry is momentarily invalid, so there *must*
		 * be no possibility of CHECK_FOR_INTERRUPTS within this sequence.
		 * Do it in all-in-line code for safety.
		 *
		 * Since the vast majority of fields should be swapped, our method
		 * is to swap the whole structures and then re-swap those few fields
		 * we didn't want swapped.
		 */
#define SWAPFIELD(fldtype, fldname) \
		do { \
			fldtype _tmp = newrel->fldname; \
			newrel->fldname = relation->fldname; \
			relation->fldname = _tmp; \
		} while (0)

		/* swap all Relation struct fields */
		{
			RelationData tmpstruct;

			memcpy(&tmpstruct, newrel, sizeof(RelationData));
			memcpy(newrel, relation, sizeof(RelationData));
			memcpy(relation, &tmpstruct, sizeof(RelationData));
		}

		/* rd_smgr must not be swapped, due to back-links from smgr level */
		SWAPFIELD(SMgrRelation, rd_smgr);
		/* rd_refcnt must be preserved */
		SWAPFIELD(int, rd_refcnt);
		/* isnailed shouldn't change */
		Assert(newrel->rd_isnailed == relation->rd_isnailed);
		/* creation sub-XIDs must be preserved */
		SWAPFIELD(SubTransactionId, rd_createSubid);
		/* un-swap rd_rel pointers, swap contents instead */
		SWAPFIELD(Form_pg_class, rd_rel);
		/* ... but actually, we don't have to update newrel->rd_rel */
		memcpy(relation->rd_rel, newrel->rd_rel, CLASS_TUPLE_SIZE);
		/* preserve old tupledesc and rules if no logical change */
		if (keep_tupdesc)
			SWAPFIELD(TupleDesc, rd_att);
		if (keep_rules)
		{
			SWAPFIELD(RuleLock *, rd_rules);
			SWAPFIELD(MemoryContext, rd_rulescxt);
		}
		/* pgstat_info must be preserved */
		SWAPFIELD(struct PgStat_TableStatus *, pgstat_info);

		/* preserve persistent table information for the relation */
		SWAPFIELD(struct RelationNodeInfo, rd_segfile0_relationnodeinfo);

#undef SWAPFIELD

		/* And now we can throw away the temporary entry */
		RelationDestroyRelation(newrel);
	}
}
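
/*
 * In short, RelationClearRelation behaves as follows:
 *
 *	- nailed relation:			just reset rd_targblock (and, for a nailed
 *								index, reload its pg_class row); never freed
 *	- open non-nailed index:	reload the pg_class row only
 *	- rebuild = false:			remove from the hashtable and destroy
 *	- rebuild = true:			rebuild from the catalogs and swap contents,
 *								preserving refcount and pointer identity
 */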

/*
 * RelationFlushRelation
 *
 *	 Rebuild the relation if it is open (refcount > 0), else blow it away.
 */
static void
RelationFlushRelation(Relation relation)
{
	bool		rebuild;

	if (relation->rd_createSubid != InvalidSubTransactionId ||
		relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
	{
		/*
		 * New relcache entries are always rebuilt, not flushed; else we'd
		 * forget the "new" status of the relation, which is a useful
		 * optimization to have.  Ditto for the new-relfilenode status.
		 */
		rebuild = true;
	}
	else
	{
		/*
		 * Pre-existing rels can be dropped from the relcache if not open.
		 */
		rebuild = !RelationHasReferenceCountZero(relation);
	}

	RelationClearRelation(relation, rebuild);
}

/*
 * RelationForgetRelation - unconditionally remove a relcache entry
 *
 *		   External interface for destroying a relcache entry when we
 *		   drop the relation.
 */
void
RelationForgetRelation(Oid rid)
{
	Relation	relation;

	RelationIdCacheLookup(rid, relation);

	if (!PointerIsValid(relation))
		return;					/* not in cache, nothing to do */

	if (!RelationHasReferenceCountZero(relation))
		elog(ERROR, "relation %u is still open", rid);

	/* Unconditionally destroy the relcache entry */
	RelationClearRelation(relation, false);
}

/*
 *		RelationCacheInvalidateEntry
 *
 *		This routine is invoked for SI cache flush messages.
 *
 * Any relcache entry matching the relid must be flushed.  (Note: caller has
 * already determined that the relid belongs to our database or is a shared
 * relation.)
 *
 * We used to skip local relations, on the grounds that they could
 * not be targets of cross-backend SI update messages; but it seems
 * safer to process them, so that our *own* SI update messages will
 * have the same effects during CommandCounterIncrement for both
 * local and nonlocal relations.
 */
void
RelationCacheInvalidateEntry(Oid relationId)
{
	Relation	relation;

	RelationIdCacheLookup(relationId, relation);

	if (PointerIsValid(relation))
	{
		relcacheInvalsReceived++;
		RelationFlushRelation(relation);
	}
}

/*
 * RelationCacheInvalidate
 *	 Blow away cached relation descriptors that have zero reference counts,
 *	 and rebuild those with positive reference counts.  Also reset the smgr
 *	 relation cache.
 *
 *	 This is currently used only to recover from SI message buffer overflow,
 *	 so we do not touch new-in-transaction relations; they cannot be targets
 *	 of cross-backend SI updates (and our own updates now go through a
 *	 separate linked list that isn't limited by the SI message buffer size).
 *	 Likewise, we need not discard new-relfilenode-in-transaction hints,
 *	 since any invalidation of those would be a local event.
 *
 *	 We don't do anything special for newRelfilenode-in-transaction relations,
 *	 though, since we have a lock on the relation and nobody else should be
 *	 generating cache invalidation messages for it anyhow.
 *
 *	 We do this in two phases: the first pass deletes deletable items, and
 *	 the second one rebuilds the rebuildable items.  This is essential for
 *	 safety, because hash_seq_search only copes with concurrent deletion of
 *	 the element it is currently visiting.  If a second SI overflow were to
 *	 occur while we are walking the table, resulting in recursive entry to
 *	 this routine, we could crash because the inner invocation blows away
 *	 the entry next to be visited by the outer scan.  But this way is OK,
 *	 because (a) during the first pass we won't process any more SI messages,
 *	 so hash_seq_search will complete safely; (b) during the second pass we
 *	 only hold onto pointers to nondeletable entries.
 *
 *	 The two-phase approach also makes it easy to ensure that we process
 *	 nailed-in-cache indexes before other nondeletable items, and that we
 *	 process pg_class_oid_index first of all.  In scenarios where a nailed
 *	 index has been given a new relfilenode, we have to detect that update
 *	 before the nailed index is used in reloading any other relcache entry.
 */
void
RelationCacheInvalidate(void)
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;
	Relation	relation;
	List	   *rebuildFirstList = NIL;
	List	   *rebuildList = NIL;
	ListCell   *l;

	/* Phase 1 */
	hash_seq_init(&status, RelationIdCache);

	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
	{
		relation = idhentry->reldesc;

		/* Must close all smgr references to avoid leaving dangling ptrs */
		RelationCloseSmgr(relation);

		/* Ignore new relations, since they are never SI targets */
		if (relation->rd_createSubid != InvalidSubTransactionId)
			continue;

		relcacheInvalsReceived++;

		if (RelationHasReferenceCountZero(relation))
		{
			/* Delete this entry immediately */
			Assert(!relation->rd_isnailed);
			RelationClearRelation(relation, false);
		}
		else
		{
			/*
			 * Add this entry to list of stuff to rebuild in second pass.
			 * pg_class_oid_index goes on the front of rebuildFirstList, other
			 * nailed indexes on the back, and everything else into
			 * rebuildList (in no particular order).
			 */
			if (relation->rd_isnailed &&
				relation->rd_rel->relkind == RELKIND_INDEX)
			{
				if (RelationGetRelid(relation) == ClassOidIndexId)
					rebuildFirstList = lcons(relation, rebuildFirstList);
				else
					rebuildFirstList = lappend(rebuildFirstList, relation);
			}
			else
				rebuildList = lcons(relation, rebuildList);
		}
	}

	/*
	 * Now zap any remaining smgr cache entries.  This must happen before we
	 * start to rebuild entries, since that may involve catalog fetches which
	 * will re-open catalog files.
	 */
	smgrcloseall();

	/* Phase 2: rebuild the items found to need rebuild in phase 1 */
	foreach(l, rebuildFirstList)
	{
		relation = (Relation) lfirst(l);
		RelationClearRelation(relation, true);
	}
	list_free(rebuildFirstList);
	foreach(l, rebuildList)
	{
		relation = (Relation) lfirst(l);
		RelationClearRelation(relation, true);
	}
	list_free(rebuildList);
}

/*
 * AtEOXact_RelationCache
 *
 *	Clean up the relcache at main-transaction commit or abort.
 *
 * Note: this must be called *before* processing invalidation messages.
 * In the case of abort, we don't want to try to rebuild any invalidated
 * cache entries (since we can't safely do database accesses).  Therefore
 * we must reset refcnts before handling pending invalidations.
 *
 * As of PostgreSQL 8.1, relcache refcnts should get released by the
 * ResourceOwner mechanism.  This routine just does a debugging
 * cross-check that no pins remain.  However, we also need to do special
 * cleanup when the current transaction created any relations or made use
 * of forced index lists.
 */
void
AtEOXact_RelationCache(bool isCommit)
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;

	/*
	 * To speed up transaction exit, we want to avoid scanning the relcache
	 * unless there is actually something for this routine to do.  Other than
	 * the debug-only Assert checks, most transactions don't create any work
	 * for us to do here, so we keep a static flag that gets set if there is
	 * anything to do.  (Currently, this means either a relation is created in
	 * the current xact, or one is given a new relfilenode, or an index list
	 * is forced.)  For simplicity, the flag remains set till end of top-level
	 * transaction, even though we could clear it at subtransaction end in
	 * some cases.
	 *
	 * MPP-3333: READERS need to *always* scan, otherwise they will not be able
	 * to maintain a coherent view of the storage layer.
	 */
	if (!need_eoxact_work &&
		DistributedTransactionContext != DTX_CONTEXT_QE_READER
#ifdef USE_ASSERT_CHECKING
		&& !assert_enabled
#endif
		)
		return;

	hash_seq_init(&status, RelationIdCache);

	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
	{
		Relation	relation = idhentry->reldesc;

		/*
		 * The relcache entry's ref count should be back to its normal
		 * not-in-a-transaction state: 0 unless it's nailed in cache.
		 *
		 * In bootstrap mode, this is NOT true, so don't check it --- the
		 * bootstrap code expects relations to stay open across start/commit
		 * transaction calls.  (That seems bogus, but it's not worth fixing.)
		 */
#ifdef USE_ASSERT_CHECKING
		if (!IsBootstrapProcessingMode())
		{
			int			expected_refcnt;

			expected_refcnt = relation->rd_isnailed ? 1 : 0;
			Assert(relation->rd_refcnt == expected_refcnt);
		}
#endif

		/*
		 * QE-readers aren't properly enrolled in transactions, they
		 * just get the snapshot which corresponds -- so here, where
		 * we are maintaining their relcache, we want to just clean
		 * up (almost as if we had aborted). (MPP-3338)
		 */
		if (DistributedTransactionContext == DTX_CONTEXT_QE_ENTRY_DB_SINGLETON ||
			DistributedTransactionContext == DTX_CONTEXT_QE_READER)
		{
			RelationClearRelation(relation, false);
			continue;
		}

		/*
		 * Is it a relation created in the current transaction?
		 *
		 * During commit, reset the flag to zero, since we are now out of the
		 * creating transaction.  During abort, simply delete the relcache
		 * entry --- it isn't interesting any longer.  (NOTE: if we have
		 * forgotten the new-ness of a new relation due to a forced cache
		 * flush, the entry will get deleted anyway by shared-cache-inval
		 * processing of the aborted pg_class insertion.)
		 */
		if (relation->rd_createSubid != InvalidSubTransactionId)
		{
			if (isCommit)
				relation->rd_createSubid = InvalidSubTransactionId;
			else
			{
				/*
				 * In abort, delete the error log file before forgetting
				 * this relation.
				 */
				ErrorLogDelete(MyDatabaseId, RelationGetRelid(relation));
				RelationClearRelation(relation, false);
				continue;
			}
		}

		/*
		 * Likewise, reset the hint about the relfilenode being new.
		 */
		relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;

		/*
		 * Flush any temporary index list.
		 */
		if (relation->rd_indexvalid == 2)
		{
			list_free(relation->rd_indexlist);
			relation->rd_indexlist = NIL;
			relation->rd_oidindex = InvalidOid;
			relation->rd_indexvalid = 0;
		}
	}

	/* Once done with the transaction, we can reset need_eoxact_work */
	need_eoxact_work = false;
}

/*
 * AtEOSubXact_RelationCache
 *
 *	Clean up the relcache at sub-transaction commit or abort.
 *
 * Note: this must be called *before* processing invalidation messages.
 */
void
AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
						  SubTransactionId parentSubid)
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;

	/*
	 * Skip the relcache scan if nothing to do --- see notes for
	 * AtEOXact_RelationCache.
	 */
	if (!need_eoxact_work &&
		DistributedTransactionContext != DTX_CONTEXT_QE_READER)
		return;

	hash_seq_init(&status, RelationIdCache);

	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
	{
		Relation	relation = idhentry->reldesc;

		/*
		 * As opposed to AtEOXact_RelationCache, subtransactions
		 * in readers are only caused by internal commands, and
		 * there shouldn't be interaction with global transactions
		 * (reader gangs commit their transaction independently),
		 * so we must not clear the relcache here.
		 */

		/*
		 * Is it a relation created in the current subtransaction?
		 *
		 * During subcommit, mark it as belonging to the parent, instead.
		 * During subabort, simply delete the relcache entry.
		 */
		if (relation->rd_createSubid == mySubid)
		{
			if (isCommit)
				relation->rd_createSubid = parentSubid;
			else
			{
				Assert(RelationHasReferenceCountZero(relation));
				/*
				 * In abort, delete the error log file before forgetting
				 * this relation.
				 */
				ErrorLogDelete(MyDatabaseId, RelationGetRelid(relation));
				RelationClearRelation(relation, false);
				continue;
			}
		}

		/*
		 * Likewise, update or drop any new-relfilenode-in-subtransaction hint.
		 */
		if (relation->rd_newRelfilenodeSubid == mySubid)
		{
			if (isCommit)
				relation->rd_newRelfilenodeSubid = parentSubid;
			else
				relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
		}

		/*
		 * Flush any temporary index list.
		 */
		if (relation->rd_indexvalid == 2)
		{
			list_free(relation->rd_indexlist);
			relation->rd_indexlist = NIL;
			relation->rd_oidindex = InvalidOid;
			relation->rd_indexvalid = 0;
		}
	}
}

/*
 * RelationCacheMarkNewRelfilenode
 *
 *	Mark the rel as having been given a new relfilenode in the current
 *	(sub) transaction.  This is a hint that can be used to optimize
 *	later operations on the rel in the same transaction.
 */
void
RelationCacheMarkNewRelfilenode(Relation rel)
{
	/* Mark it... */
	rel->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
	/* ... and now we have eoxact cleanup work to do */
	need_eoxact_work = true;
}
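
/*
 * Note: need_eoxact_work is also set when a relation is created in the
 * current (sub)transaction (RelationBuildLocalRelation below) and when a
 * temporary index list is forced; AtEOXact_RelationCache and
 * AtEOSubXact_RelationCache above are the consumers of the flag.
 */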


/*
 *		RelationBuildLocalRelation
 *			Build a relcache entry for an about-to-be-created relation,
 *			and enter it into the relcache.
 */
Relation
RelationBuildLocalRelation(const char *relname,
						   Oid relnamespace,
						   TupleDesc tupDesc,
						   Oid relid,
						   Oid reltablespace,
						   char relkind,		/* CDB */
						   bool shared_relation)
{
	Relation	rel;
	MemoryContext oldcxt;
	int			natts = tupDesc->natts;
	int			i;
	bool		has_not_null;
	bool		nailit;

	AssertArg(natts >= 0);

	/*
	 * check for creation of a rel that must be nailed in cache.
	 *
	 * XXX this list had better match the relations specially handled in
	 * RelationCacheInitializePhase2/3.
	 */
	switch (relid)
	{
		case DatabaseRelationId:
		case AuthIdRelationId:
		case AuthMemRelationId:
		case RelationRelationId:
		case AttributeRelationId:
		case ProcedureRelationId:
		case TypeRelationId:
			nailit = true;
			break;
		default:
			nailit = false;
			break;
	}

	/*
	 * check that hardwired list of shared rels matches what's in the
	 * bootstrap .bki file.  If you get a failure here during initdb, you
	 * probably need to fix IsSharedRelation() to match whatever you've done
	 * to the set of shared relations.
	 */
	if (shared_relation != IsSharedRelation(relid))
		elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
			 relname, relid);

	/*
	 * switch to the cache context to create the relcache entry.
	 */
	if (!CacheMemoryContext)
		CreateCacheMemoryContext();

	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
	 * allocate a new relation descriptor and fill in basic state fields.
	 */
	rel = (Relation) palloc0(sizeof(RelationData));

	rel->rd_targblock = InvalidBlockNumber;

	/* make sure relation is marked as having no open file yet */
	rel->rd_smgr = NULL;

	/* mark it nailed if appropriate */
	rel->rd_isnailed = nailit;

	rel->rd_refcnt = nailit ? 1 : 0;

	/* it's being created in this transaction */
	rel->rd_createSubid = GetCurrentSubTransactionId();
	rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;

	/* must flag that we have rels created in this transaction */
	need_eoxact_work = true;

	/* is it a temporary relation? */
	rel->rd_istemp = isTempNamespace(relnamespace);

	/* is it a system catalog? */
	rel->rd_issyscat = (strncmp(relname, "pg_", 3) == 0);

    /*
     * CDB: On QEs, temp relations must use shared buffer cache so data
     * will be visible to all segmates.  On QD, sequence objects must
     * use shared buffer cache so data will be visible to sequence server.
     */
    if (rel->rd_istemp &&
        relkind != RELKIND_SEQUENCE &&
        Gp_role != GP_ROLE_EXECUTE)
        rel->rd_isLocalBuf = true;
    else
        rel->rd_isLocalBuf = false;

	/*
	 * create a new tuple descriptor from the one passed in.  We do this
	 * partly to copy it into the cache context, and partly because the new
	 * relation can't have any defaults or constraints yet; they have to be
	 * added in later steps, because they require additions to multiple system
	 * catalogs.  We can copy attnotnull constraints here, however.
	 */
	rel->rd_att = CreateTupleDescCopy(tupDesc);
	rel->rd_att->tdrefcount = 1;	/* mark as refcounted */
	has_not_null = false;
	for (i = 0; i < natts; i++)
	{
		rel->rd_att->attrs[i]->attnotnull = tupDesc->attrs[i]->attnotnull;
		has_not_null |= tupDesc->attrs[i]->attnotnull;
	}

	if (has_not_null)
	{
		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

		constr->has_not_null = true;
		rel->rd_att->constr = constr;
	}

	/*
	 * initialize relation tuple form (caller may add/override data later)
	 */
	rel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);

	namestrcpy(&rel->rd_rel->relname, relname);
	rel->rd_rel->relnamespace = relnamespace;

	rel->rd_rel->relkind = RELKIND_UNCATALOGED;
	rel->rd_rel->relstorage = RELSTORAGE_HEAP;
	rel->rd_rel->relhasoids = rel->rd_att->tdhasoid;
	rel->rd_rel->relnatts = natts;
	rel->rd_rel->reltype = InvalidOid;
	/* needed when bootstrapping: */
	rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;

	/*
	 * Create zeroed-out gp_relation_node data.  It will be filled in when the
	 * disk file is created.
	 */
	rel->rd_segfile0_relationnodeinfo.isPresent = false;
	rel->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = false;

	/*
	 * Insert relation physical and logical identifiers (OIDs) into the right
	 * places.  Note that the physical ID (relfilenode) is initially the same
	 * as the logical ID (OID).
	 */
	rel->rd_rel->relisshared = shared_relation;

	RelationGetRelid(rel) = relid;

	for (i = 0; i < natts; i++)
		rel->rd_att->attrs[i]->attrelid = relid;

	rel->rd_rel->relfilenode = relid;
	rel->rd_rel->reltablespace = reltablespace;

	RelationInitLockInfo(rel);	/* see lmgr.c */

	RelationInitPhysicalAddr(rel);

	/*
	 * Okay to insert into the relcache hash tables.
	 */
	RelationCacheInsert(rel);

	/*
	 * done building relcache entry.
	 */
	MemoryContextSwitchTo(oldcxt);

	/* It's fully valid */
	rel->rd_isvalid = true;

	/*
	 * Caller expects us to pin the returned entry.
	 */
	RelationIncrementReferenceCount(rel);

	return rel;
}
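
/*
 * Illustrative call (a sketch only; the real caller is heap_create(), which
 * derives these values from the CREATE TABLE request):
 *
 *		rel = RelationBuildLocalRelation(relname, relnamespace, tupdesc,
 *										 relid, reltablespace, relkind,
 *										 shared_relation);
 *
 * The entry comes back pinned and with rd_createSubid set, so it is cleaned
 * up by AtEOXact_RelationCache/AtEOSubXact_RelationCache if the creating
 * transaction aborts.
 */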

/*
 *		RelationCacheInitialize
 *
 *		This initializes the relation descriptor cache.  At the time
 *		that this is invoked, we can't do database access yet (mainly
 *		because the transaction subsystem is not up); all we are doing
 *		is making an empty cache hashtable.  This must be done before
 *		starting the initialization transaction, because otherwise
 *		AtEOXact_RelationCache would crash if that transaction aborts
 *		before we can get the relcache set up.
 */

#define INITRELCACHESIZE		400

void
RelationCacheInitialize(void)
{
	MemoryContext oldcxt;
	HASHCTL		ctl;

	/*
	 * make sure cache memory context exists
	 */
	if (!CacheMemoryContext)
		CreateCacheMemoryContext();

	/*
	 * switch to cache memory context
	 */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
	 * create hashtable that indexes the relcache
	 */
	MemSet(&ctl, 0, sizeof(ctl));
	ctl.keysize = sizeof(Oid);
	ctl.entrysize = sizeof(RelIdCacheEnt);
	ctl.hash = oid_hash;
	RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
								  &ctl, HASH_ELEM | HASH_FUNCTION);

	MemoryContextSwitchTo(oldcxt);
}
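
/*
 * Startup ordering, in brief: RelationCacheInitialize (above) only builds
 * the empty hashtable; RelationCacheInitializePhase2 nails up pg_database,
 * pg_authid, and pg_auth_members so that startup can identify the database
 * and authenticate; RelationCacheInitializePhase3 runs once the transaction
 * system and catcaches are usable, replaces the phony formrdesc entries with
 * real pg_class data, and loads the critical indexes.
 */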

/*
 *		RelationCacheInitializePhase2
 *
 *		This is called to prepare for access to shared catalogs during startup.
 *		We must at least set up nailed reldescs for pg_database, pg_authid,
 *		and pg_auth_members.  Ideally we'd like to have reldescs for their
 *		indexes, too.  We attempt to load this information from the shared
 *		relcache init file.  If that's missing or broken, just make phony
 *		entries for the catalogs themselves.  RelationCacheInitializePhase3
 *		will clean up as needed.
 */
void
RelationCacheInitializePhase2(void)
{
	MemoryContext oldcxt;

	/*
	 * In bootstrap mode, the shared catalogs aren't there yet anyway, so do
	 * nothing.
	 */
	if (IsBootstrapProcessingMode())
		return;

	/*
	 * switch to cache memory context
	 */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
	 * Try to load the shared relcache cache file.	If unsuccessful, bootstrap
	 * the cache with pre-made descriptors for the critical shared catalogs.
	 */
	if (!load_relcache_init_file(true))
	{
		formrdesc("pg_database", PG_DATABASE_RELTYPE_OID, true,
				  true, Natts_pg_database, Desc_pg_database);
		formrdesc("pg_authid", PG_AUTHID_RELTYPE_OID, true,
				  true, Natts_pg_authid, Desc_pg_authid);
		formrdesc("pg_auth_members", PG_AUTH_MEMBERS_RELTYPE_OID, true,
				  false, Natts_pg_auth_members, Desc_pg_auth_members);

#define NUM_CRITICAL_SHARED_RELS	3	/* fix if you change list above */
	}

	MemoryContextSwitchTo(oldcxt);
}

/*
 *		RelationCacheInitializePhase3
 *
 *		This is called as soon as the catcache and transaction system
 *		are functional and we have determined MyDatabaseId.  At this point
 *		we can actually read data from the database's system catalogs.
 *		We first try to read pre-computed relcache entries from the local
 *		relcache init file.  If that's missing or broken, make phony entries
 *		for the minimum set of nailed-in-cache relations.  Then (unless
 *		bootstrapping) make sure we have entries for the critical system
 *		indexes.  Once we've done all this, we have enough infrastructure to
 *		open any system catalog or use any catcache.  The last step is to
 *		rewrite the cache files if needed.
 */
void
RelationCacheInitializePhase3(void)
3142 3143 3144 3145
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;
	MemoryContext oldcxt;
3146
	bool		needNewCacheFile = !criticalSharedRelcachesBuilt;
3147

3148
	/*
3149 3150 3151 3152 3153
	 * switch to cache memory context
	 */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
3154 3155
	 * Try to load the local relcache cache file.  If unsuccessful, bootstrap
	 * the cache with pre-made descriptors for the critical "nailed-in" system
3156
	 * catalogs.
3157
	 */
3158
	if (IsBootstrapProcessingMode() ||
3159
		!load_relcache_init_file(false))
3160
	{
3161 3162
		needNewCacheFile = true;

3163
		formrdesc("pg_class", PG_CLASS_RELTYPE_OID, false,
3164
				  true, Natts_pg_class, Desc_pg_class);
3165
		formrdesc("pg_attribute", PG_ATTRIBUTE_RELTYPE_OID, false,
3166
				  false, Natts_pg_attribute, Desc_pg_attribute);
3167
		formrdesc("pg_proc", PG_PROC_RELTYPE_OID, false,
3168
				  true, Natts_pg_proc, Desc_pg_proc);
3169
		formrdesc("pg_type", PG_TYPE_RELTYPE_OID, false,
3170
				  true, Natts_pg_type, Desc_pg_type);
3171

3172
#define NUM_CRITICAL_LOCAL_RELS 4		/* fix if you change list above */
3173
	}
3174 3175

	MemoryContextSwitchTo(oldcxt);
3176

3177
	/* In bootstrap mode, the faked-up formrdesc info is all we'll have */
3178 3179 3180
	if (IsBootstrapProcessingMode())
		return;

3181
	/*
B
Bruce Momjian 已提交
3182
	 * If we didn't get the critical system indexes loaded into relcache, do
3183 3184
	 * so now.	These are critical because the catcache and/or opclass cache
	 * depend on them for fetches done during relcache load.  Thus, we have an
B
Bruce Momjian 已提交
3185 3186 3187 3188 3189 3190
	 * infinite-recursion problem.	We can break the recursion by doing
	 * heapscans instead of indexscans at certain key spots. To avoid hobbling
	 * performance, we only want to do that until we have the critical indexes
	 * loaded into relcache.  Thus, the flag criticalRelcachesBuilt is used to
	 * decide whether to do heapscan or indexscan at the key spots, and we set
	 * it true after we've loaded the critical indexes.
3191
	 *
B
Bruce Momjian 已提交
3192 3193 3194 3195 3196 3197
	 * The critical indexes are marked as "nailed in cache", partly to make it
	 * easy for load_relcache_init_file to count them, but mainly because we
	 * cannot flush and rebuild them once we've set criticalRelcachesBuilt to
	 * true.  (NOTE: perhaps it would be possible to reload them by
	 * temporarily setting criticalRelcachesBuilt to false again.  For now,
	 * though, we just nail 'em in.)
3198 3199 3200 3201
	 *
	 * RewriteRelRulenameIndexId and TriggerRelidNameIndexId are not critical
	 * in the same way as the others, because the critical catalogs don't
	 * (currently) have any rules or triggers, and so these indexes can be
B
Bruce Momjian 已提交
3202
	 * rebuilt without inducing recursion.	However they are used during
	 * relcache load when a rel does have rules or triggers, so we choose to
	 * nail them for performance reasons.
	 */
	if (!criticalRelcachesBuilt)
	{
		load_critical_index(ClassOidIndexId,
							RelationRelationId);
		load_critical_index(AttributeRelidNumIndexId,
							AttributeRelationId);
		load_critical_index(IndexRelidIndexId,
							IndexRelationId);
		load_critical_index(OpclassOidIndexId,
							OperatorClassRelationId);
		load_critical_index(AccessMethodStrategyIndexId,
							AccessMethodOperatorRelationId);
		load_critical_index(OpclassOidIndexId,
							OperatorClassRelationId);
		load_critical_index(AccessMethodProcedureIndexId,
							AccessMethodProcedureRelationId);
		load_critical_index(RewriteRelRulenameIndexId,
							RewriteRelationId);
		load_critical_index(TriggerRelidNameIndexId,
							TriggerRelationId);

#define NUM_CRITICAL_LOCAL_INDEXES	9	/* fix if you change list above */

		criticalRelcachesBuilt = true;
	}

	/*
	 * Process critical shared indexes too.
	 *
	 * DatabaseNameIndexId isn't critical for relcache loading, but rather for
	 * initial lookup of MyDatabaseId, without which we'll never find any
	 * non-shared catalogs at all.	Autovacuum calls InitPostgres with a
	 * database OID, so it instead depends on DatabaseOidIndexId.  We also
	 * need to nail up some indexes on pg_authid and pg_auth_members for use
	 * during client authentication.
	 */
	if (!criticalSharedRelcachesBuilt)
	{
		load_critical_index(DatabaseNameIndexId,
							DatabaseRelationId);
		load_critical_index(DatabaseOidIndexId,
							DatabaseRelationId);
		load_critical_index(AuthIdRolnameIndexId,
							AuthIdRelationId);
		load_critical_index(AuthIdOidIndexId,
							AuthIdRelationId);
		load_critical_index(AuthMemMemRoleIndexId,
							AuthMemRelationId);

#define NUM_CRITICAL_SHARED_INDEXES 5	/* fix if you change list above */

		criticalSharedRelcachesBuilt = true;
	}

	/*
	 * Now, scan all the relcache entries and update anything that might be
	 * wrong in the results from formrdesc or the relcache cache file. If we
	 * faked up relcache entries using formrdesc, then read the real pg_class
	 * rows and replace the fake entries with them. Also, if any of the
	 * relcache entries have rules or triggers, load that info the hard way
	 * since it isn't recorded in the cache file.
	 *
	 * Whenever we access the catalogs to read data, there is a possibility of
	 * a shared-inval cache flush causing relcache entries to be removed.
	 * Since hash_seq_search only guarantees to still work after the *current*
	 * entry is removed, it's unsafe to continue the hashtable scan afterward.
	 * We handle this by restarting the scan from scratch after each access.
	 * This is theoretically O(N^2), but the number of entries that actually
	 * need to be fixed is small enough that it doesn't matter.
	 */
	hash_seq_init(&status, RelationIdCache);

	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
	{
		Relation	relation = idhentry->reldesc;
		bool		restart = false;

		/*
		 * Make sure *this* entry doesn't get flushed while we work with it.
		 */
		RelationIncrementReferenceCount(relation);

		/*
		 * If it's a faked-up entry, read the real pg_class tuple.
		 */
		if (relation->rd_rel->relowner == InvalidOid)
		{
			HeapTuple	htup;
			Form_pg_class relp;

			htup = SearchSysCache(RELOID,
							   ObjectIdGetDatum(RelationGetRelid(relation)),
								  0, 0, 0);
			if (!HeapTupleIsValid(htup))
				elog(FATAL, "cache lookup failed for relation %u",
					 RelationGetRelid(relation));
			relp = (Form_pg_class) GETSTRUCT(htup);

			/*
			 * Copy tuple to relation->rd_rel. (See notes in
			 * AllocateRelationDesc())
			 */
			memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);

			/* Update rd_options while we have the tuple */
			if (relation->rd_options)
				pfree(relation->rd_options);
			RelationParseRelOptions(relation, htup);

			/*
			 * Check the values in rd_att were set up correctly.  (We cannot
			 * just copy them over now: formrdesc must have set up the rd_att
			 * data correctly to start with, because it may already have been
			 * copied into one or more catcache entries.)
			 */
			Assert(relation->rd_att->tdtypeid == relp->reltype);
			Assert(relation->rd_att->tdtypmod == -1);
			Assert(relation->rd_att->tdhasoid == relp->relhasoids);

			ReleaseSysCache(htup);

			/* relowner had better be OK now, else we'll loop forever */
			if (relation->rd_rel->relowner == InvalidOid)
				elog(ERROR, "invalid relowner in pg_class entry for \"%s\"",
					 RelationGetRelationName(relation));

			restart = true;
		}

		/*
		 * Fix data that isn't saved in relcache cache file.
		 *
		 * relhasrules or relhastriggers could possibly be wrong or out of
		 * date.  If we don't actually find any rules or triggers, clear the
		 * local copy of the flag so that we don't get into an infinite loop
		 * here.  We don't make any attempt to fix the pg_class entry, though.
		 */
		if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
		{
			RelationBuildRuleLock(relation);
			if (relation->rd_rules == NULL)
				relation->rd_rel->relhasrules = false;
			restart = true;
		}
		if (relation->rd_rel->reltriggers > 0 && relation->trigdesc == NULL)
		{
			RelationBuildTriggers(relation);
			if (relation->trigdesc == NULL)
				relation->rd_rel->reltriggers = 0;
			restart = true;
		}

		/* Release hold on the relation */
		RelationDecrementReferenceCount(relation);

		/* Now, restart the hashtable scan if needed */
		if (restart)
		{
			hash_seq_term(&status);
			hash_seq_init(&status, RelationIdCache);
		}
	}

	/*
	 * Lastly, write out new relcache cache files if needed.  We don't bother
	 * to distinguish cases where only one of the two needs an update.
	 */
	if (needNewCacheFile)
	{
		/*
		 * Force all the catcaches to finish initializing and thereby open the
		 * catalogs and indexes they use.  This will preload the relcache with
		 * entries for all the most important system catalogs and indexes, so
		 * that the init files will be most useful for future backends.
		 */
		InitCatalogCachePhase2();

		/* reset initFileRelationIds list; we'll fill it during write */
		initFileRelationIds = NIL;

		/* now write the files */
		write_relcache_init_file(true);
		write_relcache_init_file(false);
	}
}

/*
 * Load one critical system index into the relcache
 *
 * indexoid is the OID of the target index, heapoid is the OID of the catalog
 * it belongs to.
 */
static void
load_critical_index(Oid indexoid, Oid heapoid)
{
	Relation	ird;

	/*
	 * We must lock the underlying catalog before locking the index to avoid
	 * deadlock, since RelationBuildDesc might well need to read the catalog,
	 * and if anyone else is exclusive-locking this catalog and index they'll
	 * be doing it in that order.
	 */
	LockRelationOid(heapoid, AccessShareLock);
	LockRelationOid(indexoid, AccessShareLock);
	ird = RelationBuildDesc(indexoid, true);
	if (ird == NULL)
		elog(PANIC, "could not open critical system index %u", indexoid);
	ird->rd_isnailed = true;
	ird->rd_refcnt = 1;
	UnlockRelationOid(indexoid, AccessShareLock);
	UnlockRelationOid(heapoid, AccessShareLock);
}
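
/*
 * Keeping the load_critical_index() call lists above and the matching
 * NUM_CRITICAL_LOCAL_INDEXES / NUM_CRITICAL_SHARED_INDEXES defines in sync
 * matters: load_relcache_init_file() counts the nailed entries it reads back
 * and rejects the init file if the count does not match, which silently
 * forces a rebuild of the file on every backend start.
 */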

/*
 * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
 * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
 *
 * We need this kluge because we have to be able to access non-fixed-width
 * fields of pg_class and pg_index before we have the standard catalog caches
 * available.  We use predefined data that's set up in just the same way as
 * the bootstrapped reldescs used by formrdesc().  The resulting tupdesc is
 * not 100% kosher: it does not have the correct rowtype OID in tdtypeid, nor
 * does it have a TupleConstr field.  But it's good enough for the purpose of
 * extracting fields.
 */
static TupleDesc
BuildHardcodedDescriptor(int natts, const FormData_pg_attribute *attrs,
						 bool hasoids)
{
	TupleDesc	result;
	MemoryContext oldcxt;
	int			i;

	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	result = CreateTemplateTupleDesc(natts, hasoids);
	result->tdtypeid = RECORDOID;		/* not right, but we don't care */
	result->tdtypmod = -1;

	for (i = 0; i < natts; i++)
	{
		memcpy(result->attrs[i], &attrs[i], ATTRIBUTE_FIXED_PART_SIZE);
		/* make sure attcacheoff is valid */
		result->attrs[i]->attcacheoff = -1;
	}

	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
	result->attrs[0]->attcacheoff = 0;

	/* Note: we don't bother to set up a TupleConstr entry */

	MemoryContextSwitchTo(oldcxt);

	return result;
}

static TupleDesc
GetPgClassDescriptor(void)
{
	static TupleDesc pgclassdesc = NULL;

	/* Already done? */
	if (pgclassdesc == NULL)
		pgclassdesc = BuildHardcodedDescriptor(Natts_pg_class,
											   Desc_pg_class,
											   true);

	return pgclassdesc;
}

static TupleDesc
GetPgIndexDescriptor(void)
{
	static TupleDesc pgindexdesc = NULL;

	/* Already done? */
	if (pgindexdesc == NULL)
		pgindexdesc = BuildHardcodedDescriptor(Natts_pg_index,
											   Desc_pg_index,
											   false);

	return pgindexdesc;
}
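
/*
 * Illustrative use of the hardcoded descriptors (a sketch; the real callers
 * are RelationGetIndexExpressions and RelationGetIndexPredicate below):
 *
 *		bool	isnull;
 *		Datum	d = heap_getattr(relation->rd_indextuple,
 *								 Anum_pg_index_indexprs,
 *								 GetPgIndexDescriptor(),
 *								 &isnull);
 *
 * This works even before the catcaches are usable, which is the whole point
 * of building the descriptors from compiled-in data.
 */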

/*
 * Load any default attribute value definitions for the relation.
 */
static void
AttrDefaultFetch(Relation relation)
{
	AttrDefault *attrdef = relation->rd_att->constr->defval;
	int			ndef = relation->rd_att->constr->num_defval;
	Relation	adrel;
	HeapTuple	htup;
	cqContext	cqc;
	cqContext  *pcqCtx;
	Datum		val;
	bool		isnull;
	int			found;
	int			i;

	adrel = heap_open(AttrDefaultRelationId, AccessShareLock);
	pcqCtx = caql_beginscan(
			caql_syscache(
					caql_indexOK(caql_addrel(cqclr(&cqc), adrel), 
								 true),
					false),
			cql("SELECT * FROM pg_attrdef "
				" WHERE adrelid = :1 ",
				ObjectIdGetDatum(RelationGetRelid(relation))));

	found = 0;

	while (HeapTupleIsValid(htup = caql_getnext(pcqCtx)))
	{
		Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);

		for (i = 0; i < ndef; i++)
		{
			if (adform->adnum != attrdef[i].adnum)
				continue;
			if (attrdef[i].adbin != NULL)
				elog(WARNING, "multiple attrdef records found for attr %s of rel %s",
				NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
					 RelationGetRelationName(relation));
			else
				found++;

			val = fastgetattr(htup,
							  Anum_pg_attrdef_adbin,
							  adrel->rd_att, &isnull);
			if (isnull)
				elog(WARNING, "null adbin for attr %s of rel %s",
				NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
					 RelationGetRelationName(relation));
			else
				attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext,
												   TextDatumGetCString(val));
			break;
		}

		if (i >= ndef)
			elog(WARNING, "unexpected attrdef record found for attr %d of rel %s",
				 adform->adnum, RelationGetRelationName(relation));
	}

	caql_endscan(pcqCtx);
	heap_close(adrel, AccessShareLock);

	if (found != ndef)
		elog(WARNING, "%d attrdef record(s) missing for rel %s",
			 ndef - found, RelationGetRelationName(relation));
}

/*
 * Load any check constraints for the relation.
 */
static void
CheckConstraintFetch(Relation relation)
{
	ConstrCheck *check = relation->rd_att->constr->check;
	int			ncheck = relation->rd_att->constr->num_check;
	Relation	conrel;
	HeapTuple	htup;
	cqContext	cqc;
	cqContext  *pcqCtx;
	Datum		val;
	bool		isnull;
	int			found = 0;

	conrel = heap_open(ConstraintRelationId, AccessShareLock);

	pcqCtx = caql_beginscan(
			caql_syscache(
					caql_indexOK(caql_addrel(cqclr(&cqc), conrel), 
								 true),
						false),
			cql("SELECT * FROM pg_constraint "
				" WHERE conrelid = :1 ",
				ObjectIdGetDatum(RelationGetRelid(relation))));

	while (HeapTupleIsValid(htup = caql_getnext(pcqCtx)))
	{
		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);

		/* We want check constraints only */
		if (conform->contype != CONSTRAINT_CHECK)
			continue;

		if (found >= ncheck)
			elog(ERROR,
			     "pg_class reports %d constraint record(s) for rel %s, but found extra in pg_constraint",
			     ncheck, RelationGetRelationName(relation));

		check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
												  NameStr(conform->conname));

		/* Grab and test conbin is actually set */
		val = fastgetattr(htup,
						  Anum_pg_constraint_conbin,
						  conrel->rd_att, &isnull);
		if (isnull)
			elog(ERROR, "null conbin for rel %s",
				 RelationGetRelationName(relation));

		check[found].ccbin = MemoryContextStrdup(CacheMemoryContext,
												 TextDatumGetCString(val));
		found++;
	}

	caql_endscan(pcqCtx);
	heap_close(conrel, AccessShareLock);

	if (found != ncheck)
		elog(ERROR,
		     "found %d in pg_constraint, but pg_class reports %d constraint record(s) for rel %s",
		     found, ncheck, RelationGetRelationName(relation));
}


/*
 * RelationGetPartitioningKey -- get GpPolicy struct for distributed relation
 *
 * Returns a copy of the relation's GpPolicy object, palloc'd in
 * the caller's context.  Caller should pfree() it.  If NULL is
 * returned, relation should be accessed locally.
 */
GpPolicy*
RelationGetPartitioningKey(Relation relation)
{
    return GpPolicyCopy(CurrentMemoryContext, relation->rd_cdbpolicy);
}                                       /* RelationGetPartitioningKey */
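
/*
 * Typical caller pattern (illustrative only): the returned policy is the
 * caller's own copy, so it can simply be pfree'd when no longer needed.
 *
 *		GpPolicy   *policy = RelationGetPartitioningKey(relation);
 *
 *		... examine the distribution policy ...
 *		pfree(policy);
 */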


/*
 * RelationGetIndexList -- get a list of OIDs of indexes on this relation
 *
 * The index list is created only if someone requests it.  We scan pg_index
 * to find relevant indexes, and add the list to the relcache entry so that
 * we won't have to compute it again.  Note that shared cache inval of a
 * relcache entry will delete the old list and set rd_indexvalid to 0,
 * so that we must recompute the index list on next request.  This handles
 * creation or deletion of an index.
 *
 * The returned list is guaranteed to be sorted in order by OID.  This is
 * needed by the executor, since for index types that we obtain exclusive
 * locks on when updating the index, all backends must lock the indexes in
 * the same order or we will get deadlocks (see ExecOpenIndices()).  Any
 * consistent ordering would do, but ordering by OID is easy.
 *
 * Since shared cache inval causes the relcache's copy of the list to go away,
 * we return a copy of the list palloc'd in the caller's context.  The caller
 * may list_free() the returned list after scanning it. This is necessary
 * since the caller will typically be doing syscache lookups on the relevant
 * indexes, and syscache lookup could cause SI messages to be processed!
 *
 * We also update rd_oidindex, which this module treats as effectively part
 * of the index list.  rd_oidindex is valid when rd_indexvalid isn't zero;
 * it is the pg_class OID of a unique index on OID when the relation has one,
 * and InvalidOid if there is no such index.
 */
List *
RelationGetIndexList(Relation relation)
{
	Relation	indrel;
	HeapTuple	htup;
	cqContext	cqc;
	cqContext  *pcqCtx;
	List	   *result;
	Oid			oidIndex;
	MemoryContext oldcxt;

	/* Quick exit if we already computed the list. */
	if (relation->rd_indexvalid != 0)
		return list_copy(relation->rd_indexlist);

	/*
	 * We build the list we intend to return (in the caller's context) while
	 * doing the scan.	After successfully completing the scan, we copy that
	 * list into the relcache entry.  This avoids cache-context memory leakage
	 * if we get some sort of error partway through.
	 */
	result = NIL;
	oidIndex = InvalidOid;

	/* Prepare to scan pg_index for entries having indrelid = this rel. */

	indrel = heap_open(IndexRelationId, AccessShareLock);

	pcqCtx = caql_beginscan(
			caql_syscache(
					caql_indexOK(caql_addrel(cqclr(&cqc), indrel), 
								 true),
						false),
			cql("SELECT * FROM pg_index "
				" WHERE indrelid = :1 ",
				ObjectIdGetDatum(RelationGetRelid(relation))));

	while (HeapTupleIsValid(htup = caql_getnext(pcqCtx)))
	{
		Form_pg_index index = (Form_pg_index) GETSTRUCT(htup);

		/* Add index's OID to result list in the proper order */
		result = insert_ordered_oid(result, index->indexrelid);

		/* Check to see if it is a unique, non-partial btree index on OID */
		if (index->indnatts == 1 &&
			index->indisunique &&
			index->indkey.values[0] == ObjectIdAttributeNumber &&
			index->indclass.values[0] == OID_BTREE_OPS_OID &&
			heap_attisnull(htup, Anum_pg_index_indpred))
			oidIndex = index->indexrelid;
	}

	caql_endscan(pcqCtx);
	heap_close(indrel, AccessShareLock);

	/* Now save a copy of the completed list in the relcache entry. */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indexlist = list_copy(result);
	relation->rd_oidindex = oidIndex;
	relation->rd_indexvalid = 1;
	MemoryContextSwitchTo(oldcxt);

	return result;
}
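
/*
 * Typical caller pattern (illustrative only):
 *
 *		List	   *indexoidlist = RelationGetIndexList(relation);
 *		ListCell   *indexoidscan;
 *
 *		foreach(indexoidscan, indexoidlist)
 *		{
 *			Oid			indexoid = lfirst_oid(indexoidscan);
 *
 *			... open or look up the index by OID ...
 *		}
 *		list_free(indexoidlist);
 *
 * Because the returned list is a palloc'd copy, it survives any relcache
 * invalidation triggered by syscache lookups made inside the loop.
 */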

/*
 * insert_ordered_oid
 *		Insert a new Oid into a sorted list of Oids, preserving ordering
 *
 * Building the ordered list this way is O(N^2), but with a pretty small
 * constant, so for the number of entries we expect it will probably be
 * faster than trying to apply qsort().  Most tables don't have very many
 * indexes...
 */
static List *
insert_ordered_oid(List *list, Oid datum)
{
	ListCell   *prev;

	/* Does the datum belong at the front? */
	if (list == NIL || datum < linitial_oid(list))
		return lcons_oid(datum, list);
	/* No, so find the entry it belongs after */
	prev = list_head(list);
	for (;;)
	{
		ListCell   *curr = lnext(prev);

		if (curr == NULL || datum < lfirst_oid(curr))
			break;				/* it belongs after 'prev', before 'curr' */

		prev = curr;
	}
	/* Insert datum into list after 'prev' */
	lappend_cell_oid(list, prev, datum);
	return list;
}
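
/*
 * For example (illustrative), inserting the OIDs 17, 5 and 9 one at a time
 * through insert_ordered_oid() yields the list (5 9 17), which is the
 * by-OID ordering that RelationGetIndexList() promises to its callers.
 */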

/*
 * RelationSetIndexList -- externally force the index list contents
 *
 * This is used to temporarily override what we think the set of valid
 * indexes is (including the presence or absence of an OID index).
 * The forcing will be valid only until transaction commit or abort.
 *
 * This should only be applied to nailed relations, because in a non-nailed
 * relation the hacked index list could be lost at any time due to SI
 * messages.  In practice it is only used on pg_class (see REINDEX).
 *
 * It is up to the caller to make sure the given list is correctly ordered.
 */
void
RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex)
{
	MemoryContext oldcxt;

	Assert(relation->rd_isnailed);
	/* Copy the list into the cache context (could fail for lack of mem) */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
	indexIds = list_copy(indexIds);
	MemoryContextSwitchTo(oldcxt);
	/* Okay to replace old list */
	list_free(relation->rd_indexlist);
	relation->rd_indexlist = indexIds;
	relation->rd_oidindex = oidIndex;
	relation->rd_indexvalid = 2;	/* mark list as forced */
	/* must flag that we have a forced index list */
	need_eoxact_work = true;
}

/*
 * RelationGetOidIndex -- get the pg_class OID of the relation's OID index
 *
 * Returns InvalidOid if there is no such index.
 */
Oid
RelationGetOidIndex(Relation relation)
{
	List	   *ilist;

	/*
	 * If relation doesn't have OIDs at all, caller is probably confused. (We
	 * could just silently return InvalidOid, but it seems better to throw an
	 * assertion.)
	 */
	Assert(relation->rd_rel->relhasoids);

	if (relation->rd_indexvalid == 0)
	{
		/* RelationGetIndexList does the heavy lifting. */
		ilist = RelationGetIndexList(relation);
		list_free(ilist);
		Assert(relation->rd_indexvalid != 0);
	}

	return relation->rd_oidindex;
}

/*
 * RelationGetIndexExpressions -- get the index expressions for an index
 *
 * We cache the result of transforming pg_index.indexprs into a node tree.
 * If the rel is not an index or has no expressional columns, we return NIL.
 * Otherwise, the returned tree is copied into the caller's memory context.
 * (We don't want to return a pointer to the relcache copy, since it could
 * disappear due to relcache invalidation.)
 */
List *
RelationGetIndexExpressions(Relation relation)
{
	List	   *result;
	Datum		exprsDatum;
	bool		isnull;
	char	   *exprsString;
	MemoryContext oldcxt;

	/* Quick exit if we already computed the result. */
	if (relation->rd_indexprs)
		return (List *) copyObject(relation->rd_indexprs);

	/* Quick exit if there is nothing to do. */
	if (relation->rd_indextuple == NULL ||
		heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs))
		return NIL;

	/*
	 * We build the tree we intend to return in the caller's context. After
	 * successfully completing the work, we copy it into the relcache entry.
	 * This avoids problems if we get some sort of error partway through.
	 */
	exprsDatum = heap_getattr(relation->rd_indextuple,
							  Anum_pg_index_indexprs,
							  GetPgIndexDescriptor(),
							  &isnull);
	Assert(!isnull);
	exprsString = TextDatumGetCString(exprsDatum);
	result = (List *) stringToNode(exprsString);
	pfree(exprsString);

	/*
	 * Run the expressions through eval_const_expressions. This is not just an
	 * optimization, but is necessary, because the planner will be comparing
	 * them to similarly-processed qual clauses, and may fail to detect valid
	 * matches without this.  We don't bother with canonicalize_qual, however.
	 */
	result = (List *) eval_const_expressions(NULL, (Node *) result);

	/*
	 * Also mark any coercion format fields as "don't care", so that the
	 * planner can match to both explicit and implicit coercions.
	 */
	set_coercionform_dontcare((Node *) result);

	/* May as well fix opfuncids too */
	fix_opfuncids((Node *) result);

	/* Now save a copy of the completed tree in the relcache entry. */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indexprs = (List *) copyObject(result);
	MemoryContextSwitchTo(oldcxt);

	return result;
}

/*
 * RelationGetIndexPredicate -- get the index predicate for an index
 *
 * We cache the result of transforming pg_index.indpred into an implicit-AND
 * node tree (suitable for ExecQual).
 * If the rel is not an index or has no predicate, we return NIL.
 * Otherwise, the returned tree is copied into the caller's memory context.
 * (We don't want to return a pointer to the relcache copy, since it could
 * disappear due to relcache invalidation.)
 */
List *
RelationGetIndexPredicate(Relation relation)
{
	List	   *result;
	Datum		predDatum;
	bool		isnull;
	char	   *predString;
	MemoryContext oldcxt;

	/* Quick exit if we already computed the result. */
	if (relation->rd_indpred)
		return (List *) copyObject(relation->rd_indpred);

	/* Quick exit if there is nothing to do. */
	if (relation->rd_indextuple == NULL ||
		heap_attisnull(relation->rd_indextuple, Anum_pg_index_indpred))
		return NIL;

	/*
	 * We build the tree we intend to return in the caller's context. After
	 * successfully completing the work, we copy it into the relcache entry.
	 * This avoids problems if we get some sort of error partway through.
	 */
	predDatum = heap_getattr(relation->rd_indextuple,
							 Anum_pg_index_indpred,
							 GetPgIndexDescriptor(),
							 &isnull);
	Assert(!isnull);
	predString = TextDatumGetCString(predDatum);
	result = (List *) stringToNode(predString);
	pfree(predString);

	/*
	 * Run the expression through const-simplification and canonicalization.
	 * This is not just an optimization, but is necessary, because the planner
	 * will be comparing it to similarly-processed qual clauses, and may fail
	 * to detect valid matches without this.  This must match the processing
	 * done to qual clauses in preprocess_expression()!  (We can skip the
	 * stuff involving subqueries, however, since we don't allow any in index
	 * predicates.)
	 */
	result = (List *) eval_const_expressions(NULL, (Node *) result);

	result = (List *) canonicalize_qual((Expr *) result);

	/*
	 * Also mark any coercion format fields as "don't care", so that the
	 * planner can match to both explicit and implicit coercions.
	 */
	set_coercionform_dontcare((Node *) result);

	/* Also convert to implicit-AND format */
	result = make_ands_implicit((Expr *) result);

	/* May as well fix opfuncids too */
	fix_opfuncids((Node *) result);

	/* Now save a copy of the completed tree in the relcache entry. */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indpred = (List *) copyObject(result);
	MemoryContextSwitchTo(oldcxt);

	return result;
}
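
/*
 * For instance (illustrative), the predicate of a partial index created
 * with WHERE deleted = false AND price > 0 comes back from this function
 * as a two-element implicit-AND list, ready to be handed to ExecQual().
 */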


/*
 *	load_relcache_init_file, write_relcache_init_file
 *
 *		In late 1992, we started regularly having databases with more than
 *		a thousand classes in them.  With this number of classes, it became
 *		critical to do indexed lookups on the system catalogs.
 *
 *		Bootstrapping these lookups is very hard.  We want to be able to
 *		use an index on pg_attribute, for example, but in order to do so,
 *		we must have read pg_attribute for the attributes in the index,
 *		which implies that we need to use the index.
 *
 *		In order to get around the problem, we do the following:
 *
 *		   +  When the database system is initialized (at initdb time), we
 *			  don't use indexes.  We do sequential scans.
 *
 *		   +  When the backend is started up in normal mode, we load an image
 *			  of the appropriate relation descriptors, in internal format,
 *			  from an initialization file in the data/base/... directory.
 *
 *		   +  If the initialization file isn't there, then we create the
 *			  relation descriptors using sequential scans and write 'em to
 *			  the initialization file for use by subsequent backends.
 *
 *		We could dispense with the initialization files and just build the
 *		critical reldescs the hard way on every backend startup, but that
 *		slows down backend startup noticeably.
 *
 *		We can in fact go further, and save more relcache entries than
 *		just the ones that are absolutely critical; this allows us to speed
 *		up backend startup by not having to build such entries the hard way.
 *		Presently, all the catalog and index entries that are referred to
4002
 *		by catcaches are stored in the initialization files.
4003
 *
T
 *		need to be invalidated (due to catalog updates) also arranges to
4006 4007
 *		unlink the initialization files when the contents may be out of date.
 *		The files will then be rebuilt during the next backend startup.
4008 4009
 */

4010 4011 4012 4013
/*
 * load_relcache_init_file -- attempt to load cache from the init file
 *
 * If successful, return TRUE and set criticalRelcachesBuilt to true.
4014
 * If not successful, return FALSE.
4015 4016 4017 4018
 *
 * NOTE: we assume we are already switched into CacheMemoryContext.
 */
static bool
4019
load_relcache_init_file(bool shared)
4020
{
4021 4022 4023 4024 4025 4026 4027
	FILE	   *fp;
	char		initfilename[MAXPGPATH];
	Relation   *rels;
	int			relno,
				num_rels,
				max_rels,
				nailed_rels,
4028 4029
				nailed_indexes,
				magic;
4030
	int			i;
4031

4032 4033 4034 4035 4036 4037
	if (shared)
		snprintf(initfilename, sizeof(initfilename), "global/%s",
				 RELCACHE_INIT_FILENAME);
	else
		snprintf(initfilename, sizeof(initfilename), "%s/%s",
				 DatabasePath, RELCACHE_INIT_FILENAME);
4038 4039 4040 4041

	fp = AllocateFile(initfilename, PG_BINARY_R);
	if (fp == NULL)
		return false;
4042

4043
	/*
B
	 * any of them into the cache if the read fails partway through; this
	 * helps to guard against broken init files.
4047 4048 4049 4050 4051 4052 4053
	 */
	max_rels = 100;
	rels = (Relation *) palloc(max_rels * sizeof(Relation));
	num_rels = 0;
	nailed_rels = nailed_indexes = 0;
	initFileRelationIds = NIL;

4054 4055 4056 4057 4058 4059
	/* check for correct magic number (compatible version) */
	if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
		goto read_failed;
	if (magic != RELCACHE_INIT_FILEMAGIC)
		goto read_failed;

B
4061
	{
4062 4063 4064 4065
		Size		len;
		size_t		nread;
		Relation	rel;
		Form_pg_class relform;
4066
		bool		has_not_null;
4067

4068
		/* first read the relation descriptor length */
4069 4070 4071 4072
		if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
		{
			if (nread == 0)
				break;			/* end of file */
4073
			goto read_failed;
4074
		}
4075

4076 4077
		/* safety check for incompatible relcache layout */
		if (len != sizeof(RelationData))
4078
			goto read_failed;
4079

4080 4081 4082 4083 4084 4085
		/* allocate another relcache header */
		if (num_rels >= max_rels)
		{
			max_rels *= 2;
			rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
		}
4086

4087
		rel = rels[num_rels++] = (Relation) palloc(len);
4088

4089 4090
		/* then, read the Relation structure */
		if ((nread = fread(rel, 1, len, fp)) != len)
4091
			goto read_failed;
4092 4093

		/* next read the relation tuple form */
4094
		if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
4095
			goto read_failed;
4096 4097

		relform = (Form_pg_class) palloc(len);
4098
		if ((nread = fread(relform, 1, len, fp)) != len)
4099
			goto read_failed;
4100

4101
		rel->rd_rel = relform;
4102 4103

		/* initialize attribute tuple forms */
4104 4105
		rel->rd_att = CreateTemplateTupleDesc(relform->relnatts,
											  relform->relhasoids);
4106 4107
		rel->rd_att->tdrefcount = 1;	/* mark as refcounted */

4108
		rel->rd_att->tdtypeid = relform->reltype;
B
4110 4111

		/* next read all the attribute tuple form data entries */
4112
		has_not_null = false;
4113 4114
		for (i = 0; i < relform->relnatts; i++)
		{
4115
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
4116
				goto read_failed;
4117
			if (len != ATTRIBUTE_FIXED_PART_SIZE)
4118
				goto read_failed;
4119
			if ((nread = fread(rel->rd_att->attrs[i], 1, len, fp)) != len)
4120
				goto read_failed;
4121 4122 4123 4124

			has_not_null |= rel->rd_att->attrs[i]->attnotnull;
		}

B
		if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
			goto read_failed;
		if (len > 0)
		{
			rel->rd_options = palloc(len);
			if ((nread = fread(rel->rd_options, 1, len, fp)) != len)
				goto read_failed;
4133
			if (len != VARSIZE(rel->rd_options))
B
B
		else
		{
			rel->rd_options = NULL;
		}

4141 4142 4143 4144 4145 4146 4147
		/* mark not-null status */
		if (has_not_null)
		{
			TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

			constr->has_not_null = true;
			rel->rd_att->constr = constr;
4148 4149
		}

4150 4151 4152 4153 4154
		/* If it's an index, there's more to do */
		if (rel->rd_rel->relkind == RELKIND_INDEX)
		{
			Form_pg_am	am;
			MemoryContext indexcxt;
4155 4156
			Oid		   *opfamily;
			Oid		   *opcintype;
4157 4158
			Oid		   *operator;
			RegProcedure *support;
4159
			int			nsupport;
4160
			int16	   *indoption;
4161 4162 4163 4164 4165

			/* Count nailed indexes to ensure we have 'em all */
			if (rel->rd_isnailed)
				nailed_indexes++;

4166
			/* next, read the pg_index tuple */
4167 4168
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;
4169

4170 4171
			rel->rd_indextuple = (HeapTuple) palloc(len);
			if ((nread = fread(rel->rd_indextuple, 1, len, fp)) != len)
4172
				goto read_failed;
4173

4174 4175 4176 4177
			/* Fix up internal pointers in the tuple -- see heap_copytuple */
			rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
			rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);

4178 4179 4180
			/* next, read the access method tuple form */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;
4181

4182 4183 4184 4185
			am = (Form_pg_am) palloc(len);
			if ((nread = fread(am, 1, len, fp)) != len)
				goto read_failed;
			rel->rd_am = am;
4186

4187 4188 4189 4190 4191 4192
			/*
			 * prepare index info context --- parameters should match
			 * RelationInitIndexAccessInfo
			 */
			indexcxt = AllocSetContextCreate(CacheMemoryContext,
											 RelationGetRelationName(rel),
4193 4194 4195
											 ALLOCSET_SMALL_MINSIZE,
											 ALLOCSET_SMALL_INITSIZE,
											 ALLOCSET_SMALL_MAXSIZE);
4196 4197
			rel->rd_indexcxt = indexcxt;

4198 4199 4200 4201 4202 4203 4204 4205 4206 4207 4208 4209 4210 4211 4212 4213 4214 4215 4216 4217
			/* next, read the vector of opfamily OIDs */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			opfamily = (Oid *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(opfamily, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_opfamily = opfamily;

			/* next, read the vector of opcintype OIDs */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			opcintype = (Oid *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(opcintype, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_opcintype = opcintype;

			/* next, read the vector of operator OIDs */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			operator = (Oid *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(operator, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_operator = operator;

			/* next, read the vector of support procedures */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;
			support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(support, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_support = support;

			/* finally, read the vector of indoption values */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			indoption = (int16 *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(indoption, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_indoption = indoption;

			/* set up zeroed fmgr-info vectors */
			rel->rd_aminfo = (RelationAmInfo *)
				MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));
			nsupport = relform->relnatts * am->amsupport;
			rel->rd_supportinfo = (FmgrInfo *)
				MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
		}
		else
		{
			/* Count nailed rels to ensure we have 'em all */
			if (rel->rd_isnailed)
				nailed_rels++;

			Assert(rel->rd_index == NULL);
			Assert(rel->rd_indextuple == NULL);
			Assert(rel->rd_am == NULL);
			Assert(rel->rd_indexcxt == NULL);
			Assert(rel->rd_aminfo == NULL);
			Assert(rel->rd_opfamily == NULL);
			Assert(rel->rd_opcintype == NULL);
			Assert(rel->rd_operator == NULL);
			Assert(rel->rd_support == NULL);
			Assert(rel->rd_supportinfo == NULL);
			Assert(rel->rd_indoption == NULL);
		}

		/*
		 * Rules and triggers are not saved (mainly because the internal
		 * format is complex and subject to change).  They must be rebuilt if
		 * needed by RelationCacheInitializePhase3.  This is not expected to
		 * be a big performance hit since few system catalogs have such. Ditto
		 * for index expressions and predicates.
		 */
		rel->rd_rules = NULL;
		rel->rd_rulescxt = NULL;
		rel->trigdesc = NULL;
		rel->rd_indexprs = NIL;
		rel->rd_indpred = NIL;

		/*
		 * Reset transient-state fields in the relcache entry
		 */
		rel->rd_smgr = NULL;
		rel->rd_targblock = InvalidBlockNumber;
		if (rel->rd_isnailed)
			rel->rd_refcnt = 1;
		else
			rel->rd_refcnt = 0;
		rel->rd_indexvalid = 0;
		rel->rd_indexlist = NIL;
		rel->rd_oidindex = InvalidOid;
		rel->rd_createSubid = InvalidSubTransactionId;
		rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
		rel->rd_amcache = NULL;
		MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
        rel->rd_cdbpolicy = NULL;
        rel->rd_cdbDefaultStatsWarningIssued = false;

		/*
		 * Recompute lock and physical addressing info.  This is needed in
		 * case the pg_internal.init file was copied from some other database
		 * by CREATE DATABASE.
		 */
		RelationInitLockInfo(rel);
		RelationInitPhysicalAddr(rel);
	}

	/*
	 * We reached the end of the init file without apparent problem. Did we
	 * get the right number of nailed items?  (This is a useful crosscheck in
	 * case the set of critical rels or indexes changes.)
	 */
	if (shared)
	{
		if (nailed_rels != NUM_CRITICAL_SHARED_RELS ||
			nailed_indexes != NUM_CRITICAL_SHARED_INDEXES)
			goto read_failed;
	}
	else
	{
		if (nailed_rels != NUM_CRITICAL_LOCAL_RELS ||
			nailed_indexes != NUM_CRITICAL_LOCAL_INDEXES)
			goto read_failed;
	}


	/*
	 * OK, all appears well.
	 *
	 * Now insert all the new relcache entries into the cache.
	 */
	for (relno = 0; relno < num_rels; relno++)
	{
		RelationCacheInsert(rels[relno]);
		/* also make a list of their OIDs, for RelationIdIsInInitFile */
		if (!shared)
			initFileRelationIds = lcons_oid(RelationGetRelid(rels[relno]),
											initFileRelationIds);
	}

	pfree(rels);
	FreeFile(fp);

	if (shared)
		criticalSharedRelcachesBuilt = true;
	else
		criticalRelcachesBuilt = true;
	return true;

	/*
	 * init file is broken, so do it the hard way.	We don't bother trying to
	 * free the clutter we just allocated; it's not in the relcache so it
	 * won't hurt.
	 */
read_failed:
	pfree(rels);
	FreeFile(fp);

	return false;
}

/*
 * Write out a new initialization file with the current contents
 * of the relcache.
 */
static void
write_relcache_init_file(bool shared)
{
	FILE	   *fp;
	char		tempfilename[MAXPGPATH];
	char		finalfilename[MAXPGPATH];
	int			magic;
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;
	MemoryContext oldcxt;
	int			i;

	/*
	 * We must write a temporary file and rename it into place. Otherwise,
	 * another backend starting at about the same time might crash trying to
	 * read the partially-complete file.
	 */
	if (shared)
	{
		snprintf(tempfilename, sizeof(tempfilename), "global/%s.%d",
				 RELCACHE_INIT_FILENAME, MyProcPid);
		snprintf(finalfilename, sizeof(finalfilename), "global/%s",
				 RELCACHE_INIT_FILENAME);
	}
	else
	{
		snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
				 DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
		snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
				 DatabasePath, RELCACHE_INIT_FILENAME);
	}

	unlink(tempfilename);		/* in case it exists w/wrong permissions */

	fp = AllocateFile(tempfilename, PG_BINARY_W);
	if (fp == NULL)
	{
		/*
		 * We used to consider this a fatal error, but we might as well
		 * continue with backend startup ...
		 */
		ereport(WARNING,
				(errcode_for_file_access(),
				 errmsg("could not create relation-cache initialization file \"%s\": %m",
						tempfilename),
			  errdetail("Continuing anyway, but there's something wrong.")));
		return;
	}

	/*
	 * Write a magic number to serve as a file version identifier.	We can
	 * change the magic number whenever the relcache layout changes.
	 */
	magic = RELCACHE_INIT_FILEMAGIC;
	if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic))
		elog(FATAL, "could not write init file");

	/*
	 * Write all the reldescs (in no particular order).
	 */
	hash_seq_init(&status, RelationIdCache);

	initFileRelationIds = NIL;

	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
	{
		Relation	rel = idhentry->reldesc;
		Form_pg_class relform = rel->rd_rel;

		/* ignore if not correct group */
		if (relform->relisshared != shared)
			continue;

		/* first write the relcache entry proper */
		write_item(rel, sizeof(RelationData), fp);

		/* next write the relation tuple form */
		write_item(relform, CLASS_TUPLE_SIZE, fp);

		/* next, do all the attribute tuple form data entries */
		for (i = 0; i < relform->relnatts; i++)
		{
			write_item(rel->rd_att->attrs[i], ATTRIBUTE_FIXED_PART_SIZE, fp);
		}

		/* next, do the access method specific field */
		write_item(rel->rd_options,
				   (rel->rd_options ? VARSIZE(rel->rd_options) : 0),
				   fp);

		/* If it's an index, there's more to do */
		if (rel->rd_rel->relkind == RELKIND_INDEX)
		{
			Form_pg_am	am = rel->rd_am;

			/* write the pg_index tuple */
			/* we assume this was created by heap_copytuple! */
			write_item(rel->rd_indextuple,
					   HEAPTUPLESIZE + rel->rd_indextuple->t_len,
					   fp);

			/* next, write the access method tuple form */
			write_item(am, sizeof(FormData_pg_am), fp);

			/* next, write the vector of opfamily OIDs */
			write_item(rel->rd_opfamily,
					   relform->relnatts * sizeof(Oid),
					   fp);

			/* next, write the vector of opcintype OIDs */
			write_item(rel->rd_opcintype,
					   relform->relnatts * sizeof(Oid),
					   fp);

			/* next, write the vector of operator OIDs */
			write_item(rel->rd_operator,
					   relform->relnatts * (am->amstrategies * sizeof(Oid)),
					   fp);

			/* next, write the vector of support procedures */
			write_item(rel->rd_support,
				  relform->relnatts * (am->amsupport * sizeof(RegProcedure)),
					   fp);

			/* finally, write the vector of indoption values */
			write_item(rel->rd_indoption,
					   relform->relnatts * sizeof(int16),
					   fp);
		}

		/* also make a list of their OIDs, for RelationIdIsInInitFile */
		if (!shared)
		{
			oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
			initFileRelationIds = lcons_oid(RelationGetRelid(rel),
											initFileRelationIds);
			MemoryContextSwitchTo(oldcxt);
		}
	}

	if (FreeFile(fp))
		elog(FATAL, "could not write init file");

	/*
	 * Now we have to check whether the data we've so painstakingly
	 * accumulated is already obsolete due to someone else's just-committed
	 * catalog changes.  If so, we just delete the temp file and leave it to
	 * the next backend to try again.  (Our own relcache entries will be
	 * updated by SI message processing, but we can't be sure whether what we
	 * wrote out was up-to-date.)
	 *
	 * This mustn't run concurrently with RelationCacheInitFileInvalidate, so
	 * grab a serialization lock for the duration.
	 */
	LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);

	/* Make sure we have seen all incoming SI messages */
	AcceptInvalidationMessages();

	/*
	 * If we have received any SI relcache invals since backend start, assume
	 * we may have written out-of-date data.
	 */
	if (relcacheInvalsReceived == 0L)
	{
		/*
		 * OK, rename the temp file to its final name, deleting any
		 * previously-existing init file.
		 *
		 * Note: a failure here is possible under Cygwin, if some other
		 * backend is holding open an unlinked-but-not-yet-gone init file. So
		 * treat this as a noncritical failure; just remove the useless temp
		 * file on failure.
		 */
		if (rename(tempfilename, finalfilename) < 0)
			unlink(tempfilename);
	}
	else
	{
		/* Delete the already-obsolete temp file */
		unlink(tempfilename);
	}

	LWLockRelease(RelCacheInitLock);
}

/* write a chunk of data preceded by its length */
static void
write_item(const void *data, Size len, FILE *fp)
{
	if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
		elog(FATAL, "could not write init file");
	if (fwrite(data, 1, len, fp) != len)
		elog(FATAL, "could not write init file");
}
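
/*
 * Each item in the init file is therefore a length word followed by that
 * many bytes of payload; load_relcache_init_file() above reads entries back
 * with the matching fread-of-length / fread-of-data pairing, which is why
 * the two functions must be kept in step with each other (and why the file
 * carries a magic number that changes whenever the layout does).
 */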

/*
 * Detect whether a given relation (identified by OID) is one of the ones
 * we store in the init file.
 *
 * Note that we effectively assume that all backends running in a database
 * would choose to store the same set of relations in the init file;
 * otherwise there are cases where we'd fail to detect the need for an init
 * file invalidation.  This does not seem likely to be a problem in practice.
 */
bool
RelationIdIsInInitFile(Oid relationId)
{
	return list_member_oid(initFileRelationIds, relationId);
}

/*
 * Invalidate (remove) the init file during commit of a transaction that
 * changed one or more of the relation cache entries that are kept in the
 * local init file.
 *
 * We actually need to remove the init file twice: once just before sending
 * the SI messages that include relcache inval for such relations, and once
 * just after sending them.  The unlink before ensures that a backend that's
 * currently starting cannot read the now-obsolete init file and then miss
 * the SI messages that will force it to update its relcache entries.  (This
4593
 * works because the backend startup sequence gets into the PGPROC array before
4594 4595 4596 4597 4598 4599 4600
 * trying to load the init file.)  The unlink after is to synchronize with a
 * backend that may currently be trying to write an init file based on data
 * that we've just rendered invalid.  Such a backend will see the SI messages,
 * but we can't leave the init file sitting around to fool later backends.
 *
 * Ignore any failure to unlink the file, since it might not be there if
 * no backend has been started since the last removal.
4601 4602 4603 4604 4605 4606 4607
 *
 * Notice this deals only with the local init file, not the shared init file.
 * The reason is that there can never be a "significant" change to the
 * relcache entry of a shared relation; the most that could happen is
 * updates of noncritical fields such as relpages/reltuples.  So, while
 * it's worth updating the shared init file from time to time, it can never
 * be invalid enough to make it necessary to remove it.
4608 4609 4610 4611 4612 4613 4614 4615 4616 4617 4618 4619 4620 4621 4622 4623 4624
 */
void
RelationCacheInitFileInvalidate(bool beforeSend)
{
	char		initfilename[MAXPGPATH];

	snprintf(initfilename, sizeof(initfilename), "%s/%s",
			 DatabasePath, RELCACHE_INIT_FILENAME);

	if (beforeSend)
	{
		/* no interlock needed here */
		unlink(initfilename);
	}
	else
	{
		/*
		 * We need to interlock this against write_relcache_init_file, to
		 * guard against possibility that someone renames a new-but-
		 * already-obsolete init file into place just after we unlink. With
		 * the interlock, it's certain that write_relcache_init_file will
		 * notice our SI inval message before renaming into place, or else
		 * that we will execute second and successfully unlink the file.
		 */
		LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
		unlink(initfilename);
		LWLockRelease(RelCacheInitLock);
	}
}
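
/*
 * Sketch of the expected call pattern at transaction commit (illustrative;
 * the actual calls live in the transaction/invalidation code, not here):
 *
 *		RelationCacheInitFileInvalidate(true);		-- unlink before sending SI
 *		... send the queued relcache/catcache inval messages ...
 *		RelationCacheInitFileInvalidate(false);		-- unlink again afterward
 */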

/*
 * Remove the init files during postmaster startup.
 *
 * We used to keep the init files across restarts, but that is unsafe even in
 * simple crash-recovery cases as there are windows for the init files to
 * become out-of-sync with the database.  So now we just remove them during
 * startup and expect the first backend launch to rebuild them.  Of course,
 * this has to happen in each database of the cluster.
 */
void
RelationCacheInitFileRemove(void)
{
	char		path[MAXPGPATH];

	/*
	 * We zap the shared cache file too.  In theory it can't get out of sync
	 * enough to be a problem, but in data-corruption cases, who knows ...
	 */
	snprintf(path, sizeof(path), "global/%s",
			 RELCACHE_INIT_FILENAME);
	unlink_initfile(path);

	/* Scan everything in the default tablespace */
	RelationCacheInitFileRemoveInDir("base");
}

/* Process one per-tablespace directory for RelationCacheInitFileRemove */
static void
RelationCacheInitFileRemoveInDir(const char *tblspcpath)
{
	DIR		   *dir;
	struct dirent *de;
	char		initfilename[MAXPGPATH];

	/* Scan the tablespace directory to find per-database directories */
	dir = AllocateDir(tblspcpath);
	if (dir == NULL)
	{
		elog(LOG, "could not open tablespace directory \"%s\": %m",
			 tblspcpath);
		return;
	}

	while ((de = ReadDir(dir, tblspcpath)) != NULL)
	{
		if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
		{
			/* Try to remove the init file in each database */
			snprintf(initfilename, sizeof(initfilename), "%s/%s/%s",
					 tblspcpath, de->d_name, RELCACHE_INIT_FILENAME);
			unlink_initfile(initfilename);
		}
	}

	FreeDir(dir);
}

static void
unlink_initfile(const char *initfilename)
{
	if (unlink(initfilename) < 0)
	{
		/* It might not be there, but log any error other than ENOENT */
		if (errno != ENOENT)
			elog(LOG, "could not remove cache file \"%s\": %m", initfilename);
	}
}