/*-------------------------------------------------------------------------
 *
 * relcache.c
 *	  POSTGRES relation descriptor cache code
 *
 * Portions Copyright (c) 2005-2009, Greenplum inc.
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/utils/cache/relcache.c,v 1.266.2.10 2010/09/02 03:17:06 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
/*
 * INTERFACE ROUTINES
 *		RelationCacheInitialize			- initialize relcache (to empty)
 *		RelationCacheInitializePhase2	- initialize shared-catalog entries
 *		RelationCacheInitializePhase3	- finish initializing relcache
 *		RelationIdGetRelation			- get a reldesc by relation id
 *		RelationClose					- close an open relation
 *
 * NOTES
 *		The following code contains many undocumented hacks.  Please be
 *		careful....
 */
28 29
#include "postgres.h"

30
#include <sys/file.h>
31
#include <fcntl.h>
32
#include <unistd.h>
33

34 35
#include "access/genam.h"
#include "access/heapam.h"
36
#include "access/reloptions.h"
37
#include "access/sysattr.h"
38
#include "access/xact.h"
39
#include "catalog/catalog.h"
40
#include "catalog/index.h"
B
Bruce Momjian 已提交
41
#include "catalog/indexing.h"
42
#include "catalog/namespace.h"
43 44
#include "catalog/pg_amop.h"
#include "catalog/pg_amproc.h"
B
Bruce Momjian 已提交
45
#include "catalog/pg_attrdef.h"
46
#include "catalog/pg_authid.h"
47
#include "catalog/pg_auth_members.h"
48
#include "catalog/pg_constraint.h"
49
#include "catalog/pg_database.h"
50
#include "catalog/pg_namespace.h"
51
#include "catalog/pg_opclass.h"
52
#include "catalog/pg_operator.h"
B
Bruce Momjian 已提交
53
#include "catalog/pg_proc.h"
54
#include "catalog/pg_rewrite.h"
55 56
#include "catalog/pg_tablespace.h"
#include "catalog/pg_trigger.h"
57
#include "catalog/pg_type.h"
58
#include "commands/trigger.h"
B
Bruce Momjian 已提交
59
#include "miscadmin.h"
60 61
#include "optimizer/clauses.h"
#include "optimizer/planmain.h"
62
#include "optimizer/prep.h"
63
#include "optimizer/var.h"
64
#include "rewrite/rewriteDefine.h"
65
#include "storage/fd.h"
B
Bruce Momjian 已提交
66
#include "storage/smgr.h"
67
#include "utils/builtins.h"
68
#include "utils/fmgroids.h"
69
#include "utils/inval.h"
70
#include "utils/memutils.h"
B
Bruce Momjian 已提交
71
#include "utils/relcache.h"
72
#include "utils/relationnode.h"
73
#include "utils/resowner.h"
74
#include "utils/syscache.h"
B
Bruce Momjian 已提交
75

76
#include "catalog/gp_policy.h"         /* GpPolicy */
77 78 79 80 81 82
#include "cdb/cdbtm.h"
#include "cdb/cdbvars.h"        /* Gp_role */
#include "cdb/cdbmirroredflatfile.h"
#include "cdb/cdbpersistentfilesysobj.h"
#include "cdb/cdbsreh.h"

83

84 85 86 87 88
/*
 * name of relcache init file, used to speed up backend startup
 */
#define RELCACHE_INIT_FILENAME	"pg_internal.init"

89
#define RELCACHE_INIT_FILEMAGIC		0x773264	/* version ID value */
90

91
/*
92
 *		hardcoded tuple descriptors.  see include/catalog/pg_attribute.h
93
 */
94 95 96 97 98 99 100 101 102
static const FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
static const FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
static const FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
static const FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
static const FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};

static const FormData_pg_attribute Desc_pg_database[Natts_pg_database] = {Schema_pg_database};
static const FormData_pg_attribute Desc_pg_authid[Natts_pg_authid] = {Schema_pg_authid};
static const FormData_pg_attribute Desc_pg_auth_members[Natts_pg_auth_members] = {Schema_pg_auth_members};
103

104
/*
105
 *		Hash tables that index the relation cache
106
 *
107 108
 *		We used to index the cache by both name and OID, but now there
 *		is only an index by OID.
109
 */
110 111 112 113 114 115
typedef struct relidcacheent
{
	Oid			reloid;
	Relation	reldesc;
} RelIdCacheEnt;

116
static HTAB *RelationIdCache;
117

118 119 120 121
/*
 * This flag is false until we have prepared the critical relcache entries
 * that are needed to do indexscans on the tables read by relcache building.
 */
B
Bruce Momjian 已提交
122
bool		criticalRelcachesBuilt = false;
123

124 125 126 127 128 129
/*
 * This flag is false until we have prepared the critical relcache entries
 * for shared catalogs (which are the tables needed for login).
 */
bool		criticalSharedRelcachesBuilt = false;

130 131
/*
 * This counter counts relcache inval events received since backend startup
B
Bruce Momjian 已提交
132
 * (but only for rels that are actually in cache).	Presently, we use it only
133 134 135 136
 * to detect whether data about to be written by write_relcache_init_file()
 * might already be obsolete.
 */
static long relcacheInvalsReceived = 0L;
137

138
/*
139 140 141 142
 * This list remembers the OIDs of the non-shared relations cached in the
 * database's local relcache init file.  Note that there is no corresponding
 * list for the shared relcache init file, for reasons explained in the
 * comments for RelationCacheInitFileRemove.
143 144
 */
static List *initFileRelationIds = NIL;
145

146
/*
147
 * This flag lets us optimize away work in AtEO(Sub)Xact_RelationCache().
148
 */
149
static bool need_eoxact_work = false;
150

151

152
/*
153
 *		macros to manipulate the lookup hashtables
154 155
 */
#define RelationCacheInsert(RELATION)	\
156
do { \
157
	RelIdCacheEnt *idhentry; bool found; \
158
	idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
159
										   (void *) &(RELATION->rd_id), \
160
										   HASH_ENTER, &found); \
161
	/* used to give notice if found -- now just keep quiet */ \
162 163 164
	idhentry->reldesc = RELATION; \
} while(0)

165
#define RelationIdCacheLookup(ID, RELATION) \
166
do { \
167 168
	RelIdCacheEnt *hentry; \
	hentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
169 170
										 (void *) &(ID), \
										 HASH_FIND, NULL); \
171
	if (hentry) \
172 173 174 175 176 177 178
		RELATION = hentry->reldesc; \
	else \
		RELATION = NULL; \
} while(0)

#define RelationCacheDelete(RELATION) \
do { \
179
	RelIdCacheEnt *idhentry; \
180
	idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
181
										   (void *) &(RELATION->rd_id), \
182
										   HASH_REMOVE, NULL); \
183
	if (idhentry == NULL) \
184
		elog(WARNING, "trying to delete a rd_id reldesc that does not exist"); \
185
} while(0)
186

187 188 189

/*
 * Special cache for opclass-related information
190
 *
191 192
 * Note: only default operators and support procs get cached, ie, those with
 * lefttype = righttype = opcintype.
193 194 195 196 197 198 199
 */
typedef struct opclasscacheent
{
	Oid			opclassoid;		/* lookup key: OID of opclass */
	bool		valid;			/* set TRUE after successful fill-in */
	StrategyNumber numStrats;	/* max # of strategies (from pg_am) */
	StrategyNumber numSupport;	/* max # of support procs (from pg_am) */
200 201
	Oid			opcfamily;		/* OID of opclass's family */
	Oid			opcintype;		/* OID of opclass's declared input type */
202
	Oid		   *operatorOids;	/* strategy operators' OIDs */
B
Bruce Momjian 已提交
203
	RegProcedure *supportProcs; /* support procs */
204 205 206 207 208
} OpClassCacheEnt;

static HTAB *OpClassCache = NULL;


209
/* non-export function prototypes */
210

211
static void RelationDestroyRelation(Relation relation);
212
static void RelationClearRelation(Relation relation, bool rebuild);
B
Bruce Momjian 已提交
213

214
static void RelationReloadIndexInfo(Relation relation);
215
static void RelationFlushRelation(Relation relation);
216 217
static bool load_relcache_init_file(bool shared);
static void write_relcache_init_file(bool shared);
B
Bruce Momjian 已提交
218
static void write_item(const void *data, Size len, FILE *fp);
219

220
static void formrdesc(const char *relationName, Oid relationReltype,
221 222
		  bool isshared, bool hasoids,
		  int natts, const FormData_pg_attribute *att);
223

224 225
static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK, Relation *pg_class_relation);
static Relation AllocateRelationDesc(Form_pg_class relp);
226
static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
227
static void RelationBuildTupleDesc(Relation relation);
228
static Relation RelationBuildDesc(Oid targetRelId, bool insertIt);
229
static void RelationInitPhysicalAddr(Relation relation);
230
static void RelationInitAppendOnlyInfo(Relation relation);
231
static void load_critical_index(Oid indexoid, Oid heapoid);
232
static TupleDesc GetPgClassDescriptor(void);
233
static TupleDesc GetPgIndexDescriptor(void);
234
static void AttrDefaultFetch(Relation relation);
235
static void CheckConstraintFetch(Relation relation);
236
static List *insert_ordered_oid(List *list, Oid datum);
237
static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
B
Bruce Momjian 已提交
238 239
				  StrategyNumber numStrats,
				  StrategyNumber numSupport);
240 241
static void RelationCacheInitFileRemoveInDir(const char *tblspcpath);
static void unlink_initfile(const char *initfilename);
242

243

244
/*
245
 *		ScanPgRelation
246
 *
247 248 249 250 251
 *		This is used by RelationBuildDesc to find a pg_class
 *		tuple matching targetRelId.  The caller must hold at least
 *		AccessShareLock on the target relid to prevent concurrent-update
 *		scenarios --- else our SnapshotNow scan might fail to find any
 *		version that it thinks is live.
252 253 254
 *
 *		NB: the returned tuple has been copied into palloc'd storage
 *		and must eventually be freed with heap_freetuple.
255
 */
256
static HeapTuple
257
ScanPgRelation(Oid targetRelId, bool indexOK, Relation *pg_class_relation)
258
{
259 260
	HeapTuple	pg_class_tuple;
	Relation	pg_class_desc;
261 262
	SysScanDesc pg_class_scan;
	ScanKeyData key[1];
263 264 265 266 267 268 269 270 271 272 273 274 275

	/*
	 * If something goes wrong during backend startup, we might find ourselves
	 * trying to read pg_class before we've selected a database.  That ain't
	 * gonna work, so bail out with a useful error message.  If this happens,
	 * it probably means a relcache entry that needs to be nailed isn't.
	 */
	if (!OidIsValid(MyDatabaseId))
		elog(FATAL, "cannot read pg_class without having selected a database");

	/*
	 * form a scan key
	 */
276 277 278 279
	ScanKeyInit(&key[0],
				ObjectIdAttributeNumber,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(targetRelId));
280 281 282 283 284 285 286 287

	/*
	 * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
	 * built the critical relcache entries (this includes initdb and startup
	 * without a pg_internal.init file).  The caller can also force a heap
	 * scan by setting indexOK == false.
	 */
	pg_class_desc = heap_open(RelationRelationId, AccessShareLock);
288 289 290 291
	pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId,
									   indexOK && criticalRelcachesBuilt,
									   SnapshotNow,
									   1, key);
292

293
	pg_class_tuple = systable_getnext(pg_class_scan);
294 295

	/*
296
	 * Must copy tuple before releasing buffer.
297
	 */
298 299
	if (HeapTupleIsValid(pg_class_tuple))
		pg_class_tuple = heap_copytuple(pg_class_tuple);
300 301

	/* all done */
302
	systable_endscan(pg_class_scan);
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398
	if (pg_class_relation == NULL)
		heap_close(pg_class_desc, AccessShareLock);
	else
		*pg_class_relation = pg_class_desc;

	return pg_class_tuple;
}

/*
 * GpRelationNodeBeginScan
 *		Begin an index scan of gp_relation_node for all rows whose
 *		relfilenode matches the given one, under the given snapshot.
 *
 * The scan state is stored into *gpRelationNodeScan (zeroed first);
 * use GpRelationNodeGetNext to fetch rows and GpRelationNodeEndScan
 * to finish.  relationId is stashed only for error reporting.
 */
void
GpRelationNodeBeginScan(
	Snapshot	snapshot,
	Relation 	gp_relation_node,
	Oid		relationId,
	Oid 		relfilenode,
	GpRelationNodeScan 	*gpRelationNodeScan)
{
	Assert (relfilenode != 0);

	MemSet(gpRelationNodeScan, 0, sizeof(GpRelationNodeScan));

	/*
	 * form a scan key
	 */
	/* XXX XXX: break this out -- find callers - jic 2011/12/09 */
	/* maybe it's ok - return a cql context ? */

	/* XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX */
	/* no json defs for persistent tables ? */
/*
	cqxx("SELECT * FROM gp_relation_node_relfilenode "
		 " WHERE oid = :1 ",
		 ObjectIdGetDatum(relfilenode));
*/
	/* XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX */

	ScanKeyInit(&gpRelationNodeScan->scankey[0],
				Anum_gp_relation_node_relfilenode_oid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(relfilenode));

	/*
	 * Begin the scan over gp_relation_node via its OID index, using the
	 * caller-supplied snapshot.
	 */
	gpRelationNodeScan->scan = \
		systable_beginscan(gp_relation_node, GpRelationNodeOidIndexId,
						   /* indexOK */ true,
						   snapshot,
						   /* nKeys */ 1, 
						   gpRelationNodeScan->scankey);

	gpRelationNodeScan->gp_relation_node = gp_relation_node;
	gpRelationNodeScan->relationId = relationId;
	gpRelationNodeScan->relfilenode = relfilenode;
}

/*
 * GpRelationNodeGetNext
 *		Fetch the next tuple from a scan begun with GpRelationNodeBeginScan,
 *		decoding its segment file number, persistent TID and persistent
 *		serial number into the output parameters.
 *
 * Returns NULL (and zeroes *persistentTid / *persistentSerialNum) when the
 * scan is exhausted.  The returned tuple is owned by the scan; do not free.
 */
HeapTuple
GpRelationNodeGetNext(
	GpRelationNodeScan 	*gpRelationNodeScan,
	int32				*segmentFileNum,
	ItemPointer			persistentTid,
	int64				*persistentSerialNum)
{
	HeapTuple	tuple;
	bool		nulls[Natts_gp_relation_node];
	Datum		values[Natts_gp_relation_node];
	Oid			actualRelationNode;
	int64		createMirrorDataLossTrackingSessionNum;

	tuple = systable_getnext((SysScanDesc) gpRelationNodeScan->scan);

	/* if no more tuples, zero the outputs and tell the caller */
	if (!HeapTupleIsValid(tuple))
	{
		MemSet(persistentTid, 0, sizeof(ItemPointerData));
		*persistentSerialNum = 0;
		return tuple;
	}

	heap_deform_tuple(tuple, RelationGetDescr(gpRelationNodeScan->gp_relation_node), values, nulls);

	GpRelationNode_GetValues(
						values,
						&actualRelationNode,
						segmentFileNum,
						&createMirrorDataLossTrackingSessionNum,
						persistentTid,
						persistentSerialNum);

	/*
	 * Cross-check that the index really delivered a row for the requested
	 * relfilenode; a mismatch means the index is corrupt.
	 *
	 * Fix: a space was added between the two adjacent string literals; they
	 * previously concatenated into "...broken.Mismatch...".
	 */
	if (actualRelationNode != gpRelationNodeScan->relfilenode)
		elog(FATAL, "Index on gp_relation_node broken. "
			 "Mismatch in node tuple for gp_relation_node for relation %u, relfilenode %u, relation node %u",
			 gpRelationNodeScan->relationId,
			 gpRelationNodeScan->relfilenode,
			 actualRelationNode);

	return tuple;
}


/*
 * GpRelationNodeEndScan
 *		Finish a scan begun with GpRelationNodeBeginScan, releasing the
 *		scan's resources.  Does not close the gp_relation_node relation;
 *		the caller opened it and must close it.
 */
void
GpRelationNodeEndScan(
	GpRelationNodeScan 	*gpRelationNodeScan)
{
	/* all done */
	systable_endscan((SysScanDesc)gpRelationNodeScan->scan);
}

417
static HeapTuple
418 419 420 421 422 423 424 425 426 427
ScanGpRelationNodeTuple(
	Relation 	gp_relation_node,
	Oid 		relfilenode,
	int32		segmentFileNum)
{
	HeapTuple	tuple;
	SysScanDesc scan;
	ScanKeyData key[2];

	Assert (relfilenode != 0);
428

429
	/*
B
Bruce Momjian 已提交
430
	 * form a scan key
431
	 */
432 433 434 435 436 437 438 439 440 441 442

	/* XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX */
/*
	cqxx("SELECT * FROM gp_relation_node "
		 " WHERE relfilenode_oid = :1 "
		 " AND segment_file_num = :2 ",
		 ObjectIdGetDatum(relfilenode),
		 Int32GetDatum(segmentFileNum));
*/
	/* XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX */

443
	ScanKeyInit(&key[0],
444
				Anum_gp_relation_node_relfilenode_oid,
445
				BTEqualStrategyNumber, F_OIDEQ,
446 447 448 449 450
				ObjectIdGetDatum(relfilenode));
	ScanKeyInit(&key[1],
				Anum_gp_relation_node_segment_file_num,
				BTEqualStrategyNumber, F_INT4EQ,
				Int32GetDatum(segmentFileNum));
451

452
	/*
B
Bruce Momjian 已提交
453
	 * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
B
Bruce Momjian 已提交
454 455 456
	 * built the critical relcache entries (this includes initdb and startup
	 * without a pg_internal.init file).  The caller can also force a heap
	 * scan by setting indexOK == false.
457
	 */
458 459
	scan = systable_beginscan(gp_relation_node, GpRelationNodeOidIndexId,
									   /* indexOK */ true,
460
									   SnapshotNow,
461
									   2, key);
462

463
	tuple = systable_getnext(scan);
B
Bruce Momjian 已提交
464

H
Hiroshi Inoue 已提交
465
	/*
466
	 * Must copy tuple before releasing buffer.
H
Hiroshi Inoue 已提交
467
	 */
468 469
	if (HeapTupleIsValid(tuple))
		tuple = heap_copytuple(tuple);
470

471
	/* all done */
472
	systable_endscan(scan);
473

474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521
	return tuple;
}

/*
 * FetchGpRelationNodeTuple
 *		Look up the gp_relation_node row for (relfilenode, segmentFileNum)
 *		and return a palloc'd copy, filling *persistentTid and
 *		*persistentSerialNum from the row's columns.
 *
 * Returns NULL (with the output parameters zeroed) if no such row exists.
 * The returned copy must eventually be freed with heap_freetuple.
 */
HeapTuple
FetchGpRelationNodeTuple(
	Relation 		gp_relation_node,
	Oid 			relfilenode,
	int32			segmentFileNum,
	ItemPointer		persistentTid,
	int64			*persistentSerialNum)
{
	HeapTuple	tuple;
	bool		nulls[Natts_gp_relation_node];
	Datum		values[Natts_gp_relation_node];
	Oid			actualRelationNode;
	int32		actualSegmentFileNum;
	int64		createMirrorDataLossTrackingSessionNum;

	Assert (relfilenode != 0);

	tuple = ScanGpRelationNodeTuple(
					gp_relation_node,
					relfilenode,
					segmentFileNum);

	/* if no such tuple exists, hand back NULL with zeroed outputs */
	if (!HeapTupleIsValid(tuple))
	{
		MemSet(persistentTid, 0, sizeof(ItemPointerData));
		*persistentSerialNum = 0;
		return tuple;
	}

	heap_deform_tuple(tuple, RelationGetDescr(gp_relation_node), values, nulls);

	GpRelationNode_GetValues(
						values,
						&actualRelationNode,
						&actualSegmentFileNum,
						&createMirrorDataLossTrackingSessionNum,
						persistentTid,
						persistentSerialNum);

	/*
	 * Sanity-check that the index delivered the requested relfilenode.
	 *
	 * Fix: a space was added between the two adjacent string literals; they
	 * previously concatenated into "...broken.Mismatch...".
	 */
	if (actualRelationNode != relfilenode)
	{
		elog(ERROR, "Index on gp_relation_node broken. "
			 "Mismatch in node tuple for gp_relation_node intended relfilenode %u, fetched relfilenode %u",
			 relfilenode,
			 actualRelationNode);
	}

	return tuple;
}

/*
 * DeleteGpRelationNodeTuple
 *		Remove the gp_relation_node row for the given segment file of the
 *		given relation.  Errors out if no such row exists.
 */
void
DeleteGpRelationNodeTuple(
	Relation 	relation,
	int32		segmentFileNum)
{
	ItemPointerData persistentTid;
	int64		persistentSerialNum;
	Relation	node_rel;
	HeapTuple	node_tuple;

	node_rel = heap_open(GpRelationNodeRelationId, RowExclusiveLock);

	/* locate the row; also fills the (unused here) persistent info */
	node_tuple = FetchGpRelationNodeTuple(node_rel,
										  relation->rd_rel->relfilenode,
										  segmentFileNum,
										  &persistentTid,
										  &persistentSerialNum);

	if (!HeapTupleIsValid(node_tuple))
		elog(ERROR, "could not find node tuple for relation %u, relation file node %u, segment file #%d",
			 RelationGetRelid(relation),
			 relation->rd_rel->relfilenode,
			 segmentFileNum);

	/* delete the relation tuple from gp_relation_node, and finish up */
	simple_heap_delete(node_rel, &node_tuple->t_self);
	heap_freetuple(node_tuple);

	heap_close(node_rel, RowExclusiveLock);
}

/*
 * ReadGpRelationNode
 *		Look up the gp_relation_node entry for (relfilenode, segmentFileNum)
 *		and copy its persistent TID and serial number into the output
 *		parameters.
 *
 * Returns true if the entry was found, false otherwise (in which case the
 * outputs are left zeroed).  Opens and closes gp_relation_node itself.
 */
bool
ReadGpRelationNode(
	Oid 			relfilenode,
	
	int32			segmentFileNum,

	ItemPointer		persistentTid,

	int64			*persistentSerialNum)
{
	Relation gp_relation_node;
	HeapTuple tuple;
	bool found;

	/* pre-zero outputs so the not-found path hands back clean values */
	MemSet(persistentTid, 0, sizeof(ItemPointerData));
	*persistentSerialNum = 0;

	gp_relation_node = heap_open(GpRelationNodeRelationId, AccessShareLock);

	tuple = FetchGpRelationNodeTuple(
						gp_relation_node,
						relfilenode,
						segmentFileNum,
						persistentTid,
						persistentSerialNum);

	/*
	 * If no such tuple exists, report not-found; otherwise the outputs were
	 * already filled in by FetchGpRelationNodeTuple.
	 */
	if (!HeapTupleIsValid(tuple))
	{
		found = false;
	}
	else
	{
		/* optional tracing of the tuple's visibility state */
		if (Debug_persistent_print)
		{
			TupleVisibilitySummary tupleVisibilitySummary;
			char *tupleVisibilitySummaryString;
			
			GetTupleVisibilitySummary(
									tuple,
									&tupleVisibilitySummary);
			tupleVisibilitySummaryString = GetTupleVisibilitySummaryString(&tupleVisibilitySummary);
			
			elog(Persistent_DebugPrintLevel(), 
				 "ReadGpRelationNode: For relfilenode %u, segment file #%d found persistent serial number " INT64_FORMAT ", TID %s (gp_relation_node tuple visibility: %s)",
				 relfilenode,
				 segmentFileNum,
				 *persistentSerialNum,
				 ItemPointerToString(persistentTid),
				 tupleVisibilitySummaryString);
			pfree(tupleVisibilitySummaryString);
		}

		found = true;
		heap_freetuple(tuple);
	}

	heap_close(gp_relation_node, AccessShareLock);

	return found;
}

/*
 * RelationFetchSegFile0GpRelationNode
 *		Ensure the relcache entry carries the persistent-table TID and serial
 *		number for segment file 0 of this relation, fetching them from
 *		gp_relation_node on first use.
 *
 * When the info is already present and gp_validate_pt_info_relcache is set,
 * the catalog row is re-read and cross-checked against the cached values.
 */
void
RelationFetchSegFile0GpRelationNode(
	Relation relation)
{
	if (relation->rd_segfile0_relationnodeinfo.isPresent)
	{
		/*
		 * Info already cached.  Optionally re-validate it, but bypass the
		 * check for gp_relation_node_index because ReadGpRelationNode()
		 * uses the same index to probe relfile node.
		 */
		if (gp_validate_pt_info_relcache &&
			!(relation->rd_index &&
			  relation->rd_index->indrelid == GpRelationNodeRelationId))
		{
			ItemPointerData persistentTid;
			int64		persistentSerialNum;

			if (!ReadGpRelationNode(
						relation->rd_node.relNode,
						/* segmentFileNum */ 0,
						&persistentTid,
						&persistentSerialNum))
			{
				elog(ERROR,
					 "did not find gp_relation_node entry for relation name %s, "
					 "relation id %u, relfilenode %u", relation->rd_rel->relname.data,
					 relation->rd_id, relation->rd_node.relNode);
			}

			if (ItemPointerCompare(&persistentTid,
								   &relation->rd_segfile0_relationnodeinfo.persistentTid) ||
				(persistentSerialNum != relation->rd_segfile0_relationnodeinfo.persistentSerialNum))
			{
				ereport(ERROR,
						(errmsg("invalid persistent TID and/or serial number in "
								"relcache entry"),
						 errdetail("relation name %s, relation id %u, relfilenode %u "
								   "contains invalid persistent TID %s and/or serial "
								   "number " INT64_FORMAT ".  Expected TID is %s and "
								   "serial number " INT64_FORMAT,
								   relation->rd_rel->relname.data, relation->rd_id,
								   relation->rd_node.relNode,
								   ItemPointerToString(
									   &relation->rd_segfile0_relationnodeinfo.persistentTid),
								   relation->rd_segfile0_relationnodeinfo.persistentSerialNum,
								   ItemPointerToString2(&persistentTid),
								   persistentSerialNum)));
			}
		}

		return;
	}

	/* Not cached yet: fill it in. */
	if (Persistent_BeforePersistenceWork() || InRecovery)
	{
		/*
		 * The initdb process will load the persistent table once we are out
		 * of bootstrap mode, so a zero TID is acceptable for now.
		 */
		MemSet(&relation->rd_segfile0_relationnodeinfo.persistentTid, 0, sizeof(ItemPointerData));
		relation->rd_segfile0_relationnodeinfo.persistentSerialNum = 0;

		relation->rd_segfile0_relationnodeinfo.isPresent = true;
		relation->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = true;

		return;
	}

	if (!ReadGpRelationNode(
				relation->rd_node.relNode,
				/* segmentFileNum */ 0,
				&relation->rd_segfile0_relationnodeinfo.persistentTid,
				&relation->rd_segfile0_relationnodeinfo.persistentSerialNum))
	{
		elog(ERROR, "Did not find gp_relation_node entry for relation name %s, relation id %u, relfilenode %u",
			 relation->rd_rel->relname.data,
			 relation->rd_id,
			 relation->rd_node.relNode);
	}

	Assert(!Persistent_BeforePersistenceWork());

	/* a zero TID at this point indicates corrupted persistent info */
	if (PersistentStore_IsZeroTid(&relation->rd_segfile0_relationnodeinfo.persistentTid))
	{
		elog(ERROR,
			 "RelationFetchSegFile0GpRelationNode has invalid TID (0,0) into relation %u/%u/%u '%s', serial number " INT64_FORMAT,
			 relation->rd_node.spcNode,
			 relation->rd_node.dbNode,
			 relation->rd_node.relNode,
			 NameStr(relation->rd_rel->relname),
			 relation->rd_segfile0_relationnodeinfo.persistentSerialNum);
	}

	relation->rd_segfile0_relationnodeinfo.isPresent = true;
}

/*
 * RelationFetchGpRelationNodeForXLog_Index
 *		Fetch the segment-file-0 persistent info for an index while guarding
 *		against recursive invocation.
 *
 * UNDONE: temporary diagnostic shim for MPP-16395; recursion is reported
 * (and optionally waited on for debugger attach) instead of silently allowed.
 */
void
RelationFetchGpRelationNodeForXLog_Index(
	Relation relation)
{
	static int	callCount = 0;		/* total calls in this backend */
	static int	recursionDepth = 0; /* current nesting level */

	recursionDepth++;
	callCount++;

	if (recursionDepth >= 2)
	{
		int			reportedDepth;

		if (Debug_gp_relation_node_fetch_wait_for_debugging)
		{
			/* Code for investigating MPP-16395, will be removed as part of the fix */
			elog(WARNING, "RelationFetchGpRelationNodeForXLog_Index [%d] for non-heap %u/%u/%u (deep %d) -- waiting for debug attach...",
				 callCount,
				 relation->rd_node.spcNode,
				 relation->rd_node.dbNode,
				 relation->rd_node.relNode,
				 recursionDepth);

			/* park for up to 24 hours so a debugger can be attached */
			for (int minute = 0; minute < 24 * 60; minute++)
			{
				pg_usleep(60000000L);	/* 60 sec */
			}
		}

		/*
		 * Reset counter in case the user continues to use the session.
		 */
		reportedDepth = recursionDepth;
		recursionDepth = 0;

		elog(ERROR, "RelationFetchGpRelationNodeForXLog_Index [%d] for non-heap %u/%u/%u (deep %d)",
			 callCount,
			 relation->rd_node.spcNode,
			 relation->rd_node.dbNode,
			 relation->rd_node.relNode,
			 reportedDepth);
	}

	RelationFetchSegFile0GpRelationNode(relation);

	recursionDepth--;
}

774
/*
775
 *		AllocateRelationDesc
776
 *
777
 *		This is used to allocate memory for a new relation descriptor
778
 *		and initialize the rd_rel field from the given pg_class tuple.
779
 */
780
static Relation
781
AllocateRelationDesc(Form_pg_class relp)
782
{
783
	Relation	relation;
784
	MemoryContext oldcxt;
785
	Form_pg_class relationForm;
786

787 788
	/* Relcache entries must live in CacheMemoryContext */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
789

790
	/*
791
	 * allocate and zero space for new relation descriptor
792
	 */
793
	relation = (Relation) palloc0(sizeof(RelationData));
794

795
	/*
796
	 * clear fields of reldesc that should initialize to something non-zero
797
	 */
798
	relation->rd_targblock = InvalidBlockNumber;
799

800
	/* make sure relation is marked as having no open file yet */
801
	relation->rd_smgr = NULL;
802

803
	/*
B
Bruce Momjian 已提交
804
	 * Copy the relation tuple form
805
	 *
B
Bruce Momjian 已提交
806 807
	 * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE. The
	 * variable-length fields (relacl, reloptions) are NOT stored in the
808 809
	 * relcache --- there'd be little point in it, since we don't copy the
	 * tuple's nulls bitmap and hence wouldn't know if the values are valid.
B
Bruce Momjian 已提交
810 811 812 813
	 * Bottom line is that relacl *cannot* be retrieved from the relcache. Get
	 * it from the syscache if you need it.  The same goes for the original
	 * form of reloptions (however, we do store the parsed form of reloptions
	 * in rd_options).
814 815
	 */
	relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
816

817
	memcpy(relationForm, relp, CLASS_TUPLE_SIZE);
818 819

	/* initialize relation tuple form */
820
	relation->rd_rel = relationForm;
821

822 823 824 825 826 827 828
	/*
	 * This part MUST be remain as a fetch on demand, otherwise you end up
	 * needing it to open pg_class and then relation_open does infinite recursion...
	 */
	relation->rd_segfile0_relationnodeinfo.isPresent = false;
	relation->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = false;

829
	/* and allocate attribute tuple form storage */
830 831
	relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts,
											   relationForm->relhasoids);
832 833
	/* which we mark as a reference-counted tupdesc */
	relation->rd_att->tdrefcount = 1;
834 835 836

	MemoryContextSwitchTo(oldcxt);

837
	return relation;
838 839
}

B
Bruce Momjian 已提交
840
/*
841 842 843 844 845 846
 * RelationParseRelOptions
 *		Convert pg_class.reloptions into pre-parsed rd_options
 *
 * tuple is the real pg_class tuple (not rd_rel!) for relation
 *
 * Note: rd_rel and (if an index) rd_am must be valid already
B
Bruce Momjian 已提交
847 848
 */
static void
849
RelationParseRelOptions(Relation relation, HeapTuple tuple)
B
Bruce Momjian 已提交
850
{
851 852 853
	Datum		datum;
	bool		isnull;
	bytea	   *options;
B
Bruce Momjian 已提交
854

855
	relation->rd_options = NULL;
B
Bruce Momjian 已提交
856

857
	/* Fall out if relkind should not have options */
B
Bruce Momjian 已提交
858 859
	switch (relation->rd_rel->relkind)
	{
860 861
		case RELKIND_RELATION:
		case RELKIND_TOASTVALUE:
862 863 864
		case RELKIND_AOSEGMENTS:
		case RELKIND_AOBLOCKDIR:
		case RELKIND_AOVISIMAP:
865 866 867 868
		case RELKIND_INDEX:
			break;
		default:
			return;
B
Bruce Momjian 已提交
869 870
	}

871
	/*
B
Bruce Momjian 已提交
872 873 874
	 * Fetch reloptions from tuple; have to use a hardwired descriptor because
	 * we might not have any other for pg_class yet (consider executing this
	 * code for pg_class itself)
875 876 877 878 879 880 881
	 */
	datum = fastgetattr(tuple,
						Anum_pg_class_reloptions,
						GetPgClassDescriptor(),
						&isnull);
	if (isnull)
		return;
B
Bruce Momjian 已提交
882

883
	/* Parse into appropriate format; don't error out here */
B
Bruce Momjian 已提交
884 885
	switch (relation->rd_rel->relkind)
	{
886 887
		case RELKIND_RELATION:
		case RELKIND_TOASTVALUE:
888 889 890
		case RELKIND_AOSEGMENTS:
		case RELKIND_AOBLOCKDIR:
		case RELKIND_AOVISIMAP:
891 892 893 894 895 896 897 898 899 900 901 902 903 904
		case RELKIND_UNCATALOGED:
			options = heap_reloptions(relation->rd_rel->relkind, datum,
									  false);
			break;
		case RELKIND_INDEX:
			options = index_reloptions(relation->rd_am->amoptions, datum,
									   false);
			break;
		default:
			Assert(false);		/* can't get here */
			options = NULL;		/* keep compiler quiet */
			break;
	}

905 906 907 908 909 910
	/*
	 * Copy parsed data into CacheMemoryContext.  To guard against the
	 * possibility of leaks in the reloptions code, we want to do the actual
	 * parsing in the caller's memory context and copy the results into
	 * CacheMemoryContext after the fact.
	 */
911 912 913 914 915
	if (options)
	{
		relation->rd_options = MemoryContextAlloc(CacheMemoryContext,
												  VARSIZE(options));
		memcpy(relation->rd_options, options, VARSIZE(options));
916
		pfree(options);
B
Bruce Momjian 已提交
917 918 919
	}
}

920
/*
921
 *		RelationBuildTupleDesc
922
 *
923
 *		Form the relation's tuple descriptor from information in
924
 *		the pg_attribute, pg_attrdef & pg_constraint system catalogs.
925 926
 */
static void
927
RelationBuildTupleDesc(Relation relation)
928
{
929 930
	HeapTuple	pg_attribute_tuple;
	Relation	pg_attribute_desc;
931 932
	SysScanDesc pg_attribute_scan;
	ScanKeyData skey[2];
933
	int			need;
934
	TupleConstr *constr;
H
Hiroshi Inoue 已提交
935
	AttrDefault *attrdef = NULL;
936
	int			ndef = 0;
937

938 939 940 941
	/* copy some fields from pg_class row to rd_att */
	relation->rd_att->tdtypeid = relation->rd_rel->reltype;
	relation->rd_att->tdtypmod = -1;	/* unnecessary, but... */
	relation->rd_att->tdhasoid = relation->rd_rel->relhasoids;
942

943 944
	constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
												sizeof(TupleConstr));
H
Hiroshi Inoue 已提交
945
	constr->has_not_null = false;
946

947
	/*
948
	 * Form a scan key that selects only user attributes (attnum > 0).
B
Bruce Momjian 已提交
949 950
	 * (Eliminating system attribute rows at the index level is lots faster
	 * than fetching them.)
951
	 */
952 953 954 955 956 957 958 959
	ScanKeyInit(&skey[0],
				Anum_pg_attribute_attrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));
	ScanKeyInit(&skey[1],
				Anum_pg_attribute_attnum,
				BTGreaterStrategyNumber, F_INT2GT,
				Int16GetDatum(0));
960

961
	/*
B
Bruce Momjian 已提交
962 963 964
	 * Open pg_attribute and begin a scan.	Force heap scan if we haven't yet
	 * built the critical relcache entries (this includes initdb and startup
	 * without a pg_internal.init file).
965
	 */
966
	pg_attribute_desc = heap_open(AttributeRelationId, AccessShareLock);
967 968 969 970 971
	pg_attribute_scan = systable_beginscan(pg_attribute_desc,
										   AttributeRelidNumIndexId,
										   criticalRelcachesBuilt,
										   SnapshotNow,
										   2, skey);
972

973
	/*
B
Bruce Momjian 已提交
974
	 * add attribute data to relation->rd_att
975
	 */
976
	need = relation->rd_rel->relnatts;
977

978
	while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
979
	{
980 981
		Form_pg_attribute attp;

982
		attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);
983

984 985
		if (attp->attnum <= 0 ||
			attp->attnum > relation->rd_rel->relnatts)
986
			elog(ERROR, "invalid attribute number %d for %s",
987 988
				 attp->attnum, RelationGetRelationName(relation));

989 990
		memcpy(relation->rd_att->attrs[attp->attnum - 1],
			   attp,
991
			   ATTRIBUTE_FIXED_PART_SIZE);
992

993 994
		/* Update constraint/default info */
		if (attp->attnotnull)
995
			constr->has_not_null = true;
H
Hiroshi Inoue 已提交
996

997 998 999 1000
		if (attp->atthasdef)
		{
			if (attrdef == NULL)
				attrdef = (AttrDefault *)
1001 1002 1003
					MemoryContextAllocZero(CacheMemoryContext,
										   relation->rd_rel->relnatts *
										   sizeof(AttrDefault));
1004 1005 1006
			attrdef[ndef].adnum = attp->attnum;
			attrdef[ndef].adbin = NULL;
			ndef++;
1007
		}
1008 1009 1010
		need--;
		if (need == 0)
			break;
1011
	}
1012

1013
	/*
B
Bruce Momjian 已提交
1014
	 * end the scan and close the attribute relation
1015
	 */
1016
	systable_endscan(pg_attribute_scan);
1017
	heap_close(pg_attribute_desc, AccessShareLock);
H
Hiroshi Inoue 已提交
1018

1019 1020 1021 1022
	if (need != 0)
		elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
			 need, RelationGetRelid(relation));

1023
	/*
B
Bruce Momjian 已提交
1024 1025 1026
	 * The attcacheoff values we read from pg_attribute should all be -1
	 * ("unknown").  Verify this if assert checking is on.	They will be
	 * computed when and if needed during tuple access.
1027 1028 1029
	 */
#ifdef USE_ASSERT_CHECKING
	{
B
Bruce Momjian 已提交
1030
		int			i;
1031 1032 1033 1034 1035 1036

		for (i = 0; i < relation->rd_rel->relnatts; i++)
			Assert(relation->rd_att->attrs[i]->attcacheoff == -1);
	}
#endif

1037
	/*
B
Bruce Momjian 已提交
1038
	 * However, we can easily set the attcacheoff value for the first
B
Bruce Momjian 已提交
1039 1040
	 * attribute: it must be zero.	This eliminates the need for special cases
	 * for attnum=1 that used to exist in fastgetattr() and index_getattr().
1041
	 */
1042 1043
	if (relation->rd_rel->relnatts > 0)
		relation->rd_att->attrs[0]->attcacheoff = 0;
1044

1045 1046 1047 1048
	/*
	 * Set up constraint/default info
	 */
	if (constr->has_not_null || ndef > 0 || relation->rd_rel->relchecks)
1049
	{
1050
		relation->rd_att->constr = constr;
1051

1052
		if (ndef > 0)			/* DEFAULTs */
1053
		{
1054 1055 1056 1057 1058 1059 1060
			if (ndef < relation->rd_rel->relnatts)
				constr->defval = (AttrDefault *)
					repalloc(attrdef, ndef * sizeof(AttrDefault));
			else
				constr->defval = attrdef;
			constr->num_defval = ndef;
			AttrDefaultFetch(relation);
1061
		}
1062 1063
		else
			constr->num_defval = 0;
1064

1065
		if (relation->rd_rel->relchecks > 0)	/* CHECKs */
1066
		{
1067 1068
			constr->num_check = relation->rd_rel->relchecks;
			constr->check = (ConstrCheck *)
1069
				MemoryContextAllocZero(CacheMemoryContext,
B
Bruce Momjian 已提交
1070
									constr->num_check * sizeof(ConstrCheck));
1071
			CheckConstraintFetch(relation);
1072
		}
1073 1074 1075 1076 1077 1078 1079
		else
			constr->num_check = 0;
	}
	else
	{
		pfree(constr);
		relation->rd_att->constr = NULL;
1080
	}
1081 1082
}

1083
/*
1084
 *		RelationBuildRuleLock
1085
 *
1086 1087
 *		Form the relation's rewrite rules from information in
 *		the pg_rewrite system catalog.
1088 1089 1090 1091 1092 1093 1094
 *
 * Note: The rule parsetrees are potentially very complex node structures.
 * To allow these trees to be freed when the relcache entry is flushed,
 * we make a private memory context to hold the RuleLock information for
 * each relcache entry that has associated rules.  The context is used
 * just for rule info, not for any other subsidiary data of the relcache
 * entry, because that keeps the update logic in RelationClearRelation()
B
Bruce Momjian 已提交
1095
 * manageable.	The other subsidiary data structures are simple enough
1096
 * to be easy to free explicitly, anyway.
1097 1098 1099 1100
 */
static void
RelationBuildRuleLock(Relation relation)
{
1101 1102
	MemoryContext rulescxt;
	MemoryContext oldcxt;
1103 1104 1105
	HeapTuple	rewrite_tuple;
	Relation	rewrite_desc;
	TupleDesc	rewrite_tupdesc;
1106 1107
	SysScanDesc rewrite_scan;
	ScanKeyData key;
1108 1109 1110 1111
	RuleLock   *rulelock;
	int			numlocks;
	RewriteRule **rules;
	int			maxlocks;
1112

1113
	/*
B
Bruce Momjian 已提交
1114 1115
	 * Make the private context.  Parameters are set on the assumption that
	 * it'll probably not contain much data.
1116 1117 1118
	 */
	rulescxt = AllocSetContextCreate(CacheMemoryContext,
									 RelationGetRelationName(relation),
1119 1120 1121
									 ALLOCSET_SMALL_MINSIZE,
									 ALLOCSET_SMALL_INITSIZE,
									 ALLOCSET_SMALL_MAXSIZE);
1122 1123
	relation->rd_rulescxt = rulescxt;

1124
	/*
B
Bruce Momjian 已提交
1125 1126
	 * allocate an array to hold the rewrite rules (the array is extended if
	 * necessary)
1127 1128
	 */
	maxlocks = 4;
1129 1130
	rules = (RewriteRule **)
		MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
1131 1132
	numlocks = 0;

1133 1134 1135 1136 1137 1138 1139 1140
	/*
	 * form a scan key
	 */
	ScanKeyInit(&key,
				Anum_pg_rewrite_ev_class,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));

1141
	/*
B
Bruce Momjian 已提交
1142
	 * open pg_rewrite and begin a scan
1143
	 *
1144 1145
	 * Note: since we scan the rules using RewriteRelRulenameIndexId, we will
	 * be reading the rules in name order, except possibly during
B
Bruce Momjian 已提交
1146 1147
	 * emergency-recovery operations (ie, IgnoreSystemIndexes). This in turn
	 * ensures that rules will be fired in name order.
1148
	 */
1149
	rewrite_desc = heap_open(RewriteRelationId, AccessShareLock);
1150 1151
	rewrite_tupdesc = RelationGetDescr(rewrite_desc);

1152 1153 1154 1155
	rewrite_scan = systable_beginscan(rewrite_desc,
									  RewriteRelRulenameIndexId,
									  true, SnapshotNow,
									  1, &key);
1156

1157
	while (HeapTupleIsValid(rewrite_tuple = systable_getnext(rewrite_scan)))
1158
	{
1159
		Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
1160
		bool		isnull;
1161 1162
		Datum		rule_datum;
		char	   *rule_str;
1163
		RewriteRule *rule;
1164

1165 1166
		rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
												  sizeof(RewriteRule));
1167

1168
		rule->ruleId = HeapTupleGetOid(rewrite_tuple);
1169

1170 1171
		rule->event = rewrite_form->ev_type - '0';
		rule->attrno = rewrite_form->ev_attr;
1172
		rule->enabled = rewrite_form->ev_enabled;
1173 1174
		rule->isInstead = rewrite_form->is_instead;

1175
		/*
B
Bruce Momjian 已提交
1176 1177 1178 1179
		 * Must use heap_getattr to fetch ev_action and ev_qual.  Also, the
		 * rule strings are often large enough to be toasted.  To avoid
		 * leaking memory in the caller's context, do the detoasting here so
		 * we can free the detoasted version.
1180 1181
		 */
		rule_datum = heap_getattr(rewrite_tuple,
1182
								  Anum_pg_rewrite_ev_action,
1183
								  rewrite_tupdesc,
B
Bruce Momjian 已提交
1184
								  &isnull);
B
Bruce Momjian 已提交
1185
		Assert(!isnull);
1186
		rule_str = TextDatumGetCString(rule_datum);
1187
		oldcxt = MemoryContextSwitchTo(rulescxt);
1188
		rule->actions = (List *) stringToNode(rule_str);
1189
		MemoryContextSwitchTo(oldcxt);
1190
		pfree(rule_str);
1191

1192 1193 1194 1195
		rule_datum = heap_getattr(rewrite_tuple,
								  Anum_pg_rewrite_ev_qual,
								  rewrite_tupdesc,
								  &isnull);
B
Bruce Momjian 已提交
1196
		Assert(!isnull);
1197
		rule_str = TextDatumGetCString(rule_datum);
1198
		oldcxt = MemoryContextSwitchTo(rulescxt);
1199
		rule->qual = (Node *) stringToNode(rule_str);
1200
		MemoryContextSwitchTo(oldcxt);
1201
		pfree(rule_str);
1202

1203 1204
		/*
		 * We want the rule's table references to be checked as though by the
B
Bruce Momjian 已提交
1205
		 * table owner, not the user referencing the rule.	Therefore, scan
1206
		 * through the rule's actions and set the checkAsUser field on all
B
Bruce Momjian 已提交
1207
		 * rtable entries.	We have to look at the qual as well, in case it
1208 1209
		 * contains sublinks.
		 *
B
Bruce Momjian 已提交
1210 1211 1212 1213 1214
		 * The reason for doing this when the rule is loaded, rather than when
		 * it is stored, is that otherwise ALTER TABLE OWNER would have to
		 * grovel through stored rules to update checkAsUser fields. Scanning
		 * the rule tree during load is relatively cheap (compared to
		 * constructing it in the first place), so we do it here.
1215 1216 1217 1218
		 */
		setRuleCheckAsUser((Node *) rule->actions, relation->rd_rel->relowner);
		setRuleCheckAsUser(rule->qual, relation->rd_rel->relowner);

1219
		if (numlocks >= maxlocks)
1220 1221
		{
			maxlocks *= 2;
1222 1223
			rules = (RewriteRule **)
				repalloc(rules, sizeof(RewriteRule *) * maxlocks);
1224
		}
1225
		rules[numlocks++] = rule;
1226
	}
1227

1228
	/*
B
Bruce Momjian 已提交
1229
	 * end the scan and close the attribute relation
1230
	 */
1231
	systable_endscan(rewrite_scan);
1232
	heap_close(rewrite_desc, AccessShareLock);
1233

1234
	/*
B
Bruce Momjian 已提交
1235
	 * form a RuleLock and insert into relation
1236
	 */
1237
	rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
1238 1239 1240 1241
	rulelock->numLocks = numlocks;
	rulelock->rules = rules;

	relation->rd_rules = rulelock;
1242 1243
}

1244
/*
1245 1246 1247 1248 1249 1250 1251 1252 1253
 *		equalRuleLocks
 *
 *		Determine whether two RuleLocks are equivalent
 *
 *		Probably this should be in the rules code someplace...
 */
static bool
equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
{
1254
	int			i;
1255

1256
	/*
B
Bruce Momjian 已提交
1257
	 * As of 7.3 we assume the rule ordering is repeatable, because
B
Bruce Momjian 已提交
1258 1259
	 * RelationBuildRuleLock should read 'em in a consistent order.  So just
	 * compare corresponding slots.
1260
	 */
1261 1262 1263 1264 1265 1266 1267 1268 1269
	if (rlock1 != NULL)
	{
		if (rlock2 == NULL)
			return false;
		if (rlock1->numLocks != rlock2->numLocks)
			return false;
		for (i = 0; i < rlock1->numLocks; i++)
		{
			RewriteRule *rule1 = rlock1->rules[i];
1270 1271 1272
			RewriteRule *rule2 = rlock2->rules[i];

			if (rule1->ruleId != rule2->ruleId)
1273 1274 1275 1276 1277
				return false;
			if (rule1->event != rule2->event)
				return false;
			if (rule1->attrno != rule2->attrno)
				return false;
1278 1279
			if (rule1->enabled != rule2->enabled)
				return false;
1280 1281
			if (rule1->isInstead != rule2->isInstead)
				return false;
1282
			if (!equal(rule1->qual, rule2->qual))
1283
				return false;
1284
			if (!equal(rule1->actions, rule2->actions))
1285 1286 1287 1288 1289 1290
				return false;
		}
	}
	else if (rlock2 != NULL)
		return false;
	return true;
1291 1292 1293
}


1294
/*
1295 1296
 *		RelationBuildDesc
 *
1297 1298 1299 1300
 *		Build a relation descriptor.  The caller must hold at least
 *		AccessShareLock on the target relid.
 *
 *		The new descriptor is inserted into the hash table if insertIt is true.
1301 1302 1303 1304
 *
 *		Returns NULL if no pg_class row could be found for the given relid
 *		(suggesting we are trying to access a just-deleted relation).
 *		Any other error is reported via elog.
1305
 */
1306
static Relation
1307
RelationBuildDesc(Oid targetRelId, bool insertIt)
1308
{
1309 1310
	Relation	relation;
	Oid			relid;
1311
	Relation    pg_class_relation;
1312
	HeapTuple	pg_class_tuple;
1313
	Form_pg_class relp;
1314

1315
	/*
B
Bruce Momjian 已提交
1316
	 * find the tuple in pg_class corresponding to the given relation id
1317
	 */
1318
	pg_class_tuple = ScanPgRelation(targetRelId, true, &pg_class_relation);
1319

1320
	/*
B
Bruce Momjian 已提交
1321
	 * if no such tuple exists, return NULL
1322 1323 1324 1325
	 */
	if (!HeapTupleIsValid(pg_class_tuple))
		return NULL;

1326
	/*
B
Bruce Momjian 已提交
1327
	 * get information from the pg_class_tuple
1328
	 */
1329
	relid = HeapTupleGetOid(pg_class_tuple);
1330
	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
1331
	heap_close(pg_class_relation, AccessShareLock);
1332

1333
	/*
B
Bruce Momjian 已提交
1334
	 * allocate storage for the relation descriptor, and copy pg_class_tuple
1335
	 * to relation->rd_rel and new fields into relation->rd_newfields.
1336
	 */
1337
	relation = AllocateRelationDesc(relp);
1338

1339
	/*
B
Bruce Momjian 已提交
1340
	 * initialize the relation's relation id (relation->rd_id)
1341
	 */
1342
	RelationGetRelid(relation) = relid;
1343

1344
	/*
B
Bruce Momjian 已提交
1345 1346 1347
	 * normal relations are not nailed into the cache; nor can a pre-existing
	 * relation be new.  It could be temp though.  (Actually, it could be new
	 * too, but it's okay to forget that fact if forced to flush the entry.)
1348
	 */
1349
	relation->rd_refcnt = 0;
1350
	relation->rd_isnailed = false;
1351
	relation->rd_createSubid = InvalidSubTransactionId;
1352
	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
1353
	relation->rd_istemp = isTempOrToastNamespace(relation->rd_rel->relnamespace);
1354 1355
	relation->rd_issyscat = (strncmp(relation->rd_rel->relname.data, "pg_", 3) == 0);

1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366
	/*
	 * CDB: On QEs, temp relations must use shared buffer cache so data
	 * will be visible to all segmates.  On QD, sequence objects must
	 * use shared buffer cache so data will be visible to sequence server.
	 */
	if (relation->rd_istemp &&
		relation->rd_rel->relkind != RELKIND_SEQUENCE &&
		Gp_role != GP_ROLE_EXECUTE)
		relation->rd_isLocalBuf = true;
	else
		relation->rd_isLocalBuf = false;
1367

1368
	/*
B
Bruce Momjian 已提交
1369
	 * initialize the tuple descriptor (relation->rd_att).
1370
	 */
1371
	RelationBuildTupleDesc(relation);
1372

1373
	/*
B
Bruce Momjian 已提交
1374
	 * Fetch rules and triggers that affect this relation
1375
	 */
1376
	if (relation->rd_rel->relhasrules)
1377 1378
		RelationBuildRuleLock(relation);
	else
1379
	{
1380
		relation->rd_rules = NULL;
1381 1382
		relation->rd_rulescxt = NULL;
	}
1383

1384
	if (relation->rd_rel->reltriggers > 0)
1385 1386 1387 1388
		RelationBuildTriggers(relation);
	else
		relation->trigdesc = NULL;

1389
	/*
1390
	 * if it's an index, initialize index-related information
1391
	 */
1392
	if (OidIsValid(relation->rd_rel->relam))
1393
		RelationInitIndexAccessInfo(relation);
1394

1395 1396 1397 1398 1399 1400 1401 1402 1403
	/*
	 * if it's an append-only table, get information from pg_appendonly
	 */
	if (relation->rd_rel->relstorage == RELSTORAGE_AOROWS ||
		relation->rd_rel->relstorage == RELSTORAGE_AOCOLS)
	{
		RelationInitAppendOnlyInfo(relation);
	}

1404 1405 1406
	/* extract reloptions if any */
	RelationParseRelOptions(relation, pg_class_tuple);

1407
	/*
B
Bruce Momjian 已提交
1408
	 * initialize the relation lock manager information
1409 1410 1411
	 */
	RelationInitLockInfo(relation);		/* see lmgr.c */

1412 1413 1414 1415
	/*
	 * initialize physical addressing information for the relation
	 */
	RelationInitPhysicalAddr(relation);
1416

1417
	/* make sure relation is marked as having no open file yet */
1418
	relation->rd_smgr = NULL;
1419

1420 1421 1422 1423 1424 1425 1426 1427 1428
    /*
     * initialize Greenplum Database partitioning info
     */
    if (relation->rd_rel->relkind == RELKIND_RELATION &&
        !IsSystemRelation(relation))
        relation->rd_cdbpolicy = GpPolicyFetch(CacheMemoryContext, targetRelId);

    relation->rd_cdbDefaultStatsWarningIssued = false;

B
Bruce Momjian 已提交
1429 1430 1431 1432 1433
	/*
	 * now we can free the memory allocated for pg_class_tuple
	 */
	heap_freetuple(pg_class_tuple);

1434
	/*
1435
	 * Insert newly created relation into relcache hash table, if requested.
1436
	 */
1437 1438
	if (insertIt)
		RelationCacheInsert(relation);
1439

1440 1441 1442
	/* It's fully valid */
	relation->rd_isvalid = true;

1443
	return relation;
1444 1445
}

1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462
/*
 * Initialize the physical addressing info (RelFileNode) for a relcache entry
 */
static void
RelationInitPhysicalAddr(Relation relation)
{
	if (relation->rd_rel->reltablespace)
		relation->rd_node.spcNode = relation->rd_rel->reltablespace;
	else
		relation->rd_node.spcNode = MyDatabaseTableSpace;
	if (relation->rd_rel->relisshared)
		relation->rd_node.dbNode = InvalidOid;
	else
		relation->rd_node.dbNode = MyDatabaseId;
	relation->rd_node.relNode = relation->rd_rel->relfilenode;
}

1463 1464 1465 1466 1467
/*
 * Initialize index-access-method support data for an index relation
 */
void
RelationInitIndexAccessInfo(Relation relation)
1468
{
1469 1470
	HeapTuple	tuple;
	Form_pg_am	aform;
1471
	Datum		indclassDatum;
1472
	Datum		indoptionDatum;
1473
	bool		isnull;
1474
	oidvector  *indclass;
B
Bruce Momjian 已提交
1475
	int2vector *indoption;
1476
	MemoryContext indexcxt;
1477
	MemoryContext oldcontext;
1478
	int			natts;
1479 1480
	uint16		amstrategies;
	uint16		amsupport;
1481 1482

	/*
1483
	 * Make a copy of the pg_index entry for the index.  Since pg_index
B
Bruce Momjian 已提交
1484 1485
	 * contains variable-length and possibly-null fields, we have to do this
	 * honestly rather than just treating it as a Form_pg_index struct.
1486 1487 1488 1489 1490
	 */
	tuple = SearchSysCache(INDEXRELID,
						   ObjectIdGetDatum(RelationGetRelid(relation)),
						   0, 0, 0);
	if (!HeapTupleIsValid(tuple))
1491
		elog(ERROR, "cache lookup failed for index %u",
1492
			 RelationGetRelid(relation));
1493 1494 1495 1496
	oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indextuple = heap_copytuple(tuple);
	relation->rd_index = (Form_pg_index) GETSTRUCT(relation->rd_indextuple);
	MemoryContextSwitchTo(oldcontext);
1497 1498 1499 1500 1501 1502 1503 1504 1505
	ReleaseSysCache(tuple);

	/*
	 * Make a copy of the pg_am entry for the index's access method
	 */
	tuple = SearchSysCache(AMOID,
						   ObjectIdGetDatum(relation->rd_rel->relam),
						   0, 0, 0);
	if (!HeapTupleIsValid(tuple))
1506
		elog(ERROR, "cache lookup failed for access method %u",
1507 1508 1509 1510 1511
			 relation->rd_rel->relam);
	aform = (Form_pg_am) MemoryContextAlloc(CacheMemoryContext, sizeof *aform);
	memcpy(aform, GETSTRUCT(tuple), sizeof *aform);
	ReleaseSysCache(tuple);
	relation->rd_am = aform;
1512 1513

	natts = relation->rd_rel->relnatts;
1514
	if (natts != relation->rd_index->indnatts)
1515
		elog(ERROR, "relnatts disagrees with indnatts for index %u",
1516
			 RelationGetRelid(relation));
1517 1518
	amstrategies = aform->amstrategies;
	amsupport = aform->amsupport;
1519

1520
	/*
B
Bruce Momjian 已提交
1521 1522 1523
	 * Make the private context to hold index access info.	The reason we need
	 * a context, and not just a couple of pallocs, is so that we won't leak
	 * any subsidiary info attached to fmgr lookup records.
1524 1525 1526 1527 1528 1529
	 *
	 * Context parameters are set on the assumption that it'll probably not
	 * contain much data.
	 */
	indexcxt = AllocSetContextCreate(CacheMemoryContext,
									 RelationGetRelationName(relation),
1530 1531 1532
									 ALLOCSET_SMALL_MINSIZE,
									 ALLOCSET_SMALL_INITSIZE,
									 ALLOCSET_SMALL_MAXSIZE);
1533 1534 1535 1536 1537
	relation->rd_indexcxt = indexcxt;

	/*
	 * Allocate arrays to hold data
	 */
1538 1539 1540
	relation->rd_aminfo = (RelationAmInfo *)
		MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));

1541 1542 1543 1544 1545
	relation->rd_opfamily = (Oid *)
		MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
	relation->rd_opcintype = (Oid *)
		MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));

1546
	if (amstrategies > 0)
1547
		relation->rd_operator = (Oid *)
1548 1549
			MemoryContextAllocZero(indexcxt,
								   natts * amstrategies * sizeof(Oid));
1550
	else
1551
		relation->rd_operator = NULL;
1552

1553
	if (amsupport > 0)
1554
	{
1555
		int			nsupport = natts * amsupport;
1556

1557
		relation->rd_support = (RegProcedure *)
1558
			MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
1559
		relation->rd_supportinfo = (FmgrInfo *)
1560
			MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
1561 1562
	}
	else
1563
	{
1564 1565
		relation->rd_support = NULL;
		relation->rd_supportinfo = NULL;
1566
	}
1567

1568 1569 1570
	relation->rd_indoption = (int16 *)
		MemoryContextAllocZero(indexcxt, natts * sizeof(int16));

1571 1572
	/*
	 * indclass cannot be referenced directly through the C struct, because it
B
Bruce Momjian 已提交
1573 1574
	 * comes after the variable-width indkey field.  Must extract the datum
	 * the hard way...
1575 1576 1577 1578 1579 1580 1581
	 */
	indclassDatum = fastgetattr(relation->rd_indextuple,
								Anum_pg_index_indclass,
								GetPgIndexDescriptor(),
								&isnull);
	Assert(!isnull);
	indclass = (oidvector *) DatumGetPointer(indclassDatum);
1582

1583
	/*
B
Bruce Momjian 已提交
1584 1585 1586
	 * Fill the operator and support procedure OID arrays, as well as the info
	 * about opfamilies and opclass input types.  (aminfo and supportinfo are
	 * left as zeroes, and are filled on-the-fly when used)
1587
	 */
1588 1589 1590
	IndexSupportInitialize(indclass,
						   relation->rd_operator, relation->rd_support,
						   relation->rd_opfamily, relation->rd_opcintype,
1591
						   amstrategies, amsupport, natts);
1592

1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603
	/*
	 * Similarly extract indoption and copy it to the cache entry
	 */
	indoptionDatum = fastgetattr(relation->rd_indextuple,
								 Anum_pg_index_indoption,
								 GetPgIndexDescriptor(),
								 &isnull);
	Assert(!isnull);
	indoption = (int2vector *) DatumGetPointer(indoptionDatum);
	memcpy(relation->rd_indoption, indoption->values, natts * sizeof(int16));

1604 1605 1606 1607 1608
	/*
	 * expressions and predicate cache will be filled later
	 */
	relation->rd_indexprs = NIL;
	relation->rd_indpred = NIL;
1609
	relation->rd_amcache = NULL;
1610 1611
}

1612
/*
1613
 * IndexSupportInitialize
1614
 *		Initializes an index's cached opclass information,
1615
 *		given the index's pg_index.indclass entry.
1616
 *
1617 1618
 * Data is returned into *indexOperator, *indexSupport, *opFamily, and
 * *opcInType, which are arrays allocated by the caller.
1619 1620 1621 1622 1623 1624 1625
 *
 * The caller also passes maxStrategyNumber, maxSupportNumber, and
 * maxAttributeNumber, since these indicate the size of the arrays
 * it has allocated --- but in practice these numbers must always match
 * those obtainable from the system catalog entries for the index and
 * access method.
 */
1626
void
1627
IndexSupportInitialize(oidvector *indclass,
1628 1629
					   Oid *indexOperator,
					   RegProcedure *indexSupport,
1630 1631
					   Oid *opFamily,
					   Oid *opcInType,
1632 1633 1634 1635 1636 1637 1638 1639 1640 1641
					   StrategyNumber maxStrategyNumber,
					   StrategyNumber maxSupportNumber,
					   AttrNumber maxAttributeNumber)
{
	int			attIndex;

	for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
	{
		OpClassCacheEnt *opcentry;

1642
		if (!OidIsValid(indclass->values[attIndex]))
1643
			elog(ERROR, "bogus pg_index tuple");
1644 1645

		/* look up the info for this opclass, using a cache */
1646
		opcentry = LookupOpclassInfo(indclass->values[attIndex],
1647 1648 1649
									 maxStrategyNumber,
									 maxSupportNumber);

1650
		/* copy cached data into relcache entry */
1651 1652
		opFamily[attIndex] = opcentry->opcfamily;
		opcInType[attIndex] = opcentry->opcintype;
1653
		if (maxStrategyNumber > 0)
1654 1655 1656
			memcpy(&indexOperator[attIndex * maxStrategyNumber],
				   opcentry->operatorOids,
				   maxStrategyNumber * sizeof(Oid));
1657
		if (maxSupportNumber > 0)
1658 1659 1660
			memcpy(&indexSupport[attIndex * maxSupportNumber],
				   opcentry->supportProcs,
				   maxSupportNumber * sizeof(RegProcedure));
1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675
	}
}

/*
 * LookupOpclassInfo
 *
 * This routine maintains a per-opclass cache of the information needed
 * by IndexSupportInitialize().  This is more efficient than relying on
 * the catalog cache, because we can load all the info about a particular
 * opclass in a single indexscan of pg_amproc or pg_amop.
 *
 * The information from pg_am about expected range of strategy and support
 * numbers is passed in, rather than being looked up, mainly because the
 * caller will have it already.
 *
1676 1677 1678
 * Note there is no provision for flushing the cache.  This is OK at the
 * moment because there is no way to ALTER any interesting properties of an
 * existing opclass --- all you can do is drop it, which will result in
1679
 * a useless but harmless dead entry in the cache.  To support altering
1680 1681 1682
 * opclass membership (not the same as opfamily membership!), we'd need to
 * be able to flush this cache as well as the contents of relcache entries
 * for indexes.
1683 1684 1685 1686 1687 1688 1689 1690
 */
static OpClassCacheEnt *
LookupOpclassInfo(Oid operatorClassOid,
				  StrategyNumber numStrats,
				  StrategyNumber numSupport)
{
	OpClassCacheEnt *opcentry;
	bool		found;
1691 1692
	Relation	rel;
	SysScanDesc scan;
1693
	ScanKeyData skey[3];
1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707
	HeapTuple	htup;
	bool		indexOK;

	if (OpClassCache == NULL)
	{
		/* First time through: initialize the opclass cache */
		HASHCTL		ctl;

		if (!CacheMemoryContext)
			CreateCacheMemoryContext();

		MemSet(&ctl, 0, sizeof(ctl));
		ctl.keysize = sizeof(Oid);
		ctl.entrysize = sizeof(OpClassCacheEnt);
1708
		ctl.hash = oid_hash;
1709 1710 1711 1712 1713 1714 1715 1716
		OpClassCache = hash_create("Operator class cache", 64,
								   &ctl, HASH_ELEM | HASH_FUNCTION);
	}

	opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
											   (void *) &operatorClassOid,
											   HASH_ENTER, &found);

1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738
	if (!found)
	{
		/* Need to allocate memory for new entry */
		opcentry->valid = false;	/* until known OK */
		opcentry->numStrats = numStrats;
		opcentry->numSupport = numSupport;

		if (numStrats > 0)
			opcentry->operatorOids = (Oid *)
				MemoryContextAllocZero(CacheMemoryContext,
									   numStrats * sizeof(Oid));
		else
			opcentry->operatorOids = NULL;

		if (numSupport > 0)
			opcentry->supportProcs = (RegProcedure *)
				MemoryContextAllocZero(CacheMemoryContext,
									   numSupport * sizeof(RegProcedure));
		else
			opcentry->supportProcs = NULL;
	}
	else
1739 1740 1741 1742 1743
	{
		Assert(numStrats == opcentry->numStrats);
		Assert(numSupport == opcentry->numSupport);
	}

1744 1745 1746 1747 1748 1749 1750 1751 1752 1753
	/*
	 * When testing for cache-flush hazards, we intentionally disable the
	 * operator class cache and force reloading of the info on each call.
	 * This is helpful because we want to test the case where a cache flush
	 * occurs while we are loading the info, and it's very hard to provoke
	 * that if this happens only once per opclass per backend.
	 */
#if defined(CLOBBER_CACHE_ALWAYS)
	opcentry->valid = false;
#endif
1754

1755 1756
	if (opcentry->valid)
		return opcentry;
1757 1758

	/*
1759 1760
	 * Need to fill in new entry.
	 *
B
Bruce Momjian 已提交
1761 1762 1763
	 * To avoid infinite recursion during startup, force heap scans if we're
	 * looking up info for the opclasses used by the indexes we would like to
	 * reference here.
1764 1765 1766 1767 1768
	 */
	indexOK = criticalRelcachesBuilt ||
		(operatorClassOid != OID_BTREE_OPS_OID &&
		 operatorClassOid != INT2_BTREE_OPS_OID);

1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796
	/*
	 * We have to fetch the pg_opclass row to determine its opfamily and
	 * opcintype, which are needed to look up the operators and functions.
	 * It'd be convenient to use the syscache here, but that probably doesn't
	 * work while bootstrapping.
	 */
	ScanKeyInit(&skey[0],
				ObjectIdAttributeNumber,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(operatorClassOid));
	rel = heap_open(OperatorClassRelationId, AccessShareLock);
	scan = systable_beginscan(rel, OpclassOidIndexId, indexOK,
							  SnapshotNow, 1, skey);

	if (HeapTupleIsValid(htup = systable_getnext(scan)))
	{
		Form_pg_opclass opclassform = (Form_pg_opclass) GETSTRUCT(htup);

		opcentry->opcfamily = opclassform->opcfamily;
		opcentry->opcintype = opclassform->opcintype;
	}
	else
		elog(ERROR, "could not find tuple for opclass %u", operatorClassOid);

	systable_endscan(scan);
	heap_close(rel, AccessShareLock);


1797
	/*
B
Bruce Momjian 已提交
1798
	 * Scan pg_amop to obtain operators for the opclass.  We only fetch the
1799
	 * default ones (those with lefttype = righttype = opcintype).
1800 1801 1802
	 */
	if (numStrats > 0)
	{
1803
		ScanKeyInit(&skey[0],
1804
					Anum_pg_amop_amopfamily,
1805
					BTEqualStrategyNumber, F_OIDEQ,
1806
					ObjectIdGetDatum(opcentry->opcfamily));
1807
		ScanKeyInit(&skey[1],
1808 1809 1810 1811 1812
					Anum_pg_amop_amoplefttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
		ScanKeyInit(&skey[2],
					Anum_pg_amop_amoprighttype,
1813
					BTEqualStrategyNumber, F_OIDEQ,
1814
					ObjectIdGetDatum(opcentry->opcintype));
1815 1816
		rel = heap_open(AccessMethodOperatorRelationId, AccessShareLock);
		scan = systable_beginscan(rel, AccessMethodStrategyIndexId, indexOK,
1817
								  SnapshotNow, 3, skey);
1818 1819

		while (HeapTupleIsValid(htup = systable_getnext(scan)))
1820 1821 1822 1823 1824
		{
			Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(htup);

			if (amopform->amopstrategy <= 0 ||
				(StrategyNumber) amopform->amopstrategy > numStrats)
1825
				elog(ERROR, "invalid amopstrategy number %d for opclass %u",
1826 1827 1828 1829 1830
					 amopform->amopstrategy, operatorClassOid);
			opcentry->operatorOids[amopform->amopstrategy - 1] =
				amopform->amopopr;
		}

1831 1832
		systable_endscan(scan);
		heap_close(rel, AccessShareLock);
1833 1834 1835
	}

	/*
B
Bruce Momjian 已提交
1836
	 * Scan pg_amproc to obtain support procs for the opclass.	We only fetch
1837
	 * the default ones (those with lefttype = righttype = opcintype).
1838 1839 1840
	 */
	if (numSupport > 0)
	{
1841
		ScanKeyInit(&skey[0],
1842
					Anum_pg_amproc_amprocfamily,
1843
					BTEqualStrategyNumber, F_OIDEQ,
1844
					ObjectIdGetDatum(opcentry->opcfamily));
1845
		ScanKeyInit(&skey[1],
1846
					Anum_pg_amproc_amproclefttype,
1847
					BTEqualStrategyNumber, F_OIDEQ,
1848 1849 1850 1851 1852
					ObjectIdGetDatum(opcentry->opcintype));
		ScanKeyInit(&skey[2],
					Anum_pg_amproc_amprocrighttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
1853 1854
		rel = heap_open(AccessMethodProcedureRelationId, AccessShareLock);
		scan = systable_beginscan(rel, AccessMethodProcedureIndexId, indexOK,
1855
								  SnapshotNow, 3, skey);
1856 1857

		while (HeapTupleIsValid(htup = systable_getnext(scan)))
1858 1859 1860 1861 1862
		{
			Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);

			if (amprocform->amprocnum <= 0 ||
				(StrategyNumber) amprocform->amprocnum > numSupport)
1863
				elog(ERROR, "invalid amproc number %d for opclass %u",
1864 1865 1866 1867 1868 1869
					 amprocform->amprocnum, operatorClassOid);

			opcentry->supportProcs[amprocform->amprocnum - 1] =
				amprocform->amproc;
		}

1870 1871
		systable_endscan(scan);
		heap_close(rel, AccessShareLock);
1872 1873 1874 1875 1876 1877 1878 1879 1880 1881
	}

	opcentry->valid = true;
	return opcentry;
}


/*
 *		formrdesc
 *
1882 1883
 *		This is a special cut-down version of RelationBuildDesc(),
 *		used while initializing the relcache.
1884
 *		The relation descriptor is built just from the supplied parameters,
1885 1886
 *		without actually looking at any system table entries.  We cheat
 *		quite a lot since we only need to work for a few basic system
1887 1888
 *		catalogs.
 *
1889 1890 1891
 * formrdesc is currently used for: pg_database, pg_authid, pg_auth_members,
 * pg_class, pg_attribute, pg_proc, and pg_type
 * (see RelationCacheInitializePhase2/3).
1892
 *
1893 1894
 * Note that these catalogs can't have constraints (except attnotnull),
 * default values, rules, or triggers, since we don't cope with any of that.
1895 1896 1897
 * (Well, actually, this only matters for properties that need to be valid
 * during bootstrap or before RelationCacheInitializePhase3 runs, and none of
 * these properties matter then...)
1898
 *
1899
 * NOTE: we assume we are already switched into CacheMemoryContext.
1900 1901
 */
static void
1902
formrdesc(const char *relationName, Oid relationReltype,
1903 1904
		  bool isshared, bool hasoids,
		  int natts, const FormData_pg_attribute *attrs)
1905
{
1906
	Relation	relation;
1907
	int			i;
1908
	bool		has_not_null;
1909

1910
	/*
1911
	 * allocate new relation desc, clear all fields of reldesc
1912
	 */
1913
	relation = (Relation) palloc0(sizeof(RelationData));
1914 1915 1916
	relation->rd_targblock = InvalidBlockNumber;

	/* make sure relation is marked as having no open file yet */
1917
	relation->rd_smgr = NULL;
1918

1919
	/*
1920
	 * initialize reference count: 1 because it is nailed in cache
1921
	 */
1922
	relation->rd_refcnt = 1;
1923

1924
	/*
B
Bruce Momjian 已提交
1925 1926
	 * all entries built with this routine are nailed-in-cache; none are for
	 * new or temp relations.
1927
	 */
1928
	relation->rd_isnailed = true;
1929
	relation->rd_createSubid = InvalidSubTransactionId;
1930
	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
1931
	relation->rd_istemp = false;
1932 1933
	relation->rd_issyscat = (strncmp(relationName, "pg_", 3) == 0);	/* GP */
    relation->rd_isLocalBuf = false;    /*CDB*/
1934

1935
	/*
B
Bruce Momjian 已提交
1936
	 * initialize relation tuple form
1937
	 *
1938 1939
	 * The data we insert here is pretty incomplete/bogus, but it'll serve to
	 * get us launched.  RelationCacheInitializePhase2() will read the real
1940 1941 1942
	 * data from pg_class and replace what we've done here.  Note in particular
	 * that relowner is left as zero; this cues RelationCacheInitializePhase2
	 * that the real data isn't there yet.
1943
	 */
1944
	relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
1945

1946 1947
	namestrcpy(&relation->rd_rel->relname, relationName);
	relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
1948
	relation->rd_rel->reltype = relationReltype;
1949 1950

	/*
B
Bruce Momjian 已提交
1951
	 * It's important to distinguish between shared and non-shared relations,
1952
	 * even at bootstrap time, to make sure we know where they are stored.
1953
	 */
1954 1955 1956
	relation->rd_rel->relisshared = isshared;
	if (isshared)
		relation->rd_rel->reltablespace = GLOBALTABLESPACE_OID;
1957

1958 1959
	relation->rd_rel->relpages = 0;
	relation->rd_rel->reltuples = 0;
1960
	relation->rd_rel->relkind = RELKIND_RELATION;
1961
	relation->rd_rel->relstorage = RELSTORAGE_HEAP;
1962
	relation->rd_rel->relhasoids = hasoids;
1963
	relation->rd_rel->relnatts = (int16) natts;
1964

1965 1966 1967 1968 1969 1970
	/*
	 * Physical file-system information.
	 */
	relation->rd_segfile0_relationnodeinfo.isPresent = false;
	relation->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = false;
	
1971
	/*
B
Bruce Momjian 已提交
1972
	 * initialize attribute tuple form
1973
	 *
B
Bruce Momjian 已提交
1974
	 * Unlike the case with the relation tuple, this data had better be right
B
Bruce Momjian 已提交
1975 1976
	 * because it will never be replaced.  The input values must be correctly
	 * defined by macros in src/include/catalog/ headers.
1977
	 */
1978
	relation->rd_att = CreateTemplateTupleDesc(natts, hasoids);
1979 1980
	relation->rd_att->tdrefcount = 1;	/* mark as refcounted */

1981 1982
	relation->rd_att->tdtypeid = relationReltype;
	relation->rd_att->tdtypmod = -1;	/* unnecessary, but... */
1983

1984
	/*
B
Bruce Momjian 已提交
1985
	 * initialize tuple desc info
1986
	 */
1987
	has_not_null = false;
1988 1989
	for (i = 0; i < natts; i++)
	{
1990
		memcpy(relation->rd_att->attrs[i],
1991 1992 1993
			   &attrs[i],
			   ATTRIBUTE_FIXED_PART_SIZE);
		has_not_null |= attrs[i].attnotnull;
1994 1995
		/* make sure attcacheoff is valid */
		relation->rd_att->attrs[i]->attcacheoff = -1;
1996 1997
	}

1998 1999 2000
	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
	relation->rd_att->attrs[0]->attcacheoff = 0;

2001 2002 2003 2004 2005 2006 2007 2008 2009
	/* mark not-null status */
	if (has_not_null)
	{
		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

		constr->has_not_null = true;
		relation->rd_att->constr = constr;
	}

2010
	/*
2011
	 * initialize relation id from info in att array (my, this is ugly)
2012
	 */
2013
	RelationGetRelid(relation) = relation->rd_att->attrs[0]->attrelid;
2014
	relation->rd_rel->relfilenode = RelationGetRelid(relation);
2015

2016
	/*
2017
	 * initialize the relation lock manager information
2018 2019 2020
	 */
	RelationInitLockInfo(relation);		/* see lmgr.c */

2021 2022 2023 2024
	/*
	 * initialize physical addressing information for the relation
	 */
	RelationInitPhysicalAddr(relation);
2025

2026
	/*
B
Bruce Momjian 已提交
2027
	 * initialize the rel-has-index flag, using hardwired knowledge
2028
	 */
2029 2030 2031 2032 2033 2034
	if (IsBootstrapProcessingMode())
	{
		/* In bootstrap mode, we have no indexes */
		relation->rd_rel->relhasindex = false;
	}
	else
2035
	{
2036 2037
		/* Otherwise, all the rels formrdesc is used for have indexes */
		relation->rd_rel->relhasindex = true;
2038 2039
	}

2040
	/*
B
Bruce Momjian 已提交
2041
	 * add new reldesc to relcache
2042
	 */
2043
	RelationCacheInsert(relation);
2044 2045 2046

	/* It's fully valid */
	relation->rd_isvalid = true;
2047 2048 2049
}


2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092
/*
 * RelationInitAppendOnlyInfo
 *		Attach the relation's pg_appendonly metadata to its relcache entry.
 *
 * Looks up the pg_appendonly row for the given relation and caches a copy
 * of the tuple (rd_aotuple) plus a struct pointer into it (rd_appendonly)
 * in CacheMemoryContext.  Errors out if no row is found, since callers
 * expect the relation to be append-only.
 */
static void
RelationInitAppendOnlyInfo(Relation relation)
{
	Relation	aorel;
	SysScanDesc sscan;
	ScanKeyData key;
	HeapTuple	aotup;
	MemoryContext oldcxt;

	/*
	 * Fetch the pg_appendonly row, to be certain the ao table is there.
	 */
	aorel = heap_open(AppendOnlyRelationId, AccessShareLock);

	ScanKeyInit(&key,
				Anum_pg_appendonly_relid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));
	/* FIXME: isn't there a mode in relcache code to *not* use an index? Should
	 * we do something here to obey it?
	 */
	sscan = systable_beginscan(aorel, AppendOnlyRelidIndexId, true,
							   SnapshotNow, 1, &key);

	aotup = systable_getnext(sscan);
	if (!aotup)
		elog(ERROR, "could not find pg_appendonly tuple for relation \"%s\"",
			 RelationGetRelationName(relation));

	/*
	 * Copy the tuple into long-lived cache memory before the scan releases
	 * its reference to it.
	 */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_aotuple = heap_copytuple(aotup);
	relation->rd_appendonly = (Form_pg_appendonly) GETSTRUCT(relation->rd_aotuple);
	MemoryContextSwitchTo(oldcxt);

	systable_endscan(sscan);
	heap_close(aorel, AccessShareLock);
}


2093
/* ----------------------------------------------------------------
2094
 *				 Relation Descriptor Lookup Interface
2095 2096 2097
 * ----------------------------------------------------------------
 */

2098
/*
2099
 *		RelationIdGetRelation
2100
 *
2101
 *		Lookup a reldesc by OID; make one if not already in cache.
2102
 *
2103 2104 2105
 *		Returns NULL if no pg_class row could be found for the given relid
 *		(suggesting we are trying to access a just-deleted relation).
 *		Any other error is reported via elog.
2106
 *
2107 2108 2109 2110
 *		NB: caller should already have at least AccessShareLock on the
 *		relation ID, else there are nasty race conditions.
 *
 *		NB: relation ref count is incremented, or set to 1 if new entry.
2111 2112
 *		Caller should eventually decrement count.  (Usually,
 *		that happens by calling RelationClose().)
2113 2114
 */
Relation
2115
RelationIdGetRelation(Oid relationId)
2116
{
2117
	Relation	rd;
2118

2119 2120 2121
	/*
	 * first try to find reldesc in the cache
	 */
2122 2123 2124
	RelationIdCacheLookup(relationId, rd);

	if (RelationIsValid(rd))
2125
	{
2126
		RelationIncrementReferenceCount(rd);
2127
		/* revalidate cache entry if necessary */
2128
		if (!rd->rd_isvalid)
2129 2130 2131 2132 2133 2134 2135
		{
			/*
			 * Indexes only have a limited number of possible schema changes,
			 * and we don't want to use the full-blown procedure because it's
			 * a headache for indexes that reload itself depends on.
			 */
			if (rd->rd_rel->relkind == RELKIND_INDEX)
2136
				RelationReloadIndexInfo(rd);
2137 2138
			else
				RelationClearRelation(rd, true);
2139
		}
2140
		return rd;
2141
	}
2142

2143
	/*
B
Bruce Momjian 已提交
2144 2145
	 * no reldesc in the cache, so have RelationBuildDesc() build one and add
	 * it.
2146
	 */
2147
	rd = RelationBuildDesc(relationId, true);
2148 2149
	if (RelationIsValid(rd))
		RelationIncrementReferenceCount(rd);
2150

2151 2152 2153 2154
	return rd;
}

/* ----------------------------------------------------------------
 *				cache invalidation support routines
 * ----------------------------------------------------------------
 */

/*
 * RelationIncrementReferenceCount
 *		Bump the reference count on a relcache entry.
 *
 * The reference is also recorded with the current resource owner so it can
 * be released automatically.  Bootstrap mode has its own weird ideas about
 * relation refcount behavior; we ought to fix it someday, but for now we
 * simply skip ownership tracking there.
 */
void
RelationIncrementReferenceCount(Relation rel)
{
	ResourceOwnerEnlargeRelationRefs(CurrentResourceOwner);
	rel->rd_refcnt++;
	if (!IsBootstrapProcessingMode())
		ResourceOwnerRememberRelationRef(CurrentResourceOwner, rel);
}

/*
 * RelationDecrementReferenceCount
 *		Decrements relation reference count.
 *
 * Errors out if the count is already zero or negative, which would indicate
 * a refcount-management bug elsewhere.  Ownership tracking is skipped in
 * bootstrap mode, mirroring RelationIncrementReferenceCount.
 */
void
RelationDecrementReferenceCount(Relation rel)
{
	if (rel->rd_refcnt <= 0)
	{
		elog(ERROR,
			 "Relation decrement reference count found relation %u/%u/%u with bad count (reference count %d)",
			 rel->rd_node.spcNode,
			 rel->rd_node.dbNode,
			 rel->rd_node.relNode,
			 rel->rd_refcnt);
	}

	rel->rd_refcnt -= 1;
	if (!IsBootstrapProcessingMode())
		ResourceOwnerForgetRelationRef(CurrentResourceOwner, rel);
}

2198
/*
2199 2200
 * RelationClose - close an open relation
 *
2201 2202 2203 2204 2205 2206 2207
 *	Actually, we just decrement the refcount.
 *
 *	NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
 *	will be freed as soon as their refcount goes to zero.  In combination
 *	with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
 *	to catch references to already-released relcache entries.  It slows
 *	things down quite a bit, however.
2208 2209 2210 2211
 */
void
RelationClose(Relation relation)
{
2212 2213
	/* Note: no locking manipulations needed */
	RelationDecrementReferenceCount(relation);
2214 2215

#ifdef RELCACHE_FORCE_RELEASE
2216
	if (RelationHasReferenceCountZero(relation) &&
2217 2218
		relation->rd_createSubid == InvalidSubTransactionId &&
		relation->rd_newRelfilenodeSubid == InvalidSubTransactionId)
2219 2220
		RelationClearRelation(relation, false);
#endif
2221 2222
}

2223
/*
2224
 * RelationReloadIndexInfo - reload minimal information for an open index
2225
 *
2226 2227 2228 2229 2230 2231 2232
 *	This function is used only for indexes.  A relcache inval on an index
 *	can mean that its pg_class or pg_index row changed.  There are only
 *	very limited changes that are allowed to an existing index's schema,
 *	so we can update the relcache entry without a complete rebuild; which
 *	is fortunate because we can't rebuild an index entry that is "nailed"
 *	and/or in active use.  We support full replacement of the pg_class row,
 *	as well as updates of a few simple fields of the pg_index row.
2233
 *
2234
 *	We can't necessarily reread the catalog rows right away; we might be
2235 2236
 *	in a failed transaction when we receive the SI notification.  If so,
 *	RelationClearRelation just marks the entry as invalid by setting
2237
 *	rd_isvalid to false.  This routine is called to fix the entry when it
2238
 *	is next needed.
2239 2240 2241 2242
 *
 *	We assume that at the time we are called, we have at least AccessShareLock
 *	on the target index.  (Note: in the calls from RelationClearRelation,
 *	this is legitimate because we know the rel has positive refcount.)
2243 2244 2245 2246 2247 2248
 *
 *	If the target index is an index on pg_class or pg_index, we'd better have
 *	previously gotten at least AccessShareLock on its underlying catalog,
 *	else we are at risk of deadlock against someone trying to exclusive-lock
 *	the heap and index in that order.  This is ensured in current usage by
 *	only applying this to indexes being opened or having positive refcount.
H
Hiroshi Inoue 已提交
2249 2250
 */
static void
2251
RelationReloadIndexInfo(Relation relation)
H
Hiroshi Inoue 已提交
2252
{
2253
	bool		indexOK;
H
Hiroshi Inoue 已提交
2254
	HeapTuple	pg_class_tuple;
B
Bruce Momjian 已提交
2255
	Form_pg_class relp;
H
Hiroshi Inoue 已提交
2256

2257 2258 2259 2260 2261
	/* Should be called only for invalidated indexes */
	Assert(relation->rd_rel->relkind == RELKIND_INDEX &&
		   !relation->rd_isvalid);
	/* Should be closed at smgr level */
	Assert(relation->rd_smgr == NULL);
B
Bruce Momjian 已提交
2262

2263 2264 2265 2266 2267 2268
	/* Make sure targblock is reset in case rel was truncated */
	relation->rd_targblock = InvalidBlockNumber;
	/* Must free any AM cached data, too */
	if (relation->rd_amcache)
		pfree(relation->rd_amcache);
	relation->rd_amcache = NULL;
2269

2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282
	/*
	 * If it's a shared index, we might be called before backend startup has
	 * finished selecting a database, in which case we have no way to read
	 * pg_class yet.  However, a shared index can never have any significant
	 * schema updates, so it's okay to ignore the invalidation signal.  Just
	 * mark it valid and return without doing anything more.
	 */
	if (relation->rd_rel->relisshared && !criticalRelcachesBuilt)
	{
		relation->rd_isvalid = true;
		return;
	}

2283
	/*
2284 2285
	 * Read the pg_class row
	 *
2286 2287
	 * Don't try to use an indexscan of pg_class_oid_index to reload the info
	 * for pg_class_oid_index ...
2288
	 */
2289
	indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
2290
	pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK, NULL);
H
Hiroshi Inoue 已提交
2291
	if (!HeapTupleIsValid(pg_class_tuple))
2292
		elog(ERROR, "could not find pg_class tuple for index %u",
2293
			 RelationGetRelid(relation));
H
Hiroshi Inoue 已提交
2294
	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
2295
	memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
2296
	/* Reload reloptions in case they changed */
B
Bruce Momjian 已提交
2297 2298
	if (relation->rd_options)
		pfree(relation->rd_options);
2299 2300
	RelationParseRelOptions(relation, pg_class_tuple);
	/* done with pg_class tuple */
H
Hiroshi Inoue 已提交
2301
	heap_freetuple(pg_class_tuple);
2302 2303
	/* We must recalculate physical address in case it changed */
	RelationInitPhysicalAddr(relation);
2304 2305 2306 2307

	/* Forget gp_relation_node information -- it may have changed. */
	MemSet(&relation->rd_segfile0_relationnodeinfo, 0, sizeof(RelationNodeInfo));

2308 2309 2310 2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324
	/*
	 * For a non-system index, there are fields of the pg_index row that are
	 * allowed to change, so re-read that row and update the relcache entry.
	 * Most of the info derived from pg_index (such as support function lookup
	 * info) cannot change, and indeed the whole point of this routine is to
	 * update the relcache entry without clobbering that data; so wholesale
	 * replacement is not appropriate.
	 */
	if (!IsSystemRelation(relation))
	{
		HeapTuple	tuple;
		Form_pg_index index;

		tuple = SearchSysCache(INDEXRELID,
							   ObjectIdGetDatum(RelationGetRelid(relation)),
							   0, 0, 0);
		if (!HeapTupleIsValid(tuple))
B
Bruce Momjian 已提交
2325 2326
			elog(ERROR, "cache lookup failed for index %u",
				 RelationGetRelid(relation));
2327 2328
		index = (Form_pg_index) GETSTRUCT(tuple);

2329 2330 2331 2332 2333 2334 2335 2336 2337
		/*
		 * Basically, let's just copy all the bool fields.  There are one or
		 * two of these that can't actually change in the current code, but
		 * it's not worth it to track exactly which ones they are.  None of
		 * the array fields are allowed to change, though.
		 */
		relation->rd_index->indisunique = index->indisunique;
		relation->rd_index->indisprimary = index->indisprimary;
		relation->rd_index->indisclustered = index->indisclustered;
2338
		relation->rd_index->indisvalid = index->indisvalid;
2339 2340
		relation->rd_index->indcheckxmin = index->indcheckxmin;
		relation->rd_index->indisready = index->indisready;
2341 2342

		/* Copy xmin too, as that is needed to make sense of indcheckxmin */
2343 2344
		HeapTupleHeaderSetXmin(relation->rd_indextuple->t_data,
							   HeapTupleHeaderGetXmin(tuple->t_data));
2345 2346 2347

		ReleaseSysCache(tuple);
	}
2348

2349
	/* Okay, now it's valid again */
2350
	relation->rd_isvalid = true;
H
Hiroshi Inoue 已提交
2351
}
2352

2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381
/*
 * RelationDestroyRelation
 *
 *	Physically delete a relation cache entry and all subsidiary data.
 *	Caller must already have unhooked the entry from the hash table.
 */
static void
RelationDestroyRelation(Relation relation)
{
	Assert(RelationHasReferenceCountZero(relation));

	/*
	 * Make sure smgr and lower levels close the relation's files, if they
	 * weren't closed already.  (This was probably done by caller, but let's
	 * just be real sure.)
	 */
	RelationCloseSmgr(relation);

	/*
	 * Free all the subsidiary data structures of the relcache entry,
	 * then the entry itself.
	 */
	if (relation->rd_rel)
		pfree(relation->rd_rel);
	/* can't use DecrTupleDescRefCount here */
	Assert(relation->rd_att->tdrefcount > 0);
	if (--relation->rd_att->tdrefcount == 0)
		FreeTupleDesc(relation->rd_att);
	list_free(relation->rd_indexlist);
2382
	bms_free(relation->rd_indexattr);
2383 2384 2385 2386 2387
	FreeTriggerDesc(relation->trigdesc);
	if (relation->rd_options)
		pfree(relation->rd_options);
	if (relation->rd_indextuple)
		pfree(relation->rd_indextuple);
2388 2389
	if (relation->rd_aotuple)
		pfree(relation->rd_aotuple);
2390 2391 2392 2393 2394 2395 2396 2397 2398 2399 2400 2401
	if (relation->rd_am)
		pfree(relation->rd_am);
	if (relation->rd_indexcxt)
		MemoryContextDelete(relation->rd_indexcxt);
	if (relation->rd_rulescxt)
		MemoryContextDelete(relation->rd_rulescxt);
	if (relation->rd_cdbpolicy)
		pfree(relation->rd_cdbpolicy);

	pfree(relation);
}

2402
/*
2403
 * RelationClearRelation
2404
 *
2405 2406
 *	 Physically blow away a relation cache entry, or reset it and rebuild
 *	 it from scratch (that is, from catalog entries).  The latter path is
2407 2408
 *	 used when we are notified of a change to an open relation (one with
 *	 refcount > 0).
2409
 *
2410 2411 2412 2413 2414 2415
 *	 NB: when rebuilding, we'd better hold some lock on the relation,
 *	 else the catalog data we need to read could be changing under us.
 *	 Also, a rel to be rebuilt had better have refcnt > 0.  This is because
 *	 an sinval reset could happen while we're accessing the catalogs, and
 *	 the rel would get blown away underneath us by RelationCacheInvalidate
 *	 if it has zero refcnt.
2416
 *
2417 2418 2419
 *	 The "rebuild" parameter is redundant in current usage because it has
 *	 to match the relation's refcnt status, but we keep it as a crosscheck
 *	 that we're doing what the caller expects.
2420
 */
2421
static void
2422
RelationClearRelation(Relation relation, bool rebuild)
2423
{
2424 2425 2426 2427 2428 2429 2430
	/*
	 * As per notes above, a rel to be rebuilt MUST have refcnt > 0; while
	 * of course it would be a bad idea to blow away one with nonzero refcnt.
	 */
	Assert(rebuild ?
		   !RelationHasReferenceCountZero(relation) :
		   RelationHasReferenceCountZero(relation));
2431 2432

	/*
2433
	 * Make sure smgr and lower levels close the relation's files, if they
B
Bruce Momjian 已提交
2434 2435 2436 2437
	 * weren't closed already.  If the relation is not getting deleted, the
	 * next smgr access should reopen the files automatically.	This ensures
	 * that the low-level file access state is updated after, say, a vacuum
	 * truncation.
2438
	 */
2439
	RelationCloseSmgr(relation);
2440

2441
	/*
B
Bruce Momjian 已提交
2442 2443 2444
	 * Never, never ever blow away a nailed-in system relation, because we'd
	 * be unable to recover.  However, we must reset rd_targblock, in case we
	 * got called because of a relation cache flush that was triggered by
2445
	 * VACUUM.  Likewise reset the fsm and vm size info.
2446
	 *
2447 2448 2449
	 * If it's a nailed index, then we need to re-read the pg_class row to see
	 * if its relfilenode changed.	We can't necessarily do that here, because
	 * we might be in a failed transaction.  We assume it's okay to do it if
B
Bruce Momjian 已提交
2450 2451 2452
	 * there are open references to the relcache entry (cf notes for
	 * AtEOXact_RelationCache).  Otherwise just mark the entry as possibly
	 * invalid, and it'll be fixed when next opened.
2453 2454
	 */
	if (relation->rd_isnailed)
H
Hiroshi Inoue 已提交
2455
	{
2456
		relation->rd_targblock = InvalidBlockNumber;
2457 2458
		if (relation->rd_rel->relkind == RELKIND_INDEX)
		{
B
Bruce Momjian 已提交
2459
			relation->rd_isvalid = false;		/* needs to be revalidated */
2460
			if (relation->rd_refcnt > 1)
2461
				RelationReloadIndexInfo(relation);
2462
		}
2463
		return;
H
Hiroshi Inoue 已提交
2464
	}
2465

2466 2467 2468 2469
	/*
	 * Even non-system indexes should not be blown away if they are open and
	 * have valid index support information.  This avoids problems with active
	 * use of the index support information.  As with nailed indexes, we
B
Bruce Momjian 已提交
2470
	 * re-read the pg_class row to handle possible physical relocation of the
2471
	 * index, and we check for pg_index updates too.
2472 2473 2474 2475 2476
	 */
	if (relation->rd_rel->relkind == RELKIND_INDEX &&
		relation->rd_refcnt > 0 &&
		relation->rd_indexcxt != NULL)
	{
B
Bruce Momjian 已提交
2477
		relation->rd_isvalid = false;	/* needs to be revalidated */
2478
		RelationReloadIndexInfo(relation);
2479 2480 2481
		return;
	}

2482 2483
	/* Mark it invalid until we've finished rebuild */
	relation->rd_isvalid = false;
2484

2485
	/*
2486
	 * If we're really done with the relcache entry, blow it away. But if
B
Bruce Momjian 已提交
2487 2488 2489
	 * someone is still using it, reconstruct the whole deal without moving
	 * the physical RelationData record (so that the someone's pointer is
	 * still valid).
2490
	 */
2491
	if (!rebuild)
2492
	{
2493 2494 2495 2496 2497
		/* Remove it from the hash table */
		RelationCacheDelete(relation);

		/* And release storage */
		RelationDestroyRelation(relation);
2498 2499 2500
	}
	else
	{
2501
		/*
2502 2503 2504 2505 2506 2507 2508 2509 2510 2511 2512 2513 2514 2515 2516
		 * Our strategy for rebuilding an open relcache entry is to build
		 * a new entry from scratch, swap its contents with the old entry,
		 * and finally delete the new entry (along with any infrastructure
		 * swapped over from the old entry).  This is to avoid trouble in case
		 * an error causes us to lose control partway through.  The old entry
		 * will still be marked !rd_isvalid, so we'll try to rebuild it again
		 * on next access.  Meanwhile it's not any less valid than it was
		 * before, so any code that might expect to continue accessing it
		 * isn't hurt by the rebuild failure.  (Consider for example a
		 * subtransaction that ALTERs a table and then gets cancelled partway
		 * through the cache entry rebuild.  The outer transaction should
		 * still see the not-modified cache entry as valid.)  The worst
		 * consequence of an error is leaking the necessarily-unreferenced
		 * new entry, and this shouldn't happen often enough for that to be
		 * a big problem.
2517
		 *
2518 2519 2520 2521 2522 2523 2524 2525 2526 2527 2528 2529 2530 2531 2532 2533
		 * When rebuilding an open relcache entry, we must preserve ref count
		 * and rd_createSubid/rd_newRelfilenodeSubid state.  Also attempt to
		 * preserve the pg_class entry (rd_rel), tupledesc, and rewrite-rule
		 * substructures in place, because various places assume that these
		 * structures won't move while they are working with an open relcache
		 * entry.  (Note: the refcount mechanism for tupledescs might someday
		 * allow us to remove this hack for the tupledesc.)
 		 *
 		 * Note that this process does not touch CurrentResourceOwner; which
 		 * is good because whatever ref counts the entry may have do not
 		 * necessarily belong to that resource owner.
 		 */
		Relation	newrel;
 		Oid			save_relid = RelationGetRelid(relation);
		bool		keep_tupdesc;
		bool		keep_rules;
2534
		bool		keep_pt_info;
2535 2536 2537 2538 2539 2540 2541 2542 2543 2544 2545 2546 2547

		/* Build temporary entry, but don't link it into hashtable */
		newrel = RelationBuildDesc(save_relid, false);
		if (newrel == NULL)
 		{
 			/* Should only get here if relation was deleted */
			RelationCacheDelete(relation);
			RelationDestroyRelation(relation);
 			elog(ERROR, "relation %u deleted while still in use", save_relid);
 		}
 
		keep_tupdesc = equalTupleDescs(relation->rd_att, newrel->rd_att, true);
		keep_rules = equalRuleLocks(relation->rd_rules, newrel->rd_rules);
2548 2549
		keep_pt_info = (relation->rd_rel->relfilenode ==
						newrel->rd_rel->relfilenode);
2550 2551 2552 2553 2554 2555 2556 2557 2558 2559

		/*
		 * Perform swapping of the relcache entry contents.  Within this
		 * process the old entry is momentarily invalid, so there *must*
		 * be no possibility of CHECK_FOR_INTERRUPTS within this sequence.
		 * Do it in all-in-line code for safety.
		 *
		 * Since the vast majority of fields should be swapped, our method
		 * is to swap the whole structures and then re-swap those few fields
		 * we didn't want swapped.
2560
		 */
2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574
#define SWAPFIELD(fldtype, fldname) \
		do { \
			fldtype _tmp = newrel->fldname; \
			newrel->fldname = relation->fldname; \
			relation->fldname = _tmp; \
		} while (0)

		/* swap all Relation struct fields */
 		{
			RelationData tmpstruct;

			memcpy(&tmpstruct, newrel, sizeof(RelationData));
			memcpy(newrel, relation, sizeof(RelationData));
			memcpy(relation, &tmpstruct, sizeof(RelationData));
2575
		}
2576 2577 2578 2579 2580 2581 2582 2583 2584

		/* rd_smgr must not be swapped, due to back-links from smgr level */
		SWAPFIELD(SMgrRelation, rd_smgr);
		/* rd_refcnt must be preserved */
		SWAPFIELD(int, rd_refcnt);
		/* isnailed shouldn't change */
		Assert(newrel->rd_isnailed == relation->rd_isnailed);
		/* creation sub-XIDs must be preserved */
		SWAPFIELD(SubTransactionId, rd_createSubid);
2585
		SWAPFIELD(SubTransactionId, rd_newRelfilenodeSubid);
2586 2587 2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 2598 2599 2600 2601
		/* un-swap rd_rel pointers, swap contents instead */
		SWAPFIELD(Form_pg_class, rd_rel);
		/* ... but actually, we don't have to update newrel->rd_rel */
		memcpy(relation->rd_rel, newrel->rd_rel, CLASS_TUPLE_SIZE);
		/* preserve old tupledesc and rules if no logical change */
		if (keep_tupdesc)
			SWAPFIELD(TupleDesc, rd_att);
		if (keep_rules)
 		{
			SWAPFIELD(RuleLock *, rd_rules);
			SWAPFIELD(MemoryContext, rd_rulescxt);
 		}
		/* pgstat_info must be preserved */
		SWAPFIELD(struct PgStat_TableStatus *, pgstat_info);

		/* preserve persistent table information for the relation  */
2602 2603
		if (keep_pt_info)
			SWAPFIELD(struct RelationNodeInfo, rd_segfile0_relationnodeinfo);
2604 2605 2606 2607 2608

#undef SWAPFIELD

		/* And now we can throw away the temporary entry */
		RelationDestroyRelation(newrel);
2609
	}
2610 2611
}

2612
/*
2613 2614 2615 2616 2617
 * RelationFlushRelation
 *
 *	 Rebuild the relation if it is open (refcount > 0), else blow it away.
 */
static void
2618
RelationFlushRelation(Relation relation)
2619
{
2620 2621
	if (relation->rd_createSubid != InvalidSubTransactionId ||
		relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
2622 2623
	{
		/*
2624 2625
		 * New relcache entries are always rebuilt, not flushed; else we'd
		 * forget the "new" status of the relation, which is a useful
2626
		 * optimization to have.  Ditto for the new-relfilenode status.
2627 2628 2629 2630
		 *
		 * The rel could have zero refcnt here, so temporarily increment
		 * the refcnt to ensure it's safe to rebuild it.  We can assume that
		 * the current transaction has some lock on the rel already.
2631
		 */
2632 2633 2634
		RelationIncrementReferenceCount(relation);
		RelationClearRelation(relation, true);
		RelationDecrementReferenceCount(relation);
2635 2636 2637 2638
	}
	else
	{
		/*
2639
		 * Pre-existing rels can be dropped from the relcache if not open.
2640
		 */
2641
		bool	rebuild = !RelationHasReferenceCountZero(relation);
2642

2643 2644
		RelationClearRelation(relation, rebuild);
	}
2645 2646
}

2647
/*
2648
 * RelationForgetRelation - unconditionally remove a relcache entry
2649
 *
2650 2651
 *		   External interface for destroying a relcache entry when we
 *		   drop the relation.
2652 2653
 */
void
2654
RelationForgetRelation(Oid rid)
2655
{
2656
	Relation	relation;
2657 2658 2659

	RelationIdCacheLookup(rid, relation);

2660 2661 2662 2663
	if (!PointerIsValid(relation))
		return;					/* not in cache, nothing to do */

	if (!RelationHasReferenceCountZero(relation))
2664
		elog(ERROR, "relation %u is still open", rid);
2665 2666 2667

	/* Unconditionally destroy the relcache entry */
	RelationClearRelation(relation, false);
2668 2669
}

2670
/*
2671
 *		RelationCacheInvalidateEntry
2672 2673 2674
 *
 *		This routine is invoked for SI cache flush messages.
 *
2675 2676
 * Any relcache entry matching the relid must be flushed.  (Note: caller has
 * already determined that the relid belongs to our database or is a shared
2677
 * relation.)
2678 2679 2680 2681 2682 2683
 *
 * We used to skip local relations, on the grounds that they could
 * not be targets of cross-backend SI update messages; but it seems
 * safer to process them, so that our *own* SI update messages will
 * have the same effects during CommandCounterIncrement for both
 * local and nonlocal relations.
2684 2685
 */
void
2686
RelationCacheInvalidateEntry(Oid relationId)
2687
{
2688
	Relation	relation;
2689 2690 2691

	RelationIdCacheLookup(relationId, relation);

2692
	if (PointerIsValid(relation))
2693
	{
2694
		relcacheInvalsReceived++;
2695
		RelationFlushRelation(relation);
2696
	}
2697 2698 2699 2700
}

/*
 * RelationCacheInvalidate
 *	 Blow away cached relation descriptors that have zero reference counts,
 *	 and rebuild those with positive reference counts.	Also reset the smgr
 *	 relation cache.
 *
 *	 This is currently used only to recover from SI message buffer overflow,
 *	 so we do not touch new-in-transaction relations; they cannot be targets
 *	 of cross-backend SI updates (and our own updates now go through a
 *	 separate linked list that isn't limited by the SI message buffer size).
 *	 Likewise, we need not discard new-relfilenode-in-transaction hints,
 *	 since any invalidation of those would be a local event.
 *
 *	 We do this in two phases: the first pass deletes deletable items, and
 *	 the second one rebuilds the rebuildable items.  This is essential for
 *	 safety, because hash_seq_search only copes with concurrent deletion of
 *	 the element it is currently visiting.	If a second SI overflow were to
 *	 occur while we are walking the table, resulting in recursive entry to
 *	 this routine, we could crash because the inner invocation blows away
 *	 the entry next to be visited by the outer scan.  But this way is OK,
 *	 because (a) during the first pass we won't process any more SI messages,
 *	 so hash_seq_search will complete safely; (b) during the second pass we
 *	 only hold onto pointers to nondeletable entries.
 *
 *	 The two-phase approach also makes it easy to ensure that we process
 *	 nailed-in-cache indexes before other nondeletable items, and that we
 *	 process pg_class_oid_index first of all.  In scenarios where a nailed
 *	 index has been given a new relfilenode, we have to detect that update
 *	 before the nailed index is used in reloading any other relcache entry.
 */
void
RelationCacheInvalidate(void)
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;
	Relation	relation;
	List	   *rebuildFirstList = NIL; /* nailed indexes, pg_class_oid_index first */
	List	   *rebuildList = NIL;		/* everything else that must be rebuilt */
	ListCell   *l;

	/* Phase 1: delete deletable entries, collect the rest for rebuild */
	hash_seq_init(&status, RelationIdCache);

	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
	{
		relation = idhentry->reldesc;

		/* Must close all smgr references to avoid leaving dangling ptrs */
		RelationCloseSmgr(relation);

		/* Ignore new relations, since they are never SI targets */
		if (relation->rd_createSubid != InvalidSubTransactionId)
			continue;

		relcacheInvalsReceived++;

		if (RelationHasReferenceCountZero(relation))
		{
			/* Delete this entry immediately */
			Assert(!relation->rd_isnailed);
			RelationClearRelation(relation, false);
		}
		else
		{
			/*
			 * Add this entry to list of stuff to rebuild in second pass.
			 * pg_class_oid_index goes on the front of rebuildFirstList, other
			 * nailed indexes on the back, and everything else into
			 * rebuildList (in no particular order).
			 */
			if (relation->rd_isnailed &&
				relation->rd_rel->relkind == RELKIND_INDEX)
			{
				if (RelationGetRelid(relation) == ClassOidIndexId)
					rebuildFirstList = lcons(relation, rebuildFirstList);
				else
					rebuildFirstList = lappend(rebuildFirstList, relation);
			}
			else
				rebuildList = lcons(relation, rebuildList);
		}
	}

	/*
	 * Now zap any remaining smgr cache entries.  This must happen before we
	 * start to rebuild entries, since that may involve catalog fetches which
	 * will re-open catalog files.
	 */
	smgrcloseall();

	/* Phase 2: rebuild the items found to need rebuild in phase 1 */
	foreach(l, rebuildFirstList)
	{
		relation = (Relation) lfirst(l);
		RelationClearRelation(relation, true);
	}
	list_free(rebuildFirstList);
	foreach(l, rebuildList)
	{
		relation = (Relation) lfirst(l);
		RelationClearRelation(relation, true);
	}
	list_free(rebuildList);
}
2803

2804
/*
 * AtEOXact_RelationCache
 *
 *	Clean up the relcache at main-transaction commit or abort.
 *
 * Note: this must be called *before* processing invalidation messages.
 * In the case of abort, we don't want to try to rebuild any invalidated
 * cache entries (since we can't safely do database accesses).  Therefore
 * we must reset refcnts before handling pending invalidations.
 *
 * As of PostgreSQL 8.1, relcache refcnts should get released by the
 * ResourceOwner mechanism.  This routine just does a debugging
 * cross-check that no pins remain.  However, we also need to do special
 * cleanup when the current transaction created any relations or made use
 * of forced index lists.
 */
void
AtEOXact_RelationCache(bool isCommit)
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;

	/*
	 * To speed up transaction exit, we want to avoid scanning the relcache
	 * unless there is actually something for this routine to do.  Other than
	 * the debug-only Assert checks, most transactions don't create any work
	 * for us to do here, so we keep a static flag that gets set if there is
	 * anything to do.	(Currently, this means either a relation is created in
	 * the current xact, or one is given a new relfilenode, or an index list
	 * is forced.)  For simplicity, the flag remains set till end of top-level
	 * transaction, even though we could clear it at subtransaction end in
	 * some cases.
	 *
	 * MPP-3333: READERS need to *always* scan, otherwise they will not be able
	 * to maintain a coherent view of the storage layer.
	 */
	if (!need_eoxact_work
		&& DistributedTransactionContext != DTX_CONTEXT_QE_READER
#ifdef USE_ASSERT_CHECKING
		&& !assert_enabled
#endif
		)
		return;

	hash_seq_init(&status, RelationIdCache);

	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
	{
		Relation	relation = idhentry->reldesc;

		/*
		 * The relcache entry's ref count should be back to its normal
		 * not-in-a-transaction state: 0 unless it's nailed in cache.
		 *
		 * In bootstrap mode, this is NOT true, so don't check it --- the
		 * bootstrap code expects relations to stay open across start/commit
		 * transaction calls.  (That seems bogus, but it's not worth fixing.)
		 */
#ifdef USE_ASSERT_CHECKING
		if (!IsBootstrapProcessingMode())
		{
			int			expected_refcnt;

			expected_refcnt = relation->rd_isnailed ? 1 : 0;
			Assert(relation->rd_refcnt == expected_refcnt);
		}
#endif

		/*
		 * QE-readers aren't properly enrolled in transactions, they
		 * just get the snapshot which corresponds -- so here, where
		 * we are maintaining their relcache, we want to just clean
		 * up (almost as if we had aborted). (MPP-3338)
		 */
		if (DistributedTransactionContext == DTX_CONTEXT_QE_ENTRY_DB_SINGLETON ||
			DistributedTransactionContext == DTX_CONTEXT_QE_READER)
		{
			/* nailed entries must be rebuilt rather than dropped */
			RelationClearRelation(relation, relation->rd_isnailed ? true : false);
			continue;
		}

		/*
		 * Is it a relation created in the current transaction?
		 *
		 * During commit, reset the flag to zero, since we are now out of the
		 * creating transaction.  During abort, simply delete the relcache
		 * entry --- it isn't interesting any longer.  (NOTE: if we have
		 * forgotten the new-ness of a new relation due to a forced cache
		 * flush, the entry will get deleted anyway by shared-cache-inval
		 * processing of the aborted pg_class insertion.)
		 */
		if (relation->rd_createSubid != InvalidSubTransactionId)
		{
			if (isCommit)
				relation->rd_createSubid = InvalidSubTransactionId;
			else
			{
				/*
				 * In abort, delete the error log file before forgetting
				 * this relation.
				 */
				ErrorLogDelete(MyDatabaseId, RelationGetRelid(relation));
				RelationClearRelation(relation, false);
				continue;
			}
		}

		/*
		 * Likewise, reset the hint about the relfilenode being new.
		 */
		relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;

		/*
		 * Flush any temporary index list.
		 * (rd_indexvalid == 2 marks a forced/temporary index list.)
		 */
		if (relation->rd_indexvalid == 2)
		{
			list_free(relation->rd_indexlist);
			relation->rd_indexlist = NIL;
			relation->rd_oidindex = InvalidOid;
			relation->rd_indexvalid = 0;
		}
	}

	/* Once done with the transaction, we can reset need_eoxact_work */
	need_eoxact_work = false;
}
2931

2932 2933 2934 2935 2936 2937 2938 2939
/*
 * AtEOSubXact_RelationCache
 *
 *	Clean up the relcache at sub-transaction commit or abort.
 *
 * Note: this must be called *before* processing invalidation messages.
 */
void
AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
						  SubTransactionId parentSubid)
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;

	/*
	 * Skip the relcache scan if nothing to do --- see notes for
	 * AtEOXact_RelationCache.
	 */
	if (!need_eoxact_work &&
		DistributedTransactionContext != DTX_CONTEXT_QE_READER)
		return;

	hash_seq_init(&status, RelationIdCache);

	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
	{
		Relation	relation = idhentry->reldesc;

		/*
		 * As opposed to AtEOXact_RelationCache, subtransactions
		 * in readers are only caused by internal commands, and
		 * there shouldn't be interaction with global transactions,
		 * (reader gangs commit their transaction independently)
		 * we must not clear the relcache here.
		 */

		/*
		 * Is it a relation created in the current subtransaction?
		 *
		 * During subcommit, mark it as belonging to the parent, instead.
		 * During subabort, simply delete the relcache entry.
		 */
		if (relation->rd_createSubid == mySubid)
		{
			if (isCommit)
				relation->rd_createSubid = parentSubid;
			else if (RelationHasReferenceCountZero(relation))
			{
				/*
				 * In abort, delete the error log file before forgetting
				 * this relation.
				 */
				ErrorLogDelete(MyDatabaseId, RelationGetRelid(relation));

				RelationClearRelation(relation, false);
				continue;
			}
			else
			{
				/*
				 * Hmm, somewhere there's a (leaked?) reference to the
				 * relation.  We daren't remove the entry for fear of
				 * dereferencing a dangling pointer later.  Bleat, and mark it
				 * as not belonging to the current transaction.  Hopefully
				 * it'll get cleaned up eventually.  This must be just a
				 * WARNING to avoid error-during-error-recovery loops.
				 */
				relation->rd_createSubid = InvalidSubTransactionId;
				elog(WARNING, "cannot remove relcache entry for \"%s\" because it has nonzero refcount",
					 RelationGetRelationName(relation));
			}
		}

		/*
		 * Likewise, update or drop any new-relfilenode-in-subtransaction
		 * hint.
		 */
		if (relation->rd_newRelfilenodeSubid == mySubid)
		{
			if (isCommit)
				relation->rd_newRelfilenodeSubid = parentSubid;
			else
				relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
		}

		/*
		 * Flush any temporary index list.
		 * (rd_indexvalid == 2 marks a forced/temporary index list.)
		 */
		if (relation->rd_indexvalid == 2)
		{
			list_free(relation->rd_indexlist);
			relation->rd_indexlist = NIL;
			relation->rd_oidindex = InvalidOid;
			relation->rd_indexvalid = 0;
		}
	}
}

3030 3031 3032 3033
/*
 * RelationCacheMarkNewRelfilenode
 *
 *	Mark the rel as having been given a new relfilenode in the current
 *	(sub) transaction.	This is a hint that can be used to optimize
 *	later operations on the rel in the same transaction.
 */
void
RelationCacheMarkNewRelfilenode(Relation rel)
{
	/* Mark it with the subtransaction that assigned the new relfilenode... */
	rel->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
	/* ... and now we have eoxact cleanup work to do */
	need_eoxact_work = true;
}


3047
/*
 *		RelationBuildLocalRelation
 *			Build a relcache entry for an about-to-be-created relation,
 *			and enter it into the relcache.
 *
 * relname/relnamespace/tupDesc/relid/reltablespace/relkind describe the
 * new relation; shared_relation must agree with IsSharedRelation(relid).
 * The returned entry is pinned (refcount incremented) for the caller.
 */
Relation
RelationBuildLocalRelation(const char *relname,
						   Oid relnamespace,
						   TupleDesc tupDesc,
						   Oid relid,
						   Oid reltablespace,
			               char relkind,            /*CDB*/
						   bool shared_relation)
{
	Relation	rel;
	MemoryContext oldcxt;
	int			natts = tupDesc->natts;
	int			i;
	bool		has_not_null;
	bool		nailit;

	AssertArg(natts >= 0);

	/*
	 * check for creation of a rel that must be nailed in cache.
	 *
	 * XXX this list had better match the relations specially handled in
	 * RelationCacheInitializePhase2/3.
	 */
	switch (relid)
	{
		case DatabaseRelationId:
		case AuthIdRelationId:
		case AuthMemRelationId:
		case RelationRelationId:
		case AttributeRelationId:
		case ProcedureRelationId:
		case TypeRelationId:
			nailit = true;
			break;
		default:
			nailit = false;
			break;
	}

	/*
	 * check that hardwired list of shared rels matches what's in the
	 * bootstrap .bki file.  If you get a failure here during initdb, you
	 * probably need to fix IsSharedRelation() to match whatever you've done
	 * to the set of shared relations.
	 */
	if (shared_relation != IsSharedRelation(relid))
		elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
			 relname, relid);

	/*
	 * switch to the cache context to create the relcache entry.
	 */
	if (!CacheMemoryContext)
		CreateCacheMemoryContext();

	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
	 * allocate a new relation descriptor and fill in basic state fields.
	 */
	rel = (Relation) palloc0(sizeof(RelationData));

	rel->rd_targblock = InvalidBlockNumber;

	/* make sure relation is marked as having no open file yet */
	rel->rd_smgr = NULL;

	/* mark it nailed if appropriate */
	rel->rd_isnailed = nailit;

	/* nailed entries carry a permanent reference */
	rel->rd_refcnt = nailit ? 1 : 0;

	/* it's being created in this transaction */
	rel->rd_createSubid = GetCurrentSubTransactionId();
	rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;

	/* must flag that we have rels created in this transaction */
	need_eoxact_work = true;

	/* is it a temporary relation? */
	rel->rd_istemp = isTempOrToastNamespace(relnamespace);

	/* is it a system catalog? */
	rel->rd_issyscat = (strncmp(relname, "pg_", 3) == 0);

	/*
	 * CDB: On QEs, temp relations must use shared buffer cache so data
	 * will be visible to all segmates.  On QD, sequence objects must
	 * use shared buffer cache so data will be visible to sequence server.
	 */
	if (rel->rd_istemp &&
		relkind != RELKIND_SEQUENCE &&
		Gp_role != GP_ROLE_EXECUTE)
		rel->rd_isLocalBuf = true;
	else
		rel->rd_isLocalBuf = false;

	/*
	 * create a new tuple descriptor from the one passed in.  We do this
	 * partly to copy it into the cache context, and partly because the new
	 * relation can't have any defaults or constraints yet; they have to be
	 * added in later steps, because they require additions to multiple system
	 * catalogs.  We can copy attnotnull constraints here, however.
	 */
	rel->rd_att = CreateTupleDescCopy(tupDesc);
	rel->rd_att->tdrefcount = 1;	/* mark as refcounted */
	has_not_null = false;
	for (i = 0; i < natts; i++)
	{
		rel->rd_att->attrs[i]->attnotnull = tupDesc->attrs[i]->attnotnull;
		has_not_null |= tupDesc->attrs[i]->attnotnull;
	}

	if (has_not_null)
	{
		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

		constr->has_not_null = true;
		rel->rd_att->constr = constr;
	}

	/*
	 * initialize relation tuple form (caller may add/override data later)
	 */
	rel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);

	namestrcpy(&rel->rd_rel->relname, relname);
	rel->rd_rel->relnamespace = relnamespace;

	rel->rd_rel->relkind = RELKIND_UNCATALOGED;
	rel->rd_rel->relstorage = RELSTORAGE_HEAP;
	rel->rd_rel->relhasoids = rel->rd_att->tdhasoid;
	rel->rd_rel->relnatts = natts;
	rel->rd_rel->reltype = InvalidOid;
	/* needed when bootstrapping: */
	rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;

	/*
	 * Create zeroed-out gp_relation_node data.  It will be filled in when the
	 * disk file is created.
	 */
	rel->rd_segfile0_relationnodeinfo.isPresent = false;
	rel->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = false;

	/*
	 * Insert relation physical and logical identifiers (OIDs) into the right
	 * places.	Note that the physical ID (relfilenode) is initially the same
	 * as the logical ID (OID).
	 */
	rel->rd_rel->relisshared = shared_relation;

	RelationGetRelid(rel) = relid;

	for (i = 0; i < natts; i++)
		rel->rd_att->attrs[i]->attrelid = relid;

	rel->rd_rel->relfilenode = relid;
	rel->rd_rel->reltablespace = reltablespace;

	RelationInitLockInfo(rel);	/* see lmgr.c */

	RelationInitPhysicalAddr(rel);

	/*
	 * Okay to insert into the relcache hash tables.
	 */
	RelationCacheInsert(rel);

	/*
	 * done building relcache entry.
	 */
	MemoryContextSwitchTo(oldcxt);

	/* It's fully valid */
	rel->rd_isvalid = true;

	/*
	 * Caller expects us to pin the returned entry.
	 */
	RelationIncrementReferenceCount(rel);

	return rel;
}

3237
/*
3238
 *		RelationCacheInitialize
3239
 *
3240 3241
 *		This initializes the relation descriptor cache.  At the time
 *		that this is invoked, we can't do database access yet (mainly
3242 3243 3244 3245 3246
 *		because the transaction subsystem is not up); all we are doing
 *		is making an empty cache hashtable.  This must be done before
 *		starting the initialization transaction, because otherwise
 *		AtEOXact_RelationCache would crash if that transaction aborts
 *		before we can get the relcache set up.
3247 3248
 */

3249
#define INITRELCACHESIZE		400
3250 3251

void
3252
RelationCacheInitialize(void)
3253
{
3254 3255
	MemoryContext oldcxt;
	HASHCTL		ctl;
3256

3257
	/*
3258
	 * make sure cache memory context exists
3259
	 */
3260 3261
	if (!CacheMemoryContext)
		CreateCacheMemoryContext();
3262

3263 3264 3265
    /*
	 * switch to cache memory context
	 */
3266
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3267

3268
	/*
3269
	 * create hashtable that indexes the relcache
3270
	 */
3271
	MemSet(&ctl, 0, sizeof(ctl));
3272
	ctl.keysize = sizeof(Oid);
3273
	ctl.entrysize = sizeof(RelIdCacheEnt);
3274
	ctl.hash = oid_hash;
3275 3276
	RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
								  &ctl, HASH_ELEM | HASH_FUNCTION);
3277

3278 3279 3280 3281 3282 3283
	MemoryContextSwitchTo(oldcxt);
}

/*
 *		RelationCacheInitializePhase2
 *
 *		This is called to prepare for access to shared catalogs during startup.
 *		We must at least set up nailed reldescs for pg_database, pg_authid,
 *		and pg_auth_members.  Ideally we'd like to have reldescs for their
 *		indexes, too.  We attempt to load this information from the shared
 *		relcache init file.  If that's missing or broken, just make phony
 *		entries for the catalogs themselves.  RelationCacheInitializePhase3
 *		will clean up as needed.
 */
void
RelationCacheInitializePhase2(void)
{
	MemoryContext oldcxt;

	/*
	 * In bootstrap mode, the shared catalogs aren't there yet anyway, so do
	 * nothing.
	 */
	if (IsBootstrapProcessingMode())
		return;

	/*
	 * switch to cache memory context
	 */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
	 * Try to load the shared relcache cache file.	If unsuccessful, bootstrap
	 * the cache with pre-made descriptors for the critical shared catalogs.
	 */
	if (!load_relcache_init_file(true))
	{
		formrdesc("pg_database", PG_DATABASE_RELTYPE_OID, true,
				  true, Natts_pg_database, Desc_pg_database);
		formrdesc("pg_authid", PG_AUTHID_RELTYPE_OID, true,
				  true, Natts_pg_authid, Desc_pg_authid);
		formrdesc("pg_auth_members", PG_AUTH_MEMBERS_RELTYPE_OID, true,
				  false, Natts_pg_auth_members, Desc_pg_auth_members);

/* count of formrdesc calls above; load_relcache_init_file relies on it */
#define NUM_CRITICAL_SHARED_RELS	3	/* fix if you change list above */
	}

	MemoryContextSwitchTo(oldcxt);
}

/*
 *		RelationCacheInitializePhase3
 *
 *		This is called as soon as the catcache and transaction system
 *		are functional and we have determined MyDatabaseId.  At this point
 *		we can actually read data from the database's system catalogs.
 *		We first try to read pre-computed relcache entries from the local
 *		relcache init file.  If that's missing or broken, make phony entries
 *		for the minimum set of nailed-in-cache relations.  Then (unless
 *		bootstrapping) make sure we have entries for the critical system
 *		indexes.  Once we've done all this, we have enough infrastructure to
 *		open any system catalog or use any catcache.  The last step is to
 *		rewrite the cache files if needed.
 */
void
RelationCacheInitializePhase3(void)
3344 3345 3346 3347
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;
	MemoryContext oldcxt;
3348
	bool		needNewCacheFile = !criticalSharedRelcachesBuilt;
3349

3350
	/*
3351 3352 3353 3354 3355
	 * switch to cache memory context
	 */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
3356 3357
	 * Try to load the local relcache cache file.  If unsuccessful, bootstrap
	 * the cache with pre-made descriptors for the critical "nailed-in" system
3358
	 * catalogs.
3359
	 */
3360
	if (IsBootstrapProcessingMode() ||
3361
		!load_relcache_init_file(false))
3362
	{
3363 3364
		needNewCacheFile = true;

3365
		formrdesc("pg_class", PG_CLASS_RELTYPE_OID, false,
3366
				  true, Natts_pg_class, Desc_pg_class);
3367
		formrdesc("pg_attribute", PG_ATTRIBUTE_RELTYPE_OID, false,
3368
				  false, Natts_pg_attribute, Desc_pg_attribute);
3369
		formrdesc("pg_proc", PG_PROC_RELTYPE_OID, false,
3370
				  true, Natts_pg_proc, Desc_pg_proc);
3371
		formrdesc("pg_type", PG_TYPE_RELTYPE_OID, false,
3372
				  true, Natts_pg_type, Desc_pg_type);
3373

3374
#define NUM_CRITICAL_LOCAL_RELS 4		/* fix if you change list above */
3375
	}
3376 3377

	MemoryContextSwitchTo(oldcxt);
3378

3379
	/* In bootstrap mode, the faked-up formrdesc info is all we'll have */
3380 3381 3382
	if (IsBootstrapProcessingMode())
		return;

3383
	/*
B
Bruce Momjian 已提交
3384
	 * If we didn't get the critical system indexes loaded into relcache, do
3385 3386
	 * so now.	These are critical because the catcache and/or opclass cache
	 * depend on them for fetches done during relcache load.  Thus, we have an
B
Bruce Momjian 已提交
3387 3388 3389 3390 3391 3392
	 * infinite-recursion problem.	We can break the recursion by doing
	 * heapscans instead of indexscans at certain key spots. To avoid hobbling
	 * performance, we only want to do that until we have the critical indexes
	 * loaded into relcache.  Thus, the flag criticalRelcachesBuilt is used to
	 * decide whether to do heapscan or indexscan at the key spots, and we set
	 * it true after we've loaded the critical indexes.
3393
	 *
B
Bruce Momjian 已提交
3394 3395 3396 3397 3398 3399
	 * The critical indexes are marked as "nailed in cache", partly to make it
	 * easy for load_relcache_init_file to count them, but mainly because we
	 * cannot flush and rebuild them once we've set criticalRelcachesBuilt to
	 * true.  (NOTE: perhaps it would be possible to reload them by
	 * temporarily setting criticalRelcachesBuilt to false again.  For now,
	 * though, we just nail 'em in.)
3400 3401 3402 3403
	 *
	 * RewriteRelRulenameIndexId and TriggerRelidNameIndexId are not critical
	 * in the same way as the others, because the critical catalogs don't
	 * (currently) have any rules or triggers, and so these indexes can be
B
Bruce Momjian 已提交
3404
	 * rebuilt without inducing recursion.	However they are used during
3405 3406
	 * relcache load when a rel does have rules or triggers, so we choose to
	 * nail them for performance reasons.
3407
	 */
B
Bruce Momjian 已提交
3408
	if (!criticalRelcachesBuilt)
3409
	{
3410 3411 3412 3413 3414 3415 3416 3417 3418 3419 3420 3421
		load_critical_index(ClassOidIndexId,
							RelationRelationId);
		load_critical_index(AttributeRelidNumIndexId,
							AttributeRelationId);
		load_critical_index(IndexRelidIndexId,
							IndexRelationId);
		load_critical_index(AccessMethodStrategyIndexId,
							AccessMethodOperatorRelationId);
		load_critical_index(OpclassOidIndexId,
							OperatorClassRelationId);
		load_critical_index(AccessMethodProcedureIndexId,
							AccessMethodProcedureRelationId);
3422 3423
		load_critical_index(OperatorOidIndexId,
							OperatorRelationId);
3424 3425 3426 3427 3428
		load_critical_index(RewriteRelRulenameIndexId,
							RewriteRelationId);
		load_critical_index(TriggerRelidNameIndexId,
							TriggerRelationId);

3429
#define NUM_CRITICAL_LOCAL_INDEXES	9	/* fix if you change list above */
3430 3431 3432 3433

		criticalRelcachesBuilt = true;
	}

3434 3435 3436 3437 3438 3439 3440 3441 3442 3443 3444 3445 3446 3447 3448 3449 3450 3451 3452 3453 3454 3455 3456 3457 3458 3459 3460 3461
	/*
	 * Process critical shared indexes too.
	 *
	 * DatabaseNameIndexId isn't critical for relcache loading, but rather for
	 * initial lookup of MyDatabaseId, without which we'll never find any
	 * non-shared catalogs at all.	Autovacuum calls InitPostgres with a
	 * database OID, so it instead depends on DatabaseOidIndexId.  We also
	 * need to nail up some indexes on pg_authid and pg_auth_members for use
	 * during client authentication.
	 */
	if (!criticalSharedRelcachesBuilt)
	{
		load_critical_index(DatabaseNameIndexId,
							DatabaseRelationId);
		load_critical_index(DatabaseOidIndexId,
							DatabaseRelationId);
		load_critical_index(AuthIdRolnameIndexId,
							AuthIdRelationId);
		load_critical_index(AuthIdOidIndexId,
							AuthIdRelationId);
		load_critical_index(AuthMemMemRoleIndexId,
							AuthMemRelationId);

#define NUM_CRITICAL_SHARED_INDEXES 5	/* fix if you change list above */

		criticalSharedRelcachesBuilt = true;
	}

3462
	/*
B
Bruce Momjian 已提交
3463 3464 3465 3466 3467 3468
	 * Now, scan all the relcache entries and update anything that might be
	 * wrong in the results from formrdesc or the relcache cache file. If we
	 * faked up relcache entries using formrdesc, then read the real pg_class
	 * rows and replace the fake entries with them. Also, if any of the
	 * relcache entries have rules or triggers, load that info the hard way
	 * since it isn't recorded in the cache file.
3469
	 *
3470 3471
	 * Whenever we access the catalogs to read data, there is a possibility
	 * of a shared-inval cache flush causing relcache entries to be removed.
3472 3473 3474 3475 3476
	 * Since hash_seq_search only guarantees to still work after the *current*
	 * entry is removed, it's unsafe to continue the hashtable scan afterward.
	 * We handle this by restarting the scan from scratch after each access.
	 * This is theoretically O(N^2), but the number of entries that actually
	 * need to be fixed is small enough that it doesn't matter.
3477
	 */
3478
	hash_seq_init(&status, RelationIdCache);
3479

3480
	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3481
	{
3482
		Relation	relation = idhentry->reldesc;
3483 3484 3485 3486 3487 3488
		bool		restart = false;

		/*
		 * Make sure *this* entry doesn't get flushed while we work with it.
		 */
		RelationIncrementReferenceCount(relation);
3489

3490
		/*
3491
		 * If it's a faked-up entry, read the real pg_class tuple.
3492
		 */
3493
		if (relation->rd_rel->relowner == InvalidOid)
3494 3495 3496
		{
			HeapTuple	htup;
			Form_pg_class relp;
B
Bruce Momjian 已提交
3497

3498
			htup = SearchSysCache(RELOID,
3499
							   ObjectIdGetDatum(RelationGetRelid(relation)),
3500 3501
								  0, 0, 0);
			if (!HeapTupleIsValid(htup))
3502 3503
				elog(FATAL, "cache lookup failed for relation %u",
					 RelationGetRelid(relation));
3504
			relp = (Form_pg_class) GETSTRUCT(htup);
B
Bruce Momjian 已提交
3505

3506 3507 3508 3509 3510
			/*
			 * Copy tuple to relation->rd_rel. (See notes in
			 * AllocateRelationDesc())
			 */
			memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
3511

3512 3513 3514 3515 3516
			/* Update rd_options while we have the tuple */
			if (relation->rd_options)
				pfree(relation->rd_options);
			RelationParseRelOptions(relation, htup);

3517
			/*
3518
			 * Check the values in rd_att were set up correctly.  (We cannot
3519 3520 3521
			 * just copy them over now: formrdesc must have set up the
			 * rd_att data correctly to start with, because it may already
			 * have been copied into one or more catcache entries.)
3522
			 */
3523 3524 3525
			Assert(relation->rd_att->tdtypeid == relp->reltype);
			Assert(relation->rd_att->tdtypmod == -1);
			Assert(relation->rd_att->tdhasoid == relp->relhasoids);
3526

3527
			ReleaseSysCache(htup);
3528 3529 3530 3531 3532 3533 3534

			/* relowner had better be OK now, else we'll loop forever */
			if (relation->rd_rel->relowner == InvalidOid)
				elog(ERROR, "invalid relowner in pg_class entry for \"%s\"",
					 RelationGetRelationName(relation));

			restart = true;
3535 3536 3537 3538
		}

		/*
		 * Fix data that isn't saved in relcache cache file.
3539 3540 3541 3542 3543
		 *
		 * relhasrules or relhastriggers could possibly be wrong or out of
		 * date.  If we don't actually find any rules or triggers, clear the
		 * local copy of the flag so that we don't get into an infinite loop
		 * here.  We don't make any attempt to fix the pg_class entry, though.
3544 3545
		 */
		if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
3546
		{
3547
			RelationBuildRuleLock(relation);
3548 3549 3550 3551
			if (relation->rd_rules == NULL)
				relation->rd_rel->relhasrules = false;
			restart = true;
		}
3552
		if (relation->rd_rel->reltriggers > 0 && relation->trigdesc == NULL)
3553
		{
3554
			RelationBuildTriggers(relation);
3555 3556 3557 3558 3559 3560 3561 3562 3563 3564 3565 3566 3567 3568
			if (relation->trigdesc == NULL)
				relation->rd_rel->reltriggers = 0;
			restart = true;
		}

		/* Release hold on the relation */
		RelationDecrementReferenceCount(relation);

		/* Now, restart the hashtable scan if needed */
		if (restart)
		{
			hash_seq_term(&status);
			hash_seq_init(&status, RelationIdCache);
		}
3569
	}
3570

3571
	/*
3572 3573
	 * Lastly, write out new relcache cache files if needed.  We don't bother
	 * to distinguish cases where only one of the two needs an update.
3574
	 */
3575 3576 3577
	if (needNewCacheFile)
	{
		/*
B
Bruce Momjian 已提交
3578 3579 3580
		 * Force all the catcaches to finish initializing and thereby open the
		 * catalogs and indexes they use.  This will preload the relcache with
		 * entries for all the most important system catalogs and indexes, so
3581
		 * that the init files will be most useful for future backends.
3582 3583 3584
		 */
		InitCatalogCachePhase2();

3585 3586 3587 3588 3589 3590
		/* reset initFileRelationIds list; we'll fill it during write */
		initFileRelationIds = NIL;

		/* now write the files */
		write_relcache_init_file(true);
		write_relcache_init_file(false);
3591 3592 3593
	}
}

3594 3595 3596 3597 3598 3599 3600 3601 3602 3603 3604 3605 3606 3607 3608 3609 3610 3611 3612 3613 3614 3615 3616 3617 3618 3619 3620 3621
/*
 * Load one critical system index into the relcache
 *
 * indexoid is the OID of the target index, heapoid is the OID of the catalog
 * it belongs to.
 */
static void
load_critical_index(Oid indexoid, Oid heapoid)
{
	Relation	index_rel;

	/*
	 * Take the catalog's lock before the index's lock.  RelationBuildDesc
	 * may well need to read the catalog itself, and anyone exclusive-locking
	 * the catalog and its index will do so in that order; acquiring them in
	 * the same order here avoids deadlock.
	 */
	LockRelationOid(heapoid, AccessShareLock);
	LockRelationOid(indexoid, AccessShareLock);

	index_rel = RelationBuildDesc(indexoid, true);
	if (index_rel == NULL)
		elog(PANIC, "could not open critical system index %u", indexoid);

	/* Nail the entry so it can never be flushed from the relcache */
	index_rel->rd_isnailed = true;
	index_rel->rd_refcnt = 1;

	UnlockRelationOid(indexoid, AccessShareLock);
	UnlockRelationOid(heapoid, AccessShareLock);
}

3622
/*
3623
 * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
3624 3625 3626
 * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
 *
 * We need this kluge because we have to be able to access non-fixed-width
3627 3628 3629 3630 3631 3632
 * fields of pg_class and pg_index before we have the standard catalog caches
 * available.  We use predefined data that's set up in just the same way as
 * the bootstrapped reldescs used by formrdesc().  The resulting tupdesc is
 * not 100% kosher: it does not have the correct rowtype OID in tdtypeid, nor
 * does it have a TupleConstr field.  But it's good enough for the purpose of
 * extracting fields.
3633 3634
 */
static TupleDesc
3635 3636
BuildHardcodedDescriptor(int natts, const FormData_pg_attribute *attrs,
						 bool hasoids)
3637
{
3638
	TupleDesc	result;
3639 3640 3641 3642 3643
	MemoryContext oldcxt;
	int			i;

	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

3644
	result = CreateTemplateTupleDesc(natts, hasoids);
B
Bruce Momjian 已提交
3645
	result->tdtypeid = RECORDOID;		/* not right, but we don't care */
3646
	result->tdtypmod = -1;
3647

3648
	for (i = 0; i < natts; i++)
3649
	{
3650
		memcpy(result->attrs[i], &attrs[i], ATTRIBUTE_FIXED_PART_SIZE);
3651
		/* make sure attcacheoff is valid */
3652
		result->attrs[i]->attcacheoff = -1;
3653 3654 3655
	}

	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
3656
	result->attrs[0]->attcacheoff = 0;
3657 3658 3659 3660 3661

	/* Note: we don't bother to set up a TupleConstr entry */

	MemoryContextSwitchTo(oldcxt);

3662 3663 3664 3665 3666 3667 3668 3669 3670 3671 3672 3673 3674 3675 3676 3677 3678 3679 3680 3681 3682 3683 3684 3685 3686 3687 3688 3689
	return result;
}

static TupleDesc
GetPgClassDescriptor(void)
{
	static TupleDesc pgclassdesc = NULL;

	/* Return the cached descriptor if we built it already */
	if (pgclassdesc != NULL)
		return pgclassdesc;

	pgclassdesc = BuildHardcodedDescriptor(Natts_pg_class,
										   Desc_pg_class,
										   true);
	return pgclassdesc;
}

static TupleDesc
GetPgIndexDescriptor(void)
{
	static TupleDesc pgindexdesc = NULL;

	/* Return the cached descriptor if we built it already */
	if (pgindexdesc != NULL)
		return pgindexdesc;

	pgindexdesc = BuildHardcodedDescriptor(Natts_pg_index,
										   Desc_pg_index,
										   false);
	return pgindexdesc;
}

3693 3694 3695
/*
 * Load any default attribute value definitions for the relation.
 */
3696
static void
3697
AttrDefaultFetch(Relation relation)
3698
{
3699 3700 3701
	AttrDefault *attrdef = relation->rd_att->constr->defval;
	int			ndef = relation->rd_att->constr->num_defval;
	Relation	adrel;
3702 3703
	SysScanDesc adscan;
	ScanKeyData skey;
H
Hiroshi Inoue 已提交
3704
	HeapTuple	htup;
3705
	Datum		val;
3706 3707 3708
	bool		isnull;
	int			found;
	int			i;
3709

3710 3711 3712 3713
	ScanKeyInit(&skey,
				Anum_pg_attrdef_adrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));
3714

3715 3716 3717
	adrel = heap_open(AttrDefaultRelationId, AccessShareLock);
	adscan = systable_beginscan(adrel, AttrDefaultIndexId, true,
								SnapshotNow, 1, &skey);
3718
	found = 0;
3719

3720
	while (HeapTupleIsValid(htup = systable_getnext(adscan)))
3721
	{
3722
		Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
3723

3724 3725 3726 3727
		for (i = 0; i < ndef; i++)
		{
			if (adform->adnum != attrdef[i].adnum)
				continue;
3728
			if (attrdef[i].adbin != NULL)
3729
				elog(WARNING, "multiple attrdef records found for attr %s of rel %s",
B
Bruce Momjian 已提交
3730
				NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
3731
					 RelationGetRelationName(relation));
3732 3733
			else
				found++;
3734

3735 3736 3737
			val = fastgetattr(htup,
							  Anum_pg_attrdef_adbin,
							  adrel->rd_att, &isnull);
3738
			if (isnull)
3739
				elog(WARNING, "null adbin for attr %s of rel %s",
B
Bruce Momjian 已提交
3740
				NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
3741
					 RelationGetRelationName(relation));
3742 3743
			else
				attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext,
3744
												   TextDatumGetCString(val));
3745 3746
			break;
		}
3747

3748
		if (i >= ndef)
3749 3750
			elog(WARNING, "unexpected attrdef record found for attr %d of rel %s",
				 adform->adnum, RelationGetRelationName(relation));
3751 3752
	}

3753
	systable_endscan(adscan);
3754
	heap_close(adrel, AccessShareLock);
3755 3756

	if (found != ndef)
3757
		elog(WARNING, "%d attrdef record(s) missing for rel %s",
3758
			 ndef - found, RelationGetRelationName(relation));
3759 3760
}

3761 3762 3763
/*
 * Load any check constraints for the relation.
 */
3764
static void
3765
CheckConstraintFetch(Relation relation)
3766
{
3767 3768
	ConstrCheck *check = relation->rd_att->constr->check;
	int			ncheck = relation->rd_att->constr->num_check;
3769
	Relation	conrel;
3770 3771
	SysScanDesc conscan;
	ScanKeyData skey[1];
H
Hiroshi Inoue 已提交
3772
	HeapTuple	htup;
3773
	Datum		val;
3774
	bool		isnull;
3775
	int			found = 0;
3776

3777 3778 3779 3780
	ScanKeyInit(&skey[0],
				Anum_pg_constraint_conrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));
3781

3782 3783 3784
	conrel = heap_open(ConstraintRelationId, AccessShareLock);
	conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
								 SnapshotNow, 1, skey);
3785

3786
	while (HeapTupleIsValid(htup = systable_getnext(conscan)))
3787
	{
3788 3789 3790 3791 3792 3793
		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);

		/* We want check constraints only */
		if (conform->contype != CONSTRAINT_CHECK)
			continue;

3794
		if (found >= ncheck)
3795 3796 3797
			elog(ERROR,
			     "pg_class reports %d constraint record(s) for rel %s, but found extra in pg_constraint",
			     ncheck, RelationGetRelationName(relation));
3798

3799
		check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
B
Bruce Momjian 已提交
3800
												  NameStr(conform->conname));
3801 3802

		/* Grab and test conbin is actually set */
3803
		val = fastgetattr(htup,
3804 3805
						  Anum_pg_constraint_conbin,
						  conrel->rd_att, &isnull);
3806
		if (isnull)
3807
			elog(ERROR, "null conbin for rel %s",
3808
				 RelationGetRelationName(relation));
3809

3810
		check[found].ccbin = MemoryContextStrdup(CacheMemoryContext,
3811
												 TextDatumGetCString(val));
3812 3813 3814
		found++;
	}

3815
	systable_endscan(conscan);
3816
	heap_close(conrel, AccessShareLock);
3817 3818

	if (found != ncheck)
3819 3820 3821
		elog(ERROR,
		     "found %d in pg_constraint, but pg_class reports %d constraint record(s) for rel %s",
		     found, ncheck, RelationGetRelationName(relation));
3822 3823
}

3824 3825 3826 3827 3828 3829 3830 3831 3832 3833 3834 3835 3836 3837 3838

/*
 * RelationGetPartitioningKey -- get GpPolicy struct for distributed relation
 *
 * Returns a copy of the relation's GpPolicy object, palloc'd in
 * the caller's context.  Caller should pfree() it.  If NULL is
 * returned, relation should be accessed locally.
 */
GpPolicy *
RelationGetPartitioningKey(Relation relation)
{
	/* Simply hand back a caller-context copy of the cached policy */
	return GpPolicyCopy(CurrentMemoryContext, relation->rd_cdbpolicy);
}


3839 3840 3841 3842 3843 3844
/*
 * RelationGetIndexList -- get a list of OIDs of indexes on this relation
 *
 * The index list is created only if someone requests it.  We scan pg_index
 * to find relevant indexes, and add the list to the relcache entry so that
 * we won't have to compute it again.  Note that shared cache inval of a
 * relcache entry will delete the old list and set rd_indexvalid to 0,
 * so that we must recompute the index list on next request.  This handles
 * creation or deletion of an index.
 *
 * The returned list is guaranteed to be sorted in order by OID.  This is
 * needed by the executor, since for index types that we obtain exclusive
 * locks on when updating the index, all backends must lock the indexes in
 * the same order or we will get deadlocks (see ExecOpenIndices()).  Any
 * consistent ordering would do, but ordering by OID is easy.
 *
 * Since shared cache inval causes the relcache's copy of the list to go away,
 * we return a copy of the list palloc'd in the caller's context.  The caller
 * may list_free() the returned list after scanning it. This is necessary
 * since the caller will typically be doing syscache lookups on the relevant
 * indexes, and syscache lookup could cause SI messages to be processed!
 *
 * We also update rd_oidindex, which this module treats as effectively part
 * of the index list.  rd_oidindex is valid when rd_indexvalid isn't zero;
 * it is the pg_class OID of a unique index on OID when the relation has one,
 * and InvalidOid if there is no such index.
 */
List *
RelationGetIndexList(Relation relation)
{
	Relation	indrel;
	SysScanDesc scan;
	ScanKeyData key;
	HeapTuple	tuple;
	List	   *result = NIL;
	Oid			oidIndex = InvalidOid;
	MemoryContext save_cxt;

	/* Quick exit if we already computed the list. */
	if (relation->rd_indexvalid != 0)
		return list_copy(relation->rd_indexlist);

	/*
	 * Build the returned list in the caller's context while scanning; only
	 * after the scan completes successfully do we copy it into the relcache
	 * entry.  This avoids cache-context leakage on error partway through.
	 */

	/* Prepare to scan pg_index for entries having indrelid = this rel. */
	ScanKeyInit(&key,
				Anum_pg_index_indrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));

	indrel = heap_open(IndexRelationId, AccessShareLock);
	scan = systable_beginscan(indrel, IndexIndrelidIndexId, true,
							  SnapshotNow, 1, &key);

	while (HeapTupleIsValid(tuple = systable_getnext(scan)))
	{
		Form_pg_index index = (Form_pg_index) GETSTRUCT(tuple);

		/* Add index's OID to result list in the proper order */
		result = insert_ordered_oid(result, index->indexrelid);

		/* Check to see if it is a unique, non-partial btree index on OID */
		if (IndexIsValid(index) &&
			index->indnatts == 1 &&
			index->indisunique &&
			index->indkey.values[0] == ObjectIdAttributeNumber &&
			index->indclass.values[0] == OID_BTREE_OPS_OID &&
			heap_attisnull(tuple, Anum_pg_index_indpred))
			oidIndex = index->indexrelid;
	}

	systable_endscan(scan);
	heap_close(indrel, AccessShareLock);

	/* Now save a copy of the completed list in the relcache entry. */
	save_cxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indexlist = list_copy(result);
	relation->rd_oidindex = oidIndex;
	relation->rd_indexvalid = 1;
	MemoryContextSwitchTo(save_cxt);

	return result;
}

3930 3931 3932 3933 3934 3935 3936 3937 3938 3939 3940 3941
/*
 * insert_ordered_oid
 *		Insert a new Oid into a sorted list of Oids, preserving ordering
 *
 * Building the ordered list this way is O(N^2), but with a pretty small
 * constant, so for the number of entries we expect it will probably be
 * faster than trying to apply qsort().  Most tables don't have very many
 * indexes...
 */
static List *
insert_ordered_oid(List *list, Oid datum)
{
	ListCell   *prev;
	ListCell   *curr;

	/* Does the datum belong at the front? */
	if (list == NIL || datum < linitial_oid(list))
		return lcons_oid(datum, list);

	/* No; walk forward until we pass the insertion point */
	for (prev = list_head(list); (curr = lnext(prev)) != NULL; prev = curr)
	{
		if (datum < lfirst_oid(curr))
			break;				/* it belongs after 'prev', before 'curr' */
	}

	/* Insert datum into list after 'prev' */
	lappend_cell_oid(list, prev, datum);
	return list;
}

3963 3964 3965 3966
/*
 * RelationSetIndexList -- externally force the index list contents
 *
 * This is used to temporarily override what we think the set of valid
3967 3968
 * indexes is (including the presence or absence of an OID index).
 * The forcing will be valid only until transaction commit or abort.
3969 3970 3971 3972 3973 3974
 *
 * This should only be applied to nailed relations, because in a non-nailed
 * relation the hacked index list could be lost at any time due to SI
 * messages.  In practice it is only used on pg_class (see REINDEX).
 *
 * It is up to the caller to make sure the given list is correctly ordered.
3975 3976 3977 3978 3979 3980 3981
 *
 * We deliberately do not change rd_indexattr here: even when operating
 * with a temporary partial index list, HOT-update decisions must be made
 * correctly with respect to the full index set.  It is up to the caller
 * to ensure that a correct rd_indexattr set has been cached before first
 * calling RelationSetIndexList; else a subsequent inquiry might cause a
 * wrong rd_indexattr set to get computed and cached.
3982 3983
 */
void
3984
RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex)
3985 3986 3987
{
	MemoryContext oldcxt;

3988
	Assert(relation->rd_isnailed);
3989 3990
	/* Copy the list into the cache context (could fail for lack of mem) */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3991
	indexIds = list_copy(indexIds);
3992 3993
	MemoryContextSwitchTo(oldcxt);
	/* Okay to replace old list */
3994
	list_free(relation->rd_indexlist);
3995
	relation->rd_indexlist = indexIds;
3996
	relation->rd_oidindex = oidIndex;
B
Bruce Momjian 已提交
3997
	relation->rd_indexvalid = 2;	/* mark list as forced */
3998
	/* must flag that we have a forced index list */
3999
	need_eoxact_work = true;
4000 4001
}

4002 4003 4004 4005 4006 4007 4008 4009 4010 4011 4012
/*
 * RelationGetOidIndex -- get the pg_class OID of the relation's OID index
 *
 * Returns InvalidOid if there is no such index.
 */
Oid
RelationGetOidIndex(Relation relation)
{
	/*
	 * If relation doesn't have OIDs at all, caller is probably confused. (We
	 * could just silently return InvalidOid, but it seems better to throw an
	 * assertion.)
	 */
	Assert(relation->rd_rel->relhasoids);

	if (relation->rd_indexvalid == 0)
	{
		/* RelationGetIndexList does the heavy lifting. */
		List	   *ilist = RelationGetIndexList(relation);

		list_free(ilist);
		Assert(relation->rd_indexvalid != 0);
	}

	return relation->rd_oidindex;
}

4030 4031 4032 4033 4034 4035 4036 4037 4038 4039 4040 4041 4042 4043 4044 4045 4046 4047 4048 4049 4050 4051 4052 4053 4054 4055 4056 4057
/*
 * RelationGetIndexExpressions -- get the index expressions for an index
 *
 * We cache the result of transforming pg_index.indexprs into a node tree.
 * If the rel is not an index or has no expressional columns, we return NIL.
 * Otherwise, the returned tree is copied into the caller's memory context.
 * (We don't want to return a pointer to the relcache copy, since it could
 * disappear due to relcache invalidation.)
 */
List *
RelationGetIndexExpressions(Relation relation)
{
	List	   *exprs;
	Datum		exprsDatum;
	bool		isnull;
	char	   *exprsString;
	MemoryContext save_cxt;

	/* Quick exit if we already computed the result. */
	if (relation->rd_indexprs)
		return (List *) copyObject(relation->rd_indexprs);

	/* Quick exit if there is nothing to do. */
	if (relation->rd_indextuple == NULL ||
		heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs))
		return NIL;

	/*
	 * Build the tree in the caller's context; only copy it into the relcache
	 * entry after all processing succeeds.  This avoids problems if we get
	 * some sort of error partway through.
	 */
	exprsDatum = heap_getattr(relation->rd_indextuple,
							  Anum_pg_index_indexprs,
							  GetPgIndexDescriptor(),
							  &isnull);
	Assert(!isnull);
	exprsString = TextDatumGetCString(exprsDatum);
	exprs = (List *) stringToNode(exprsString);
	pfree(exprsString);

	/*
	 * Run the expressions through eval_const_expressions. This is not just an
	 * optimization, but is necessary, because the planner will be comparing
	 * them to similarly-processed qual clauses, and may fail to detect valid
	 * matches without this.  We don't bother with canonicalize_qual, however.
	 */
	exprs = (List *) eval_const_expressions(NULL, (Node *) exprs);

	/*
	 * Also mark any coercion format fields as "don't care", so that the
	 * planner can match to both explicit and implicit coercions.
	 */
	set_coercionform_dontcare((Node *) exprs);

	/* May as well fix opfuncids too */
	fix_opfuncids((Node *) exprs);

	/* Now save a copy of the completed tree in the relcache entry. */
	save_cxt = MemoryContextSwitchTo(relation->rd_indexcxt);
	relation->rd_indexprs = (List *) copyObject(exprs);
	MemoryContextSwitchTo(save_cxt);

	return exprs;
}

/*
 * RelationGetIndexPredicate -- get the index predicate for an index
 *
 * We cache the result of transforming pg_index.indpred into an implicit-AND
 * node tree (suitable for ExecQual).
 * If the rel is not an index or has no predicate, we return NIL.
 * Otherwise, the returned tree is copied into the caller's memory context.
 * (We don't want to return a pointer to the relcache copy, since it could
 * disappear due to relcache invalidation.)
 */
List *
RelationGetIndexPredicate(Relation relation)
{
	List	   *pred;
	Datum		predDatum;
	bool		isnull;
	char	   *predString;
	MemoryContext save_cxt;

	/* Quick exit if we already computed the result. */
	if (relation->rd_indpred)
		return (List *) copyObject(relation->rd_indpred);

	/* Quick exit if there is nothing to do. */
	if (relation->rd_indextuple == NULL ||
		heap_attisnull(relation->rd_indextuple, Anum_pg_index_indpred))
		return NIL;

	/*
	 * Build the tree in the caller's context; only copy it into the relcache
	 * entry after all processing succeeds.  This avoids problems if we get
	 * some sort of error partway through.
	 */
	predDatum = heap_getattr(relation->rd_indextuple,
							 Anum_pg_index_indpred,
							 GetPgIndexDescriptor(),
							 &isnull);
	Assert(!isnull);
	predString = TextDatumGetCString(predDatum);
	pred = (List *) stringToNode(predString);
	pfree(predString);

	/*
	 * Run the expression through const-simplification and canonicalization.
	 * This is not just an optimization, but is necessary, because the planner
	 * will be comparing it to similarly-processed qual clauses, and may fail
	 * to detect valid matches without this.  This must match the processing
	 * done to qual clauses in preprocess_expression()!  (We can skip the
	 * stuff involving subqueries, however, since we don't allow any in index
	 * predicates.)
	 */
	pred = (List *) eval_const_expressions(NULL, (Node *) pred);

	pred = (List *) canonicalize_qual((Expr *) pred);

	/*
	 * Also mark any coercion format fields as "don't care", so that the
	 * planner can match to both explicit and implicit coercions.
	 */
	set_coercionform_dontcare((Node *) pred);

	/* Also convert to implicit-AND format */
	pred = make_ands_implicit((Expr *) pred);

	/* May as well fix opfuncids too */
	fix_opfuncids((Node *) pred);

	/* Now save a copy of the completed tree in the relcache entry. */
	save_cxt = MemoryContextSwitchTo(relation->rd_indexcxt);
	relation->rd_indpred = (List *) copyObject(pred);
	MemoryContextSwitchTo(save_cxt);

	return pred;
}

4171 4172 4173 4174 4175 4176 4177 4178 4179 4180 4181
/*
 * RelationGetIndexAttrBitmap -- get a bitmap of index attribute numbers
 *
 * The result has a bit set for each attribute used anywhere in the index
 * definitions of all the indexes on this relation.  (This includes not only
 * simple index keys, but attributes used in expressions and partial-index
 * predicates.)
 *
 * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
 * we can include system attributes (e.g., OID) in the bitmap representation.
 *
 * Caller had better hold at least RowExclusiveLock on the target relation
 * to ensure that it has a stable set of indexes.  This also makes it safe
 * (deadlock-free) for us to take locks on the relation's indexes.
 *
 * The returned result is palloc'd in the caller's memory context and should
 * be bms_free'd when not needed anymore.
 */
Bitmapset *
RelationGetIndexAttrBitmap(Relation relation)
{
	Bitmapset  *indexattrs = NULL;
	List	   *indexoidlist;
	ListCell   *lc;
	MemoryContext save_cxt;

	/* Quick exit if we already computed the result. */
	if (relation->rd_indexattr != NULL)
		return bms_copy(relation->rd_indexattr);

	/* Fast path if definitely no indexes */
	if (!RelationGetForm(relation)->relhasindex)
		return NULL;

	/*
	 * Get cached list of index OIDs
	 */
	indexoidlist = RelationGetIndexList(relation);

	/* Fall out if no indexes (but relhasindex was set) */
	if (indexoidlist == NIL)
		return NULL;

	/*
	 * For each index, add referenced attributes to indexattrs.
	 *
	 * Note: we consider all indexes returned by RelationGetIndexList, even if
	 * they are not indisready or indisvalid.  This is important because an
	 * index for which CREATE INDEX CONCURRENTLY has just started must be
	 * included in HOT-safety decisions (see README.HOT).
	 */
	foreach(lc, indexoidlist)
	{
		Oid			indexOid = lfirst_oid(lc);
		Relation	indexDesc;
		IndexInfo  *indexInfo;
		int			keyno;

		indexDesc = index_open(indexOid, AccessShareLock);

		/* Extract index key information from the index's pg_index row */
		indexInfo = BuildIndexInfo(indexDesc);

		/* Collect simple attribute references */
		for (keyno = 0; keyno < indexInfo->ii_NumIndexAttrs; keyno++)
		{
			int			attrnum = indexInfo->ii_KeyAttrNumbers[keyno];

			if (attrnum != 0)
				indexattrs = bms_add_member(indexattrs,
							   attrnum - FirstLowInvalidHeapAttributeNumber);
		}

		/* Collect all attributes used in expressions, too */
		pull_varattnos((Node *) indexInfo->ii_Expressions, &indexattrs);

		/* Collect all attributes in the index predicate, too */
		pull_varattnos((Node *) indexInfo->ii_Predicate, &indexattrs);

		index_close(indexDesc, AccessShareLock);
	}

	list_free(indexoidlist);

	/* Now save a copy of the bitmap in the relcache entry. */
	save_cxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indexattr = bms_copy(indexattrs);
	MemoryContextSwitchTo(save_cxt);

	/* We return our original working copy for caller to play with */
	return indexattrs;
}

4265

4266
/*
 *	load_relcache_init_file, write_relcache_init_file
 *
 *		In late 1992, we started regularly having databases with more than
 *		a thousand classes in them.  With this number of classes, it became
 *		critical to do indexed lookups on the system catalogs.
 *
 *		Bootstrapping these lookups is very hard.  We want to be able to
 *		use an index on pg_attribute, for example, but in order to do so,
 *		we must have read pg_attribute for the attributes in the index,
 *		which implies that we need to use the index.
 *
 *		In order to get around the problem, we do the following:
 *
 *		   +  When the database system is initialized (at initdb time), we
 *			  don't use indexes.  We do sequential scans.
 *
 *		   +  When the backend is started up in normal mode, we load an image
 *			  of the appropriate relation descriptors, in internal format,
 *			  from an initialization file in the data/base/... directory.
 *
 *		   +  If the initialization file isn't there, then we create the
 *			  relation descriptors using sequential scans and write 'em to
 *			  the initialization file for use by subsequent backends.
 *
 *		We could dispense with the initialization files and just build the
 *		critical reldescs the hard way on every backend startup, but that
 *		slows down backend startup noticeably.
 *
 *		We can in fact go further, and save more relcache entries than
 *		just the ones that are absolutely critical; this allows us to speed
 *		up backend startup by not having to build such entries the hard way.
 *		Presently, all the catalog and index entries that are referred to
 *		by catcaches are stored in the initialization files.
 *
 *		The same mechanism that detects when catcache and relcache entries
 *		need to be invalidated (due to catalog updates) also arranges to
 *		unlink the initialization files when the contents may be out of date.
 *		The files will then be rebuilt during the next backend startup.
 */

4307 4308 4309 4310
/*
 * load_relcache_init_file -- attempt to load cache from the init file
 *
 * If successful, return TRUE and set criticalRelcachesBuilt to true.
 * If not successful, return FALSE.
 *
 * NOTE: we assume we are already switched into CacheMemoryContext.
 */
static bool
4316
load_relcache_init_file(bool shared)
4317
{
4318 4319 4320 4321 4322 4323 4324
	FILE	   *fp;
	char		initfilename[MAXPGPATH];
	Relation   *rels;
	int			relno,
				num_rels,
				max_rels,
				nailed_rels,
4325 4326
				nailed_indexes,
				magic;
4327
	int			i;
4328

4329 4330 4331 4332 4333 4334
	if (shared)
		snprintf(initfilename, sizeof(initfilename), "global/%s",
				 RELCACHE_INIT_FILENAME);
	else
		snprintf(initfilename, sizeof(initfilename), "%s/%s",
				 DatabasePath, RELCACHE_INIT_FILENAME);
4335 4336 4337 4338

	fp = AllocateFile(initfilename, PG_BINARY_R);
	if (fp == NULL)
		return false;
4339

4340
	/*
B
Bruce Momjian 已提交
4341 4342 4343
	 * Read the index relcache entries from the file.  Note we will not enter
	 * any of them into the cache if the read fails partway through; this
	 * helps to guard against broken init files.
4344 4345 4346 4347 4348 4349 4350
	 */
	max_rels = 100;
	rels = (Relation *) palloc(max_rels * sizeof(Relation));
	num_rels = 0;
	nailed_rels = nailed_indexes = 0;
	initFileRelationIds = NIL;

4351 4352 4353 4354 4355 4356
	/* check for correct magic number (compatible version) */
	if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
		goto read_failed;
	if (magic != RELCACHE_INIT_FILEMAGIC)
		goto read_failed;

B
Bruce Momjian 已提交
4357
	for (relno = 0;; relno++)
4358
	{
4359 4360 4361 4362
		Size		len;
		size_t		nread;
		Relation	rel;
		Form_pg_class relform;
4363
		bool		has_not_null;
4364

4365
		/* first read the relation descriptor length */
4366 4367 4368 4369
		if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
		{
			if (nread == 0)
				break;			/* end of file */
4370
			goto read_failed;
4371
		}
4372

4373 4374
		/* safety check for incompatible relcache layout */
		if (len != sizeof(RelationData))
4375
			goto read_failed;
4376

4377 4378 4379 4380 4381 4382
		/* allocate another relcache header */
		if (num_rels >= max_rels)
		{
			max_rels *= 2;
			rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
		}
4383

4384
		rel = rels[num_rels++] = (Relation) palloc(len);
4385

4386 4387
		/* then, read the Relation structure */
		if ((nread = fread(rel, 1, len, fp)) != len)
4388
			goto read_failed;
4389 4390

		/* next read the relation tuple form */
4391
		if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
4392
			goto read_failed;
4393 4394

		relform = (Form_pg_class) palloc(len);
4395
		if ((nread = fread(relform, 1, len, fp)) != len)
4396
			goto read_failed;
4397

4398
		rel->rd_rel = relform;
4399 4400

		/* initialize attribute tuple forms */
4401 4402
		rel->rd_att = CreateTemplateTupleDesc(relform->relnatts,
											  relform->relhasoids);
4403 4404
		rel->rd_att->tdrefcount = 1;	/* mark as refcounted */

4405
		rel->rd_att->tdtypeid = relform->reltype;
B
Bruce Momjian 已提交
4406
		rel->rd_att->tdtypmod = -1;		/* unnecessary, but... */
4407 4408

		/* next read all the attribute tuple form data entries */
4409
		has_not_null = false;
4410 4411
		for (i = 0; i < relform->relnatts; i++)
		{
4412
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
4413
				goto read_failed;
4414
			if (len != ATTRIBUTE_FIXED_PART_SIZE)
4415
				goto read_failed;
4416
			if ((nread = fread(rel->rd_att->attrs[i], 1, len, fp)) != len)
4417
				goto read_failed;
4418 4419 4420 4421

			has_not_null |= rel->rd_att->attrs[i]->attnotnull;
		}

B
Bruce Momjian 已提交
4422 4423 4424 4425 4426 4427 4428 4429
		/* next read the access method specific field */
		if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
			goto read_failed;
		if (len > 0)
		{
			rel->rd_options = palloc(len);
			if ((nread = fread(rel->rd_options, 1, len, fp)) != len)
				goto read_failed;
4430
			if (len != VARSIZE(rel->rd_options))
B
Bruce Momjian 已提交
4431
				goto read_failed;		/* sanity check */
B
Bruce Momjian 已提交
4432 4433 4434 4435 4436 4437
		}
		else
		{
			rel->rd_options = NULL;
		}

4438 4439 4440 4441 4442 4443 4444
		/* mark not-null status */
		if (has_not_null)
		{
			TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

			constr->has_not_null = true;
			rel->rd_att->constr = constr;
4445 4446
		}

4447 4448 4449 4450 4451
		/* If it's an index, there's more to do */
		if (rel->rd_rel->relkind == RELKIND_INDEX)
		{
			Form_pg_am	am;
			MemoryContext indexcxt;
4452 4453
			Oid		   *opfamily;
			Oid		   *opcintype;
4454 4455
			Oid		   *operator;
			RegProcedure *support;
4456
			int			nsupport;
4457
			int16	   *indoption;
4458 4459 4460 4461 4462

			/* Count nailed indexes to ensure we have 'em all */
			if (rel->rd_isnailed)
				nailed_indexes++;

4463
			/* next, read the pg_index tuple */
4464 4465
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;
4466

4467 4468
			rel->rd_indextuple = (HeapTuple) palloc(len);
			if ((nread = fread(rel->rd_indextuple, 1, len, fp)) != len)
4469
				goto read_failed;
4470

4471 4472 4473 4474
			/* Fix up internal pointers in the tuple -- see heap_copytuple */
			rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
			rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);

4475 4476 4477
			/* next, read the access method tuple form */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;
4478

4479 4480 4481 4482
			am = (Form_pg_am) palloc(len);
			if ((nread = fread(am, 1, len, fp)) != len)
				goto read_failed;
			rel->rd_am = am;
4483

4484 4485 4486 4487 4488 4489
			/*
			 * prepare index info context --- parameters should match
			 * RelationInitIndexAccessInfo
			 */
			indexcxt = AllocSetContextCreate(CacheMemoryContext,
											 RelationGetRelationName(rel),
4490 4491 4492
											 ALLOCSET_SMALL_MINSIZE,
											 ALLOCSET_SMALL_INITSIZE,
											 ALLOCSET_SMALL_MAXSIZE);
4493 4494
			rel->rd_indexcxt = indexcxt;

4495 4496 4497 4498 4499 4500 4501 4502 4503 4504 4505 4506 4507 4508 4509 4510 4511 4512 4513 4514
			/* next, read the vector of opfamily OIDs */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			opfamily = (Oid *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(opfamily, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_opfamily = opfamily;

			/* next, read the vector of opcintype OIDs */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			opcintype = (Oid *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(opcintype, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_opcintype = opcintype;

4515 4516 4517 4518 4519 4520 4521 4522 4523
			/* next, read the vector of operator OIDs */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			operator = (Oid *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(operator, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_operator = operator;
4524

4525
			/* next, read the vector of support procedures */
4526 4527 4528 4529 4530 4531 4532 4533
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;
			support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(support, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_support = support;

4534 4535 4536 4537 4538 4539 4540 4541 4542 4543
			/* finally, read the vector of indoption values */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			indoption = (int16 *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(indoption, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_indoption = indoption;

4544 4545 4546
			/* set up zeroed fmgr-info vectors */
			rel->rd_aminfo = (RelationAmInfo *)
				MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));
4547 4548
			nsupport = relform->relnatts * am->amsupport;
			rel->rd_supportinfo = (FmgrInfo *)
4549
				MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
4550 4551 4552 4553 4554 4555 4556 4557
		}
		else
		{
			/* Count nailed rels to ensure we have 'em all */
			if (rel->rd_isnailed)
				nailed_rels++;

			Assert(rel->rd_index == NULL);
4558
			Assert(rel->rd_indextuple == NULL);
4559 4560
			Assert(rel->rd_am == NULL);
			Assert(rel->rd_indexcxt == NULL);
4561
			Assert(rel->rd_aminfo == NULL);
4562 4563
			Assert(rel->rd_opfamily == NULL);
			Assert(rel->rd_opcintype == NULL);
4564 4565 4566
			Assert(rel->rd_operator == NULL);
			Assert(rel->rd_support == NULL);
			Assert(rel->rd_supportinfo == NULL);
4567
			Assert(rel->rd_indoption == NULL);
4568 4569 4570 4571
		}

		/*
		 * Rules and triggers are not saved (mainly because the internal
B
Bruce Momjian 已提交
4572
		 * format is complex and subject to change).  They must be rebuilt if
4573
		 * needed by RelationCacheInitializePhase3.  This is not expected to
4574 4575
		 * be a big performance hit since few system catalogs have such. Ditto
		 * for index expressions and predicates.
4576 4577 4578 4579
		 */
		rel->rd_rules = NULL;
		rel->rd_rulescxt = NULL;
		rel->trigdesc = NULL;
4580 4581
		rel->rd_indexprs = NIL;
		rel->rd_indpred = NIL;
4582 4583 4584 4585

		/*
		 * Reset transient-state fields in the relcache entry
		 */
4586
		rel->rd_smgr = NULL;
4587 4588
		rel->rd_targblock = InvalidBlockNumber;
		if (rel->rd_isnailed)
4589
			rel->rd_refcnt = 1;
4590
		else
4591
			rel->rd_refcnt = 0;
4592
		rel->rd_indexvalid = 0;
4593
		rel->rd_indexlist = NIL;
4594
		rel->rd_indexattr = NULL;
4595
		rel->rd_oidindex = InvalidOid;
4596
		rel->rd_createSubid = InvalidSubTransactionId;
4597
		rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
4598
		rel->rd_amcache = NULL;
4599
		MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
4600 4601
        rel->rd_cdbpolicy = NULL;
        rel->rd_cdbDefaultStatsWarningIssued = false;
4602

4603
		/*
4604
		 * Recompute lock and physical addressing info.  This is needed in
B
Bruce Momjian 已提交
4605 4606
		 * case the pg_internal.init file was copied from some other database
		 * by CREATE DATABASE.
4607 4608
		 */
		RelationInitLockInfo(rel);
4609
		RelationInitPhysicalAddr(rel);
4610 4611 4612
	}

	/*
B
Bruce Momjian 已提交
4613 4614 4615
	 * We reached the end of the init file without apparent problem. Did we
	 * get the right number of nailed items?  (This is a useful crosscheck in
	 * case the set of critical rels or indexes changes.)
4616
	 */
4617 4618 4619 4620 4621 4622 4623 4624 4625 4626 4627 4628 4629
	if (shared)
	{
		if (nailed_rels != NUM_CRITICAL_SHARED_RELS ||
			nailed_indexes != NUM_CRITICAL_SHARED_INDEXES)
			goto read_failed;
	}
	else
	{
		if (nailed_rels != NUM_CRITICAL_LOCAL_RELS ||
			nailed_indexes != NUM_CRITICAL_LOCAL_INDEXES)
			goto read_failed;
	}

4630 4631 4632 4633 4634 4635 4636 4637 4638 4639

	/*
	 * OK, all appears well.
	 *
	 * Now insert all the new relcache entries into the cache.
	 */
	for (relno = 0; relno < num_rels; relno++)
	{
		RelationCacheInsert(rels[relno]);
		/* also make a list of their OIDs, for RelationIdIsInInitFile */
4640 4641 4642
		if (!shared)
			initFileRelationIds = lcons_oid(RelationGetRelid(rels[relno]),
											initFileRelationIds);
4643
	}
4644

4645 4646 4647
	pfree(rels);
	FreeFile(fp);

4648 4649 4650 4651
	if (shared)
		criticalSharedRelcachesBuilt = true;
	else
		criticalRelcachesBuilt = true;
4652
	return true;
4653

4654
	/*
B
Bruce Momjian 已提交
4655 4656 4657
	 * init file is broken, so do it the hard way.	We don't bother trying to
	 * free the clutter we just allocated; it's not in the relcache so it
	 * won't hurt.
4658
	 */
4659
read_failed:
4660 4661 4662 4663
	pfree(rels);
	FreeFile(fp);

	return false;
4664 4665
}

/*
 * Write out a new initialization file with the current contents
 * of the relcache.
 *
 * The file is written to a temp name first and renamed into place only
 * if no relcache invalidation arrived meanwhile; see comments below.
 * Failure to create the file is only a WARNING, but a short write after
 * that is FATAL (the backend's relcache is in an unknown on-disk state).
 */
static void
write_relcache_init_file(bool shared)
{
	FILE	   *fp;
	char		tempfilename[MAXPGPATH];
	char		finalfilename[MAXPGPATH];
	int			magic;
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;
	MemoryContext oldcxt;
	int			i;

	/*
	 * We must write a temporary file and rename it into place. Otherwise,
	 * another backend starting at about the same time might crash trying to
	 * read the partially-complete file.
	 */
	if (shared)
	{
		snprintf(tempfilename, sizeof(tempfilename), "global/%s.%d",
				 RELCACHE_INIT_FILENAME, MyProcPid);
		snprintf(finalfilename, sizeof(finalfilename), "global/%s",
				 RELCACHE_INIT_FILENAME);
	}
	else
	{
		snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
				 DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
		snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
				 DatabasePath, RELCACHE_INIT_FILENAME);
	}

	unlink(tempfilename);		/* in case it exists w/wrong permissions */

	fp = AllocateFile(tempfilename, PG_BINARY_W);
	if (fp == NULL)
	{
		/*
		 * We used to consider this a fatal error, but we might as well
		 * continue with backend startup ...
		 */
		ereport(WARNING,
				(errcode_for_file_access(),
				 errmsg("could not create relation-cache initialization file \"%s\": %m",
						tempfilename),
			  errdetail("Continuing anyway, but there's something wrong.")));
		return;
	}

	/*
	 * Write a magic number to serve as a file version identifier.	We can
	 * change the magic number whenever the relcache layout changes.
	 */
	magic = RELCACHE_INIT_FILEMAGIC;
	if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic))
		elog(FATAL, "could not write init file");

	/*
	 * Write all the reldescs (in no particular order).  The item sequence
	 * per relation must match what load_relcache_init_file reads back.
	 */
	hash_seq_init(&status, RelationIdCache);

	initFileRelationIds = NIL;

	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
	{
		Relation	rel = idhentry->reldesc;
		Form_pg_class relform = rel->rd_rel;

		/* ignore if not correct group (shared vs. database-local) */
		if (relform->relisshared != shared)
			continue;

		/* first write the relcache entry proper */
		write_item(rel, sizeof(RelationData), fp);

		/* next write the relation tuple form */
		write_item(relform, CLASS_TUPLE_SIZE, fp);

		/* next, do all the attribute tuple form data entries */
		for (i = 0; i < relform->relnatts; i++)
		{
			write_item(rel->rd_att->attrs[i], ATTRIBUTE_FIXED_PART_SIZE, fp);
		}

		/* next, do the access method specific field (zero len if none) */
		write_item(rel->rd_options,
				   (rel->rd_options ? VARSIZE(rel->rd_options) : 0),
				   fp);

		/* If it's an index, there's more to do */
		if (rel->rd_rel->relkind == RELKIND_INDEX)
		{
			Form_pg_am	am = rel->rd_am;

			/* write the pg_index tuple */
			/* we assume this was created by heap_copytuple! */
			write_item(rel->rd_indextuple,
					   HEAPTUPLESIZE + rel->rd_indextuple->t_len,
					   fp);

			/* next, write the access method tuple form */
			write_item(am, sizeof(FormData_pg_am), fp);

			/* next, write the vector of opfamily OIDs */
			write_item(rel->rd_opfamily,
					   relform->relnatts * sizeof(Oid),
					   fp);

			/* next, write the vector of opcintype OIDs */
			write_item(rel->rd_opcintype,
					   relform->relnatts * sizeof(Oid),
					   fp);

			/* next, write the vector of operator OIDs */
			write_item(rel->rd_operator,
					   relform->relnatts * (am->amstrategies * sizeof(Oid)),
					   fp);

			/* next, write the vector of support procedures */
			write_item(rel->rd_support,
				  relform->relnatts * (am->amsupport * sizeof(RegProcedure)),
					   fp);

			/* finally, write the vector of indoption values */
			write_item(rel->rd_indoption,
					   relform->relnatts * sizeof(int16),
					   fp);
		}

		/* also make a list of their OIDs, for RelationIdIsInInitFile */
		if (!shared)
		{
			/* the list must survive in CacheMemoryContext */
			oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
			initFileRelationIds = lcons_oid(RelationGetRelid(rel),
											initFileRelationIds);
			MemoryContextSwitchTo(oldcxt);
		}
	}

	/* FreeFile flushes buffered data; a failure here means a bad file */
	if (FreeFile(fp))
		elog(FATAL, "could not write init file");

	/*
	 * Now we have to check whether the data we've so painstakingly
	 * accumulated is already obsolete due to someone else's just-committed
	 * catalog changes.  If so, we just delete the temp file and leave it to
	 * the next backend to try again.  (Our own relcache entries will be
	 * updated by SI message processing, but we can't be sure whether what we
	 * wrote out was up-to-date.)
	 *
	 * This mustn't run concurrently with the code that unlinks an init file
	 * and sends SI messages, so grab a serialization lock for the duration.
	 */
	LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);

	/* Make sure we have seen all incoming SI messages */
	AcceptInvalidationMessages();

	/*
	 * If we have received any SI relcache invals since backend start, assume
	 * we may have written out-of-date data.
	 */
	if (relcacheInvalsReceived == 0L)
	{
		/*
		 * OK, rename the temp file to its final name, deleting any
		 * previously-existing init file.
		 *
		 * Note: a failure here is possible under Cygwin, if some other
		 * backend is holding open an unlinked-but-not-yet-gone init file. So
		 * treat this as a noncritical failure; just remove the useless temp
		 * file on failure.
		 */
		if (rename(tempfilename, finalfilename) < 0)
			unlink(tempfilename);
	}
	else
	{
		/* Delete the already-obsolete temp file */
		unlink(tempfilename);
	}

	LWLockRelease(RelCacheInitLock);
}

4856 4857 4858 4859 4860 4861 4862 4863 4864 4865
/* write a chunk of data preceded by its length */
static void
write_item(const void *data, Size len, FILE *fp)
{
	if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
		elog(FATAL, "could not write init file");
	if (fwrite(data, 1, len, fp) != len)
		elog(FATAL, "could not write init file");
}

4866 4867 4868 4869 4870 4871 4872 4873 4874 4875 4876 4877
/*
 * Detect whether a given relation (identified by OID) is one of the ones
 * we store in the init file.
 *
 * Note that we effectively assume that all backends running in a database
 * would choose to store the same set of relations in the init file;
 * otherwise there are cases where we'd fail to detect the need for an init
 * file invalidation.  This does not seem likely to be a problem in practice.
 */
bool
RelationIdIsInInitFile(Oid relationId)
{
4878
	return list_member_oid(initFileRelationIds, relationId);
4879 4880 4881 4882 4883
}

/*
 * Invalidate (remove) the init file during commit of a transaction that
 * changed one or more of the relation cache entries that are kept in the
4884
 * local init file.
4885
 *
4886 4887 4888 4889 4890 4891 4892 4893 4894 4895 4896 4897
 * To be safe against concurrent inspection or rewriting of the init file,
 * we must take RelCacheInitLock, then remove the old init file, then send
 * the SI messages that include relcache inval for such relations, and then
 * release RelCacheInitLock.  This serializes the whole affair against
 * write_relcache_init_file, so that we can be sure that any other process
 * that's concurrently trying to create a new init file won't move an
 * already-stale version into place after we unlink.  Also, because we unlink
 * before sending the SI messages, a backend that's currently starting cannot
 * read the now-obsolete init file and then miss the SI messages that will
 * force it to update its relcache entries.  (This works because the backend
 * startup sequence gets into the sinval array before trying to load the init
 * file.)
4898
 *
4899 4900 4901
 * We take the lock and do the unlink in RelationCacheInitFilePreInvalidate,
 * then release the lock in RelationCacheInitFilePostInvalidate.  Caller must
 * send any pending SI messages between those calls.
4902 4903 4904 4905 4906 4907 4908
 *
 * Notice this deals only with the local init file, not the shared init file.
 * The reason is that there can never be a "significant" change to the
 * relcache entry of a shared relation; the most that could happen is
 * updates of noncritical fields such as relpages/reltuples.  So, while
 * it's worth updating the shared init file from time to time, it can never
 * be invalid enough to make it necessary to remove it.
4909 4910
 */
void
4911
RelationCacheInitFilePreInvalidate(void)
4912 4913 4914 4915 4916 4917
{
	char		initfilename[MAXPGPATH];

	snprintf(initfilename, sizeof(initfilename), "%s/%s",
			 DatabasePath, RELCACHE_INIT_FILENAME);

4918 4919 4920
	LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);

	if (unlink(initfilename) < 0)
4921 4922
	{
		/*
4923 4924 4925 4926
		 * The file might not be there if no backend has been started since
		 * the last removal.  But complain about failures other than ENOENT.
		 * Fortunately, it's not too late to abort the transaction if we
		 * can't get rid of the would-be-obsolete init file.
4927
		 */
4928 4929 4930 4931 4932
		if (errno != ENOENT)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not remove cache file \"%s\": %m",
							initfilename)));
4933
	}
4934
}
4935

/*
 * RelationCacheInitFilePostInvalidate -- release the lock acquired by
 * RelationCacheInitFilePreInvalidate.  The caller must have sent its
 * pending SI invalidation messages between the two calls.
 */
void
RelationCacheInitFilePostInvalidate(void)
{
	LWLockRelease(RelCacheInitLock);
}

4942
/*
4943 4944 4945 4946 4947 4948 4949
 * Remove the init files during postmaster startup.
 *
 * We used to keep the init files across restarts, but that is unsafe even in simple
 * crash-recovery cases as there are windows for the init files to become out-of-sync
 * with the database. So now we just remove them during startup and expect the
 * first backend launch to rebuild them. Of course, this has to happen in each
 * database of the cluster.
4950 4951
 */
void
4952 4953 4954 4955 4956 4957 4958 4959 4960 4961 4962 4963 4964 4965 4966 4967 4968 4969 4970
RelationCacheInitFileRemove(void)
{
	char		path[MAXPGPATH];

	/*
	 * We zap the shared cache file too.  In theory it can't get out of sync
	 * enough to be a problem, but in data-corruption cases, who knows ...
	 */
	snprintf(path, sizeof(path), "global/%s",
			 RELCACHE_INIT_FILENAME);
	unlink_initfile(path);

	/* Scan everything in the default tablespace */
	RelationCacheInitFileRemoveInDir("base");
}

/* Process one per-tablespace directory for RelationCacheInitFileRemove */
static void
RelationCacheInitFileRemoveInDir(const char *tblspcpath)
4971
{
4972 4973
	DIR		   *dir;
	struct dirent *de;
4974 4975
	char		initfilename[MAXPGPATH];

4976 4977 4978 4979 4980 4981 4982 4983 4984 4985 4986 4987 4988 4989 4990 4991 4992 4993 4994 4995 4996 4997 4998 4999 5000 5001 5002 5003 5004 5005 5006 5007
	/* Scan the tablespace directory to find per-database directories */
	dir = AllocateDir(tblspcpath);
	if (dir == NULL)
	{
		elog(LOG, "could not open tablespace directory \"%s\": %m",
			 tblspcpath);
		return;
	}

	while ((de = ReadDir(dir, tblspcpath)) != NULL)
	{
		if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
		{
			/* Try to remove the init file in each database */
			snprintf(initfilename, sizeof(initfilename), "%s/%s/%s",
					 tblspcpath, de->d_name, RELCACHE_INIT_FILENAME);
			unlink_initfile(initfilename);
		}
	}

	FreeDir(dir);
}

/*
 * Remove one init file, tolerating its absence; any other failure is
 * merely logged since removal is best-effort here.
 */
static void
unlink_initfile(const char *initfilename)
{
	if (unlink(initfilename) < 0 && errno != ENOENT)
		elog(LOG, "could not remove cache file \"%s\": %m", initfilename);
}
/*
 * RelationGetPTInfo -- return the persistent-table tracking info
 * (persistent TID and serial number) recorded in the relation's
 * rd_segfile0_relationnodeinfo.
 *
 * Errors out if the info was never filled in, unless the relfilenode is
 * one for which GpPersistent_SkipXLogInfo says tracking is skipped.
 */
void
RelationGetPTInfo(Relation rel,
	ItemPointer persistentTid,
	int64 *persistentSerialNum)
{
	if (! GpPersistent_SkipXLogInfo(rel->rd_node.relNode) &&
		! rel->rd_segfile0_relationnodeinfo.isPresent)
	{
		elog(ERROR,
			 "required Persistent Table information missing for relation %u/%u/%u",
			 rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode);
	}

	/*
	 * NOTE(review): when isPresent is false but the relfilenode is in the
	 * skip-XLOG set, the values copied out here may be unset -- presumably
	 * callers tolerate that; confirm against call sites.
	 */
	*persistentTid = rel->rd_segfile0_relationnodeinfo.persistentTid;
	*persistentSerialNum = rel->rd_segfile0_relationnodeinfo.persistentSerialNum;
}