/*-------------------------------------------------------------------------
 *
 * relcache.c
 *	  POSTGRES relation descriptor cache code
 *
 * Portions Copyright (c) 2005-2009, Greenplum inc.
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/utils/cache/relcache.c,v 1.266.2.10 2010/09/02 03:17:06 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
/*
 * INTERFACE ROUTINES
 *		RelationCacheInitialize			- initialize relcache (to empty)
 *		RelationCacheInitializePhase2	- initialize shared-catalog entries
 *		RelationCacheInitializePhase3	- finish initializing relcache
 *		RelationIdGetRelation			- get a reldesc by relation id
 *		RelationClose					- close an open relation
 *
 * NOTES
 *		The following code contains many undocumented hacks.  Please be
 *		careful....
 */
#include "postgres.h"

#include <sys/file.h>
#include <fcntl.h>
#include <unistd.h>

#include "access/genam.h"
#include "access/heapam.h"
#include "access/reloptions.h"
#include "access/sysattr.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/index.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_amop.h"
#include "catalog/pg_amproc.h"
#include "catalog/pg_attrdef.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_auth_members.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_database.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_rewrite.h"
#include "catalog/pg_tablespace.h"
#include "catalog/pg_trigger.h"
#include "catalog/pg_type.h"
#include "commands/trigger.h"
#include "miscadmin.h"
#include "optimizer/clauses.h"
#include "optimizer/planmain.h"
#include "optimizer/prep.h"
#include "optimizer/var.h"
#include "rewrite/rewriteDefine.h"
#include "storage/fd.h"
#include "storage/smgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/relcache.h"
#include "utils/relationnode.h"
#include "utils/resowner.h"
#include "utils/syscache.h"

#include "catalog/gp_policy.h"         /* GpPolicy */
#include "cdb/cdbtm.h"
#include "cdb/cdbvars.h"        /* Gp_role */
#include "cdb/cdbmirroredflatfile.h"
#include "cdb/cdbpersistentfilesysobj.h"
#include "cdb/cdbsreh.h"

/*
 * name of relcache init file, used to speed up backend startup
 */
#define RELCACHE_INIT_FILENAME	"pg_internal.init"

#define RELCACHE_INIT_FILEMAGIC		0x773264	/* version ID value */

/*
 *		hardcoded tuple descriptors.  see include/catalog/pg_attribute.h
 */
static const FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
static const FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
static const FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
static const FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
static const FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};

static const FormData_pg_attribute Desc_pg_database[Natts_pg_database] = {Schema_pg_database};
static const FormData_pg_attribute Desc_pg_authid[Natts_pg_authid] = {Schema_pg_authid};
static const FormData_pg_attribute Desc_pg_auth_members[Natts_pg_auth_members] = {Schema_pg_auth_members};

/*
 *		Hash tables that index the relation cache
 *
 *		We used to index the cache by both name and OID, but now there
 *		is only an index by OID.
 */
typedef struct relidcacheent
{
	Oid			reloid;
	Relation	reldesc;
} RelIdCacheEnt;

static HTAB *RelationIdCache;

/*
 * This flag is false until we have prepared the critical relcache entries
 * that are needed to do indexscans on the tables read by relcache building.
 */
bool		criticalRelcachesBuilt = false;

/*
 * This flag is false until we have prepared the critical relcache entries
 * for shared catalogs (which are the tables needed for login).
 */
bool		criticalSharedRelcachesBuilt = false;

/*
 * This counter counts relcache inval events received since backend startup
 * (but only for rels that are actually in cache).  Presently, we use it only
 * to detect whether data about to be written by write_relcache_init_file()
 * might already be obsolete.
 */
static long relcacheInvalsReceived = 0L;

/*
 * This list remembers the OIDs of the non-shared relations cached in the
 * database's local relcache init file.  Note that there is no corresponding
 * list for the shared relcache init file, for reasons explained in the
 * comments for RelationCacheInitFileRemove.
 */
static List *initFileRelationIds = NIL;

/*
 * This flag lets us optimize away work in AtEO(Sub)Xact_RelationCache().
 */
static bool need_eoxact_work = false;


/*
 *		macros to manipulate the lookup hashtables
 */
#define RelationCacheInsert(RELATION)	\
do { \
	RelIdCacheEnt *idhentry; bool found; \
	idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
										   (void *) &(RELATION->rd_id), \
										   HASH_ENTER, &found); \
	/* used to give notice if found -- now just keep quiet */ \
	idhentry->reldesc = RELATION; \
} while(0)

#define RelationIdCacheLookup(ID, RELATION) \
do { \
	RelIdCacheEnt *hentry; \
	hentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
										 (void *) &(ID), \
										 HASH_FIND, NULL); \
	if (hentry) \
		RELATION = hentry->reldesc; \
	else \
		RELATION = NULL; \
} while(0)

#define RelationCacheDelete(RELATION) \
do { \
	RelIdCacheEnt *idhentry; \
	idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
										   (void *) &(RELATION->rd_id), \
										   HASH_REMOVE, NULL); \
	if (idhentry == NULL) \
		elog(WARNING, "trying to delete a rd_id reldesc that does not exist"); \
} while(0)
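
/*
 * Illustrative sketch (not part of the original file): typical use of the
 * lookup macro above.  The relid argument and the example function itself
 * are hypothetical; real callers such as RelationIdGetRelation() follow the
 * same pattern.
 */
#ifdef NOT_USED
static Relation
RelationIdCacheLookupExample(Oid relid)
{
	Relation	rd;

	RelationIdCacheLookup(relid, rd);	/* sets rd to NULL if not cached */
	return rd;
}
#endif   /* NOT_USED */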


/*
 * Special cache for opclass-related information
 *
 * Note: only default operators and support procs get cached, ie, those with
 * lefttype = righttype = opcintype.
 */
typedef struct opclasscacheent
{
	Oid			opclassoid;		/* lookup key: OID of opclass */
	bool		valid;			/* set TRUE after successful fill-in */
	StrategyNumber numStrats;	/* max # of strategies (from pg_am) */
	StrategyNumber numSupport;	/* max # of support procs (from pg_am) */
	Oid			opcfamily;		/* OID of opclass's family */
	Oid			opcintype;		/* OID of opclass's declared input type */
	Oid		   *operatorOids;	/* strategy operators' OIDs */
	RegProcedure *supportProcs; /* support procs */
} OpClassCacheEnt;

static HTAB *OpClassCache = NULL;


/* non-export function prototypes */

static void RelationDestroyRelation(Relation relation);
static void RelationClearRelation(Relation relation, bool rebuild);

static void RelationReloadIndexInfo(Relation relation);
static void RelationFlushRelation(Relation relation);
static bool load_relcache_init_file(bool shared);
static void write_relcache_init_file(bool shared);
static void write_item(const void *data, Size len, FILE *fp);

static void formrdesc(const char *relationName, Oid relationReltype,
		  bool isshared, bool hasoids,
		  int natts, const FormData_pg_attribute *att);

static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK, Relation *pg_class_relation);
static Relation AllocateRelationDesc(Form_pg_class relp);
static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
static void RelationBuildTupleDesc(Relation relation);
static Relation RelationBuildDesc(Oid targetRelId, bool insertIt);
static void RelationInitPhysicalAddr(Relation relation);
static void RelationInitAppendOnlyInfo(Relation relation);
static void load_critical_index(Oid indexoid, Oid heapoid);
static TupleDesc GetPgClassDescriptor(void);
static TupleDesc GetPgIndexDescriptor(void);
static void AttrDefaultFetch(Relation relation);
static void CheckConstraintFetch(Relation relation);
static List *insert_ordered_oid(List *list, Oid datum);
static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
				  StrategyNumber numStrats,
				  StrategyNumber numSupport);
static void RelationCacheInitFileRemoveInDir(const char *tblspcpath);
static void unlink_initfile(const char *initfilename);


/*
 *		ScanPgRelation
 *
 *		This is used by RelationBuildDesc to find a pg_class
 *		tuple matching targetRelId.  The caller must hold at least
 *		AccessShareLock on the target relid to prevent concurrent-update
 *		scenarios --- else our SnapshotNow scan might fail to find any
 *		version that it thinks is live.
 *
 *		NB: the returned tuple has been copied into palloc'd storage
 *		and must eventually be freed with heap_freetuple.
 */
static HeapTuple
ScanPgRelation(Oid targetRelId, bool indexOK, Relation *pg_class_relation)
{
	HeapTuple	pg_class_tuple;
	Relation	pg_class_desc;
	SysScanDesc pg_class_scan;
	ScanKeyData key[1];

	/*
	 * If something goes wrong during backend startup, we might find ourselves
	 * trying to read pg_class before we've selected a database.  That ain't
	 * gonna work, so bail out with a useful error message.  If this happens,
	 * it probably means a relcache entry that needs to be nailed isn't.
	 */
	if (!OidIsValid(MyDatabaseId))
		elog(FATAL, "cannot read pg_class without having selected a database");

	/*
	 * form a scan key
	 */
	ScanKeyInit(&key[0],
				ObjectIdAttributeNumber,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(targetRelId));

	/*
	 * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
	 * built the critical relcache entries (this includes initdb and startup
	 * without a pg_internal.init file).  The caller can also force a heap
	 * scan by setting indexOK == false.
	 */
	pg_class_desc = heap_open(RelationRelationId, AccessShareLock);
	pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId,
									   indexOK && criticalRelcachesBuilt,
									   SnapshotNow,
									   1, key);

	pg_class_tuple = systable_getnext(pg_class_scan);

	/*
	 * Must copy tuple before releasing buffer.
	 */
	if (HeapTupleIsValid(pg_class_tuple))
		pg_class_tuple = heap_copytuple(pg_class_tuple);

	/* all done */
	systable_endscan(pg_class_scan);
	if (pg_class_relation == NULL)
		heap_close(pg_class_desc, AccessShareLock);
	else
		*pg_class_relation = pg_class_desc;

	return pg_class_tuple;
}
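
/*
 * Illustrative sketch (not part of the original file): how a caller typically
 * consumes ScanPgRelation().  Passing NULL for the third argument lets the
 * function close pg_class itself; the relid argument and this example
 * function are hypothetical.
 */
#ifdef NOT_USED
static void
ScanPgRelationExample(Oid relid)
{
	HeapTuple	tup = ScanPgRelation(relid, true, NULL);

	if (HeapTupleIsValid(tup))
	{
		Form_pg_class relform = (Form_pg_class) GETSTRUCT(tup);

		elog(DEBUG1, "relation %s has %d attributes",
			 NameStr(relform->relname), relform->relnatts);
		heap_freetuple(tup);	/* result is palloc'd; caller must free it */
	}
}
#endif   /* NOT_USED */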

void
GpRelationNodeBeginScan(
	Snapshot	snapshot,
	Relation 	gp_relation_node,
	Oid		relationId,
	Oid 		relfilenode,
	GpRelationNodeScan 	*gpRelationNodeScan)
{
	Assert (relfilenode != 0);

	MemSet(gpRelationNodeScan, 0, sizeof(GpRelationNodeScan));

	/*
	 * form a scan key
	 */
	/* XXX XXX: break this out -- find callers - jic 2011/12/09 */
	/* maybe it's ok - return a cql context ? */

	/* XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX */
	/* no json defs for persistent tables ? */
/*
	cqxx("SELECT * FROM gp_relation_node_relfilenode "
		 " WHERE oid = :1 ",
		 ObjectIdGetDatum(relfilenode));
*/
	/* XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX */

	ScanKeyInit(&gpRelationNodeScan->scankey[0],
				Anum_gp_relation_node_relfilenode_oid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(relfilenode));

	/*
	 * Begin the scan on gp_relation_node (already opened by the caller),
	 * fetching tuples by relfilenode through its OID index.
	 */
	gpRelationNodeScan->scan = \
		systable_beginscan(gp_relation_node, GpRelationNodeOidIndexId,
						   /* indexOK */ true,
						   snapshot,
						   /* nKeys */ 1,
						   gpRelationNodeScan->scankey);

	gpRelationNodeScan->gp_relation_node = gp_relation_node;
	gpRelationNodeScan->relationId = relationId;
	gpRelationNodeScan->relfilenode = relfilenode;
}

HeapTuple
GpRelationNodeGetNext(
	GpRelationNodeScan 	*gpRelationNodeScan,
	int32				*segmentFileNum,
	ItemPointer			persistentTid,
	int64				*persistentSerialNum)
{
	HeapTuple tuple;

	bool			nulls[Natts_gp_relation_node];
	Datum			values[Natts_gp_relation_node];

	Oid actualRelationNode;

	int64 createMirrorDataLossTrackingSessionNum;

	tuple = systable_getnext((SysScanDesc)gpRelationNodeScan->scan);

	/*
	 * if no such tuple exists, return NULL
	 */
	if (!HeapTupleIsValid(tuple))
	{
		MemSet(persistentTid, 0, sizeof(ItemPointerData));
		*persistentSerialNum = 0;
		return tuple;
	}

	heap_deform_tuple(tuple, RelationGetDescr(gpRelationNodeScan->gp_relation_node), values, nulls);

	GpRelationNode_GetValues(
						values,
						&actualRelationNode,
						segmentFileNum,
						&createMirrorDataLossTrackingSessionNum,
						persistentTid,
						persistentSerialNum);
	if (actualRelationNode != gpRelationNodeScan->relfilenode)
		elog(FATAL, "Index on gp_relation_node broken. "
			 "Mismatch in node tuple for gp_relation_node for relation %u, relfilenode %u, relation node %u",
			 gpRelationNodeScan->relationId,
			 gpRelationNodeScan->relfilenode,
			 actualRelationNode);

	return tuple;
}


void
GpRelationNodeEndScan(
	GpRelationNodeScan 	*gpRelationNodeScan)
{
	/* all done */
	systable_endscan((SysScanDesc)gpRelationNodeScan->scan);
}
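
/*
 * Illustrative sketch (not part of the original file): the begin/getnext/end
 * pattern for walking all gp_relation_node entries of one relfilenode.  The
 * relid and relfilenode arguments and this example function are hypothetical
 * stand-ins.
 */
#ifdef NOT_USED
static void
GpRelationNodeScanExample(Oid relid, Oid relfilenode)
{
	Relation	gp_relation_node;
	GpRelationNodeScan scan;
	HeapTuple	tuple;
	int32		segmentFileNum;
	ItemPointerData persistentTid;
	int64		persistentSerialNum;

	gp_relation_node = heap_open(GpRelationNodeRelationId, AccessShareLock);
	GpRelationNodeBeginScan(SnapshotNow, gp_relation_node, relid, relfilenode, &scan);
	while (HeapTupleIsValid(tuple = GpRelationNodeGetNext(&scan, &segmentFileNum,
														  &persistentTid,
														  &persistentSerialNum)))
	{
		/* one tuple per segment file of the relation */
	}
	GpRelationNodeEndScan(&scan);
	heap_close(gp_relation_node, AccessShareLock);
}
#endif   /* NOT_USED */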

static HeapTuple
ScanGpRelationNodeTuple(
	Relation 	gp_relation_node,
	Oid 		relfilenode,
	int32		segmentFileNum)
{
	HeapTuple	tuple;
	SysScanDesc scan;
	ScanKeyData key[2];

	Assert (relfilenode != 0);

	/*
	 * form a scan key
	 */

	/* XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX */
/*
	cqxx("SELECT * FROM gp_relation_node "
		 " WHERE relfilenode_oid = :1 "
		 " AND segment_file_num = :2 ",
		 ObjectIdGetDatum(relfilenode),
		 Int32GetDatum(segmentFileNum));
*/
	/* XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX */

	ScanKeyInit(&key[0],
				Anum_gp_relation_node_relfilenode_oid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(relfilenode));
	ScanKeyInit(&key[1],
				Anum_gp_relation_node_segment_file_num,
				BTEqualStrategyNumber, F_INT4EQ,
				Int32GetDatum(segmentFileNum));

	/*
	 * Scan gp_relation_node (which the caller has already opened) for the
	 * matching relfilenode and segment file number, using its OID index.
	 */
	scan = systable_beginscan(gp_relation_node, GpRelationNodeOidIndexId,
									   /* indexOK */ true,
									   SnapshotNow,
									   2, key);

	tuple = systable_getnext(scan);

	/*
	 * Must copy tuple before releasing buffer.
	 */
	if (HeapTupleIsValid(tuple))
		tuple = heap_copytuple(tuple);

	/* all done */
	systable_endscan(scan);

	return tuple;
}

HeapTuple
FetchGpRelationNodeTuple(
	Relation 		gp_relation_node,
	Oid 			relfilenode,
	int32			segmentFileNum,
	ItemPointer		persistentTid,
	int64			*persistentSerialNum)
{
	HeapTuple tuple;
	
	bool			nulls[Natts_gp_relation_node];
	Datum			values[Natts_gp_relation_node];
	
	Oid actualRelationNode;
	int32 actualSegmentFileNum;

	int64 createMirrorDataLossTrackingSessionNum;

	Assert (relfilenode != 0);
	
	tuple = ScanGpRelationNodeTuple(
					gp_relation_node,
					relfilenode,
					segmentFileNum);
	
	/*
	 * if no such tuple exists, return NULL
	 */
	if (!HeapTupleIsValid(tuple))
	{
		MemSet(persistentTid, 0, sizeof(ItemPointerData));
		*persistentSerialNum = 0;
		return tuple;
	}
	
	heap_deform_tuple(tuple, RelationGetDescr(gp_relation_node), values, nulls);
		
	GpRelationNode_GetValues(
						values,
						&actualRelationNode,
						&actualSegmentFileNum,
						&createMirrorDataLossTrackingSessionNum,
						persistentTid,
						persistentSerialNum);
	
	if (actualRelationNode != relfilenode)
	{
		elog(ERROR, "Index on gp_relation_node broken. "
			 "Mismatch in node tuple for gp_relation_node intended relfilenode %u, fetched relfilenode %u",
			 relfilenode,
			 actualRelationNode);
	}

	return tuple;
}

/*
 * Deletes the gp relation node entry for the
 * given segment file.
 */ 
void
DeleteGpRelationNodeTuple(
	Relation 	relation,
	int32		segmentFileNum)
{
	Relation	gp_relation_node;
	HeapTuple	tuple;
	ItemPointerData     persistentTid;
	int64               persistentSerialNum;

	gp_relation_node = heap_open(GpRelationNodeRelationId, RowExclusiveLock);

	tuple = FetchGpRelationNodeTuple(gp_relation_node,
				relation->rd_rel->relfilenode,
				segmentFileNum,
				&persistentTid,
				&persistentSerialNum);

	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "could not find node tuple for relation %u, relation file node %u, segment file #%d",
			 RelationGetRelid(relation),
			 relation->rd_rel->relfilenode,
			 segmentFileNum);

	/* delete the relation tuple from gp_relation_node, and finish up */
	simple_heap_delete(gp_relation_node, &tuple->t_self);
	heap_freetuple(tuple);

	heap_close(gp_relation_node, RowExclusiveLock);
}

bool
ReadGpRelationNode(
	Oid 			relfilenode,
	
	int32			segmentFileNum,

	ItemPointer		persistentTid,

	int64			*persistentSerialNum)
{
	Relation gp_relation_node;
	HeapTuple tuple;
	bool found;

	MemSet(persistentTid, 0, sizeof(ItemPointerData));
	*persistentSerialNum = 0;

	gp_relation_node = heap_open(GpRelationNodeRelationId, AccessShareLock);

	tuple = FetchGpRelationNodeTuple(
						gp_relation_node,
						relfilenode,
						segmentFileNum,
						persistentTid,
						persistentSerialNum);

	/*
	 * if no such tuple exists, return NULL
	 */
	if (!HeapTupleIsValid(tuple))
	{
		found = false;
	}
	else
	{
		if (Debug_persistent_print)
		{
			TupleVisibilitySummary tupleVisibilitySummary;
			char *tupleVisibilitySummaryString;
			
			GetTupleVisibilitySummary(
									tuple,
									&tupleVisibilitySummary);
			tupleVisibilitySummaryString = GetTupleVisibilitySummaryString(&tupleVisibilitySummary);
			
			elog(Persistent_DebugPrintLevel(), 
				 "ReadGpRelationNode: For relfilenode %u, segment file #%d found persistent serial number " INT64_FORMAT ", TID %s (gp_relation_node tuple visibility: %s)",
				 relfilenode,
				 segmentFileNum,
				 *persistentSerialNum,
				 ItemPointerToString(persistentTid),
				 tupleVisibilitySummaryString);
			pfree(tupleVisibilitySummaryString);
		}

		found = true;
		heap_freetuple(tuple);
	}

	heap_close(gp_relation_node, AccessShareLock);

	return found;
}

void
RelationFetchSegFile0GpRelationNode(
	Relation relation)
{
	if (!relation->rd_segfile0_relationnodeinfo.isPresent)
	{
		if (Persistent_BeforePersistenceWork() || InRecovery)
		{
			MemSet(&relation->rd_segfile0_relationnodeinfo.persistentTid, 0, sizeof(ItemPointerData));
			relation->rd_segfile0_relationnodeinfo.persistentSerialNum = 0;
		
			relation->rd_segfile0_relationnodeinfo.isPresent = true;
			relation->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = true;
			
			return; // The initdb process will load the persistent table once we are out of bootstrap mode.
		}

		if (!ReadGpRelationNode(
					relation->rd_node.relNode,
					/* segmentFileNum */ 0,
					&relation->rd_segfile0_relationnodeinfo.persistentTid,
					&relation->rd_segfile0_relationnodeinfo.persistentSerialNum))
		{
			elog(ERROR, "Did not find gp_relation_node entry for relation name %s, relation id %u, relfilenode %u",
				 relation->rd_rel->relname.data,
				 relation->rd_id,
				 relation->rd_node.relNode);
		}

		Assert(!Persistent_BeforePersistenceWork());
		if (PersistentStore_IsZeroTid(&relation->rd_segfile0_relationnodeinfo.persistentTid))
		{	
			elog(ERROR, 
				 "RelationFetchSegFile0GpRelationNode has invalid TID (0,0) into relation %u/%u/%u '%s', serial number " INT64_FORMAT,
				 relation->rd_node.spcNode,
				 relation->rd_node.dbNode,
				 relation->rd_node.relNode,
				 NameStr(relation->rd_rel->relname),
				 relation->rd_segfile0_relationnodeinfo.persistentSerialNum);
		}

		relation->rd_segfile0_relationnodeinfo.isPresent = true;
		
	}

}

// UNDONE: Temporary
void
RelationFetchGpRelationNodeForXLog_Index(
	Relation relation)
{
	static int countInThisBackend = 0;
	static int deep = 0;
	
	deep++;

	countInThisBackend++;

	if (deep >= 2)
	{
		int saveDeep;

		if (Debug_gp_relation_node_fetch_wait_for_debugging)
		{
			/* Code for investigating MPP-16395, will be removed as part of the fix */
			elog(WARNING, "RelationFetchGpRelationNodeForXLog_Index [%d] for non-heap %u/%u/%u (deep %d) -- waiting for debug attach...",
				 countInThisBackend,
				 relation->rd_node.spcNode,
				 relation->rd_node.dbNode,
				 relation->rd_node.relNode,
				 deep);

			for (int i=0; i < 24 * 60; i++)
			{
				pg_usleep(60000000L); /* 60 sec */
			}
		}

		/*
		 * Reset counter in case the user continues to use the session.
		 */
		saveDeep = deep;
		deep = 0;

		elog(ERROR, "RelationFetchGpRelationNodeForXLog_Index [%d] for non-heap %u/%u/%u (deep %d)",
			 countInThisBackend,
			 relation->rd_node.spcNode,
			 relation->rd_node.dbNode,
			 relation->rd_node.relNode,
			 saveDeep);
	}

	RelationFetchSegFile0GpRelationNode(relation);

	deep--;
}

/*
 *		AllocateRelationDesc
 *
 *		This is used to allocate memory for a new relation descriptor
 *		and initialize the rd_rel field from the given pg_class tuple.
 */
static Relation
AllocateRelationDesc(Form_pg_class relp)
{
	Relation	relation;
	MemoryContext oldcxt;
	Form_pg_class relationForm;

	/* Relcache entries must live in CacheMemoryContext */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
	 * allocate and zero space for new relation descriptor
	 */
	relation = (Relation) palloc0(sizeof(RelationData));

	/*
	 * clear fields of reldesc that should initialize to something non-zero
	 */
	relation->rd_targblock = InvalidBlockNumber;

	/* make sure relation is marked as having no open file yet */
	relation->rd_smgr = NULL;

	/*
	 * Copy the relation tuple form
	 *
	 * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE. The
	 * variable-length fields (relacl, reloptions) are NOT stored in the
	 * relcache --- there'd be little point in it, since we don't copy the
	 * tuple's nulls bitmap and hence wouldn't know if the values are valid.
	 * Bottom line is that relacl *cannot* be retrieved from the relcache. Get
	 * it from the syscache if you need it.  The same goes for the original
	 * form of reloptions (however, we do store the parsed form of reloptions
	 * in rd_options).
	 */
	relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);

	memcpy(relationForm, relp, CLASS_TUPLE_SIZE);

	/* initialize relation tuple form */
	relation->rd_rel = relationForm;

	/*
	 * This part MUST remain a fetch-on-demand: filling it in here would
	 * require opening pg_class, and relation_open would then recurse
	 * indefinitely.
	 */
	relation->rd_segfile0_relationnodeinfo.isPresent = false;
	relation->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = false;

	/* and allocate attribute tuple form storage */
	relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts,
											   relationForm->relhasoids);
	/* which we mark as a reference-counted tupdesc */
	relation->rd_att->tdrefcount = 1;

	MemoryContextSwitchTo(oldcxt);

	return relation;
}

/*
 * RelationParseRelOptions
 *		Convert pg_class.reloptions into pre-parsed rd_options
 *
 * tuple is the real pg_class tuple (not rd_rel!) for relation
 *
 * Note: rd_rel and (if an index) rd_am must be valid already
 */
static void
RelationParseRelOptions(Relation relation, HeapTuple tuple)
{
	Datum		datum;
	bool		isnull;
	bytea	   *options;

	relation->rd_options = NULL;

	/* Fall out if relkind should not have options */
	switch (relation->rd_rel->relkind)
	{
		case RELKIND_RELATION:
		case RELKIND_TOASTVALUE:
		case RELKIND_AOSEGMENTS:
		case RELKIND_AOBLOCKDIR:
		case RELKIND_AOVISIMAP:
		case RELKIND_INDEX:
			break;
		default:
			return;
	}

	/*
	 * Fetch reloptions from tuple; have to use a hardwired descriptor because
	 * we might not have any other for pg_class yet (consider executing this
	 * code for pg_class itself)
	 */
	datum = fastgetattr(tuple,
						Anum_pg_class_reloptions,
						GetPgClassDescriptor(),
						&isnull);
	if (isnull)
		return;

	/* Parse into appropriate format; don't error out here */
	switch (relation->rd_rel->relkind)
	{
		case RELKIND_RELATION:
		case RELKIND_TOASTVALUE:
		case RELKIND_AOSEGMENTS:
		case RELKIND_AOBLOCKDIR:
		case RELKIND_AOVISIMAP:
		case RELKIND_UNCATALOGED:
			options = heap_reloptions(relation->rd_rel->relkind, datum,
									  false);
			break;
		case RELKIND_INDEX:
			options = index_reloptions(relation->rd_am->amoptions, datum,
									   false);
			break;
		default:
			Assert(false);		/* can't get here */
			options = NULL;		/* keep compiler quiet */
			break;
	}

	/*
	 * Copy parsed data into CacheMemoryContext.  To guard against the
	 * possibility of leaks in the reloptions code, we want to do the actual
	 * parsing in the caller's memory context and copy the results into
	 * CacheMemoryContext after the fact.
	 */
	if (options)
	{
		relation->rd_options = MemoryContextAlloc(CacheMemoryContext,
												  VARSIZE(options));
		memcpy(relation->rd_options, options, VARSIZE(options));
		pfree(options);
	}
}

/*
 *		RelationBuildTupleDesc
 *
 *		Form the relation's tuple descriptor from information in
 *		the pg_attribute, pg_attrdef & pg_constraint system catalogs.
 */
static void
RelationBuildTupleDesc(Relation relation)
{
	HeapTuple	pg_attribute_tuple;
	Relation	pg_attribute_desc;
	SysScanDesc pg_attribute_scan;
	ScanKeyData skey[2];
	int			need;
	TupleConstr *constr;
	AttrDefault *attrdef = NULL;
	int			ndef = 0;

	/* copy some fields from pg_class row to rd_att */
	relation->rd_att->tdtypeid = relation->rd_rel->reltype;
	relation->rd_att->tdtypmod = -1;	/* unnecessary, but... */
	relation->rd_att->tdhasoid = relation->rd_rel->relhasoids;

	constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
												sizeof(TupleConstr));
	constr->has_not_null = false;

	/*
	 * Form a scan key that selects only user attributes (attnum > 0).
	 * (Eliminating system attribute rows at the index level is lots faster
	 * than fetching them.)
	 */
	ScanKeyInit(&skey[0],
				Anum_pg_attribute_attrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));
	ScanKeyInit(&skey[1],
				Anum_pg_attribute_attnum,
				BTGreaterStrategyNumber, F_INT2GT,
				Int16GetDatum(0));

	/*
	 * Open pg_attribute and begin a scan.	Force heap scan if we haven't yet
	 * built the critical relcache entries (this includes initdb and startup
	 * without a pg_internal.init file).
	 */
	pg_attribute_desc = heap_open(AttributeRelationId, AccessShareLock);
	pg_attribute_scan = systable_beginscan(pg_attribute_desc,
										   AttributeRelidNumIndexId,
										   criticalRelcachesBuilt,
										   SnapshotNow,
										   2, skey);

	/*
	 * add attribute data to relation->rd_att
	 */
	need = relation->rd_rel->relnatts;

	while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
	{
		Form_pg_attribute attp;

		attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);

		if (attp->attnum <= 0 ||
			attp->attnum > relation->rd_rel->relnatts)
			elog(ERROR, "invalid attribute number %d for %s",
				 attp->attnum, RelationGetRelationName(relation));

		memcpy(relation->rd_att->attrs[attp->attnum - 1],
			   attp,
			   ATTRIBUTE_FIXED_PART_SIZE);

		/* Update constraint/default info */
		if (attp->attnotnull)
			constr->has_not_null = true;

		if (attp->atthasdef)
		{
			if (attrdef == NULL)
				attrdef = (AttrDefault *)
					MemoryContextAllocZero(CacheMemoryContext,
										   relation->rd_rel->relnatts *
										   sizeof(AttrDefault));
			attrdef[ndef].adnum = attp->attnum;
			attrdef[ndef].adbin = NULL;
			ndef++;
		}
		need--;
		if (need == 0)
			break;
	}

	/*
	 * end the scan and close the attribute relation
	 */
	systable_endscan(pg_attribute_scan);
	heap_close(pg_attribute_desc, AccessShareLock);

	if (need != 0)
		elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
			 need, RelationGetRelid(relation));

	/*
	 * The attcacheoff values we read from pg_attribute should all be -1
	 * ("unknown").  Verify this if assert checking is on.	They will be
	 * computed when and if needed during tuple access.
	 */
#ifdef USE_ASSERT_CHECKING
	{
		int			i;

		for (i = 0; i < relation->rd_rel->relnatts; i++)
			Assert(relation->rd_att->attrs[i]->attcacheoff == -1);
	}
#endif

	/*
	 * However, we can easily set the attcacheoff value for the first
	 * attribute: it must be zero.	This eliminates the need for special cases
	 * for attnum=1 that used to exist in fastgetattr() and index_getattr().
	 */
	if (relation->rd_rel->relnatts > 0)
		relation->rd_att->attrs[0]->attcacheoff = 0;

	/*
	 * Set up constraint/default info
	 */
	if (constr->has_not_null || ndef > 0 || relation->rd_rel->relchecks)
	{
		relation->rd_att->constr = constr;

		if (ndef > 0)			/* DEFAULTs */
		{
			if (ndef < relation->rd_rel->relnatts)
				constr->defval = (AttrDefault *)
					repalloc(attrdef, ndef * sizeof(AttrDefault));
			else
				constr->defval = attrdef;
			constr->num_defval = ndef;
			AttrDefaultFetch(relation);
		}
		else
			constr->num_defval = 0;

		if (relation->rd_rel->relchecks > 0)	/* CHECKs */
		{
			constr->num_check = relation->rd_rel->relchecks;
			constr->check = (ConstrCheck *)
				MemoryContextAllocZero(CacheMemoryContext,
									constr->num_check * sizeof(ConstrCheck));
			CheckConstraintFetch(relation);
		}
		else
			constr->num_check = 0;
	}
	else
	{
		pfree(constr);
		relation->rd_att->constr = NULL;
	}
}

/*
 *		RelationBuildRuleLock
 *
 *		Form the relation's rewrite rules from information in
 *		the pg_rewrite system catalog.
 *
 * Note: The rule parsetrees are potentially very complex node structures.
 * To allow these trees to be freed when the relcache entry is flushed,
 * we make a private memory context to hold the RuleLock information for
 * each relcache entry that has associated rules.  The context is used
 * just for rule info, not for any other subsidiary data of the relcache
 * entry, because that keeps the update logic in RelationClearRelation()
 * manageable.	The other subsidiary data structures are simple enough
 * to be easy to free explicitly, anyway.
 */
static void
RelationBuildRuleLock(Relation relation)
{
	MemoryContext rulescxt;
	MemoryContext oldcxt;
	HeapTuple	rewrite_tuple;
	Relation	rewrite_desc;
	TupleDesc	rewrite_tupdesc;
	SysScanDesc rewrite_scan;
	ScanKeyData key;
	RuleLock   *rulelock;
	int			numlocks;
	RewriteRule **rules;
	int			maxlocks;

	/*
	 * Make the private context.  Parameters are set on the assumption that
	 * it'll probably not contain much data.
	 */
	rulescxt = AllocSetContextCreate(CacheMemoryContext,
									 RelationGetRelationName(relation),
									 ALLOCSET_SMALL_MINSIZE,
									 ALLOCSET_SMALL_INITSIZE,
									 ALLOCSET_SMALL_MAXSIZE);
	relation->rd_rulescxt = rulescxt;

	/*
	 * allocate an array to hold the rewrite rules (the array is extended if
	 * necessary)
	 */
	maxlocks = 4;
	rules = (RewriteRule **)
		MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
	numlocks = 0;

	/*
	 * form a scan key
	 */
	ScanKeyInit(&key,
				Anum_pg_rewrite_ev_class,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));

	/*
	 * open pg_rewrite and begin a scan
	 *
	 * Note: since we scan the rules using RewriteRelRulenameIndexId, we will
	 * be reading the rules in name order, except possibly during
	 * emergency-recovery operations (ie, IgnoreSystemIndexes). This in turn
	 * ensures that rules will be fired in name order.
	 */
	rewrite_desc = heap_open(RewriteRelationId, AccessShareLock);
	rewrite_tupdesc = RelationGetDescr(rewrite_desc);

	rewrite_scan = systable_beginscan(rewrite_desc,
									  RewriteRelRulenameIndexId,
									  true, SnapshotNow,
									  1, &key);

	while (HeapTupleIsValid(rewrite_tuple = systable_getnext(rewrite_scan)))
	{
		Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
		bool		isnull;
		Datum		rule_datum;
		char	   *rule_str;
		RewriteRule *rule;

		rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
												  sizeof(RewriteRule));

		rule->ruleId = HeapTupleGetOid(rewrite_tuple);

		rule->event = rewrite_form->ev_type - '0';
		rule->attrno = rewrite_form->ev_attr;
		rule->enabled = rewrite_form->ev_enabled;
		rule->isInstead = rewrite_form->is_instead;

		/*
		 * Must use heap_getattr to fetch ev_action and ev_qual.  Also, the
		 * rule strings are often large enough to be toasted.  To avoid
		 * leaking memory in the caller's context, do the detoasting here so
		 * we can free the detoasted version.
		 */
		rule_datum = heap_getattr(rewrite_tuple,
								  Anum_pg_rewrite_ev_action,
								  rewrite_tupdesc,
								  &isnull);
		Assert(!isnull);
		rule_str = TextDatumGetCString(rule_datum);
		oldcxt = MemoryContextSwitchTo(rulescxt);
		rule->actions = (List *) stringToNode(rule_str);
		MemoryContextSwitchTo(oldcxt);
		pfree(rule_str);

		rule_datum = heap_getattr(rewrite_tuple,
								  Anum_pg_rewrite_ev_qual,
								  rewrite_tupdesc,
								  &isnull);
		Assert(!isnull);
		rule_str = TextDatumGetCString(rule_datum);
		oldcxt = MemoryContextSwitchTo(rulescxt);
		rule->qual = (Node *) stringToNode(rule_str);
		MemoryContextSwitchTo(oldcxt);
		pfree(rule_str);

		/*
		 * We want the rule's table references to be checked as though by the
		 * table owner, not the user referencing the rule.	Therefore, scan
		 * through the rule's actions and set the checkAsUser field on all
		 * rtable entries.	We have to look at the qual as well, in case it
		 * contains sublinks.
		 *
		 * The reason for doing this when the rule is loaded, rather than when
		 * it is stored, is that otherwise ALTER TABLE OWNER would have to
		 * grovel through stored rules to update checkAsUser fields. Scanning
		 * the rule tree during load is relatively cheap (compared to
		 * constructing it in the first place), so we do it here.
		 */
		setRuleCheckAsUser((Node *) rule->actions, relation->rd_rel->relowner);
		setRuleCheckAsUser(rule->qual, relation->rd_rel->relowner);

		if (numlocks >= maxlocks)
		{
			maxlocks *= 2;
			rules = (RewriteRule **)
				repalloc(rules, sizeof(RewriteRule *) * maxlocks);
		}
		rules[numlocks++] = rule;
	}

	/*
	 * end the scan and close the pg_rewrite relation
	 */
	systable_endscan(rewrite_scan);
	heap_close(rewrite_desc, AccessShareLock);

	/*
	 * form a RuleLock and insert into relation
	 */
	rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
	rulelock->numLocks = numlocks;
	rulelock->rules = rules;

	relation->rd_rules = rulelock;
}

/*
 *		equalRuleLocks
 *
 *		Determine whether two RuleLocks are equivalent
 *
 *		Probably this should be in the rules code someplace...
 */
static bool
equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
{
	int			i;

	/*
	 * As of 7.3 we assume the rule ordering is repeatable, because
	 * RelationBuildRuleLock should read 'em in a consistent order.  So just
	 * compare corresponding slots.
	 */
	if (rlock1 != NULL)
	{
		if (rlock2 == NULL)
			return false;
		if (rlock1->numLocks != rlock2->numLocks)
			return false;
		for (i = 0; i < rlock1->numLocks; i++)
		{
			RewriteRule *rule1 = rlock1->rules[i];
			RewriteRule *rule2 = rlock2->rules[i];

			if (rule1->ruleId != rule2->ruleId)
				return false;
			if (rule1->event != rule2->event)
				return false;
			if (rule1->attrno != rule2->attrno)
				return false;
			if (rule1->enabled != rule2->enabled)
				return false;
			if (rule1->isInstead != rule2->isInstead)
				return false;
			if (!equal(rule1->qual, rule2->qual))
				return false;
			if (!equal(rule1->actions, rule2->actions))
				return false;
		}
	}
	else if (rlock2 != NULL)
		return false;
	return true;
}


/*
 *		RelationBuildDesc
 *
 *		Build a relation descriptor.  The caller must hold at least
 *		AccessShareLock on the target relid.
 *
 *		The new descriptor is inserted into the hash table if insertIt is true.
 *
 *		Returns NULL if no pg_class row could be found for the given relid
 *		(suggesting we are trying to access a just-deleted relation).
 *		Any other error is reported via elog.
 */
static Relation
RelationBuildDesc(Oid targetRelId, bool insertIt)
{
	Relation	relation;
	Oid			relid;
	Relation    pg_class_relation;
	HeapTuple	pg_class_tuple;
	Form_pg_class relp;

	/*
	 * find the tuple in pg_class corresponding to the given relation id
	 */
	pg_class_tuple = ScanPgRelation(targetRelId, true, &pg_class_relation);

	/*
	 * if no such tuple exists, return NULL
	 */
	if (!HeapTupleIsValid(pg_class_tuple))
		return NULL;

	/*
	 * get information from the pg_class_tuple
	 */
	relid = HeapTupleGetOid(pg_class_tuple);
	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
	heap_close(pg_class_relation, AccessShareLock);

	/*
	 * allocate storage for the relation descriptor, and copy pg_class_tuple
	 * to relation->rd_rel and new fields into relation->rd_newfields.
	 */
	relation = AllocateRelationDesc(relp);

	/*
	 * initialize the relation's relation id (relation->rd_id)
	 */
	RelationGetRelid(relation) = relid;

	/*
	 * normal relations are not nailed into the cache; nor can a pre-existing
	 * relation be new.  It could be temp though.  (Actually, it could be new
	 * too, but it's okay to forget that fact if forced to flush the entry.)
	 */
	relation->rd_refcnt = 0;
	relation->rd_isnailed = false;
	relation->rd_createSubid = InvalidSubTransactionId;
	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
	relation->rd_istemp = isTempOrToastNamespace(relation->rd_rel->relnamespace);
	relation->rd_issyscat = (strncmp(relation->rd_rel->relname.data, "pg_", 3) == 0);

    /*
     * CDB: On QEs, temp relations must use shared buffer cache so data
     * will be visible to all segmates.  On QD, sequence objects must
     * use shared buffer cache so data will be visible to sequence server.
     */
    if (relation->rd_istemp &&
        relation->rd_rel->relkind != RELKIND_SEQUENCE &&
        Gp_role != GP_ROLE_EXECUTE)
        relation->rd_isLocalBuf = true;
    else
        relation->rd_isLocalBuf = false;

	/*
	 * initialize the tuple descriptor (relation->rd_att).
	 */
	RelationBuildTupleDesc(relation);

	/*
	 * Fetch rules and triggers that affect this relation
	 */
	if (relation->rd_rel->relhasrules)
		RelationBuildRuleLock(relation);
	else
	{
		relation->rd_rules = NULL;
		relation->rd_rulescxt = NULL;
	}

	if (relation->rd_rel->reltriggers > 0)
		RelationBuildTriggers(relation);
	else
		relation->trigdesc = NULL;

	/*
	 * if it's an index, initialize index-related information
	 */
	if (OidIsValid(relation->rd_rel->relam))
		RelationInitIndexAccessInfo(relation);

	/*
	 * if it's an append-only table, get information from pg_appendonly
	 */
	if (relation->rd_rel->relstorage == RELSTORAGE_AOROWS ||
		relation->rd_rel->relstorage == RELSTORAGE_AOCOLS)
	{
		RelationInitAppendOnlyInfo(relation);
	}

	/* extract reloptions if any */
	RelationParseRelOptions(relation, pg_class_tuple);

	/*
	 * initialize the relation lock manager information
	 */
	RelationInitLockInfo(relation);		/* see lmgr.c */

	/*
	 * initialize physical addressing information for the relation
	 */
	RelationInitPhysicalAddr(relation);

	/* make sure relation is marked as having no open file yet */
	relation->rd_smgr = NULL;

    /*
     * initialize Greenplum Database partitioning info
     */
    if (relation->rd_rel->relkind == RELKIND_RELATION &&
        !IsSystemRelation(relation))
        relation->rd_cdbpolicy = GpPolicyFetch(CacheMemoryContext, targetRelId);

    relation->rd_cdbDefaultStatsWarningIssued = false;

	/*
	 * now we can free the memory allocated for pg_class_tuple
	 */
	heap_freetuple(pg_class_tuple);

	/*
	 * Insert newly created relation into relcache hash table, if requested.
	 */
	if (insertIt)
		RelationCacheInsert(relation);

	/* It's fully valid */
	relation->rd_isvalid = true;

	return relation;
}
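
/*
 * Illustrative sketch (not part of the original file): the usual caller
 * pattern for RelationBuildDesc().  The caller must already hold at least
 * AccessShareLock on relid; a NULL result means the pg_class row vanished
 * (relation just dropped).  The relid argument and this example function
 * are hypothetical.
 */
#ifdef NOT_USED
static Relation
RelationBuildDescExample(Oid relid)
{
	Relation	rd = RelationBuildDesc(relid, true);	/* insert into relcache */

	if (!RelationIsValid(rd))
		elog(ERROR, "relation %u does not exist", relid);
	return rd;
}
#endif   /* NOT_USED */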

/*
 * Initialize the physical addressing info (RelFileNode) for a relcache entry
 */
static void
RelationInitPhysicalAddr(Relation relation)
{
	if (relation->rd_rel->reltablespace)
		relation->rd_node.spcNode = relation->rd_rel->reltablespace;
	else
		relation->rd_node.spcNode = MyDatabaseTableSpace;
	if (relation->rd_rel->relisshared)
		relation->rd_node.dbNode = InvalidOid;
	else
		relation->rd_node.dbNode = MyDatabaseId;
	relation->rd_node.relNode = relation->rd_rel->relfilenode;
}
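
/*
 * Illustrative sketch (not part of the original file): the mapping computed
 * above yields (MyDatabaseTableSpace, MyDatabaseId, relfilenode) for an
 * ordinary table in the default tablespace, while a shared catalog gets an
 * invalid database OID.  This example function is hypothetical.
 */
#ifdef NOT_USED
static void
RelationPhysicalAddrExample(Relation relation)
{
	RelationInitPhysicalAddr(relation);
	/* shared catalogs carry no database OID in their RelFileNode */
	if (relation->rd_rel->relisshared)
		Assert(relation->rd_node.dbNode == InvalidOid);
}
#endif   /* NOT_USED */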

/*
 * Initialize index-access-method support data for an index relation
 */
void
RelationInitIndexAccessInfo(Relation relation)
{
	HeapTuple	tuple;
	Form_pg_am	aform;
	Datum		indclassDatum;
	Datum		indoptionDatum;
	bool		isnull;
	oidvector  *indclass;
	int2vector *indoption;
	MemoryContext indexcxt;
	MemoryContext oldcontext;
	int			natts;
	uint16		amstrategies;
	uint16		amsupport;

	/*
	 * Make a copy of the pg_index entry for the index.  Since pg_index
	 * contains variable-length and possibly-null fields, we have to do this
	 * honestly rather than just treating it as a Form_pg_index struct.
	 */
	tuple = SearchSysCache(INDEXRELID,
						   ObjectIdGetDatum(RelationGetRelid(relation)),
						   0, 0, 0);
	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "cache lookup failed for index %u",
			 RelationGetRelid(relation));
	oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indextuple = heap_copytuple(tuple);
	relation->rd_index = (Form_pg_index) GETSTRUCT(relation->rd_indextuple);
	MemoryContextSwitchTo(oldcontext);
	ReleaseSysCache(tuple);

	/*
	 * Make a copy of the pg_am entry for the index's access method
	 */
	tuple = SearchSysCache(AMOID,
						   ObjectIdGetDatum(relation->rd_rel->relam),
						   0, 0, 0);
	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "cache lookup failed for access method %u",
			 relation->rd_rel->relam);
	aform = (Form_pg_am) MemoryContextAlloc(CacheMemoryContext, sizeof *aform);
	memcpy(aform, GETSTRUCT(tuple), sizeof *aform);
	ReleaseSysCache(tuple);
	relation->rd_am = aform;

	natts = relation->rd_rel->relnatts;
	if (natts != relation->rd_index->indnatts)
		elog(ERROR, "relnatts disagrees with indnatts for index %u",
			 RelationGetRelid(relation));
	amstrategies = aform->amstrategies;
	amsupport = aform->amsupport;

	/*
	 * Make the private context to hold index access info.	The reason we need
	 * a context, and not just a couple of pallocs, is so that we won't leak
	 * any subsidiary info attached to fmgr lookup records.
	 *
	 * Context parameters are set on the assumption that it'll probably not
	 * contain much data.
	 */
	indexcxt = AllocSetContextCreate(CacheMemoryContext,
									 RelationGetRelationName(relation),
									 ALLOCSET_SMALL_MINSIZE,
									 ALLOCSET_SMALL_INITSIZE,
									 ALLOCSET_SMALL_MAXSIZE);
	relation->rd_indexcxt = indexcxt;

	/*
	 * Allocate arrays to hold data
	 */
	relation->rd_aminfo = (RelationAmInfo *)
		MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));

	relation->rd_opfamily = (Oid *)
		MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
	relation->rd_opcintype = (Oid *)
		MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));

	if (amstrategies > 0)
		relation->rd_operator = (Oid *)
			MemoryContextAllocZero(indexcxt,
								   natts * amstrategies * sizeof(Oid));
	else
		relation->rd_operator = NULL;

	if (amsupport > 0)
	{
		int			nsupport = natts * amsupport;

		relation->rd_support = (RegProcedure *)
			MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
		relation->rd_supportinfo = (FmgrInfo *)
			MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
	}
	else
	{
		relation->rd_support = NULL;
		relation->rd_supportinfo = NULL;
	}

	relation->rd_indoption = (int16 *)
		MemoryContextAllocZero(indexcxt, natts * sizeof(int16));

	/*
	 * indclass cannot be referenced directly through the C struct, because it
	 * comes after the variable-width indkey field.  Must extract the datum
	 * the hard way...
	 */
	indclassDatum = fastgetattr(relation->rd_indextuple,
								Anum_pg_index_indclass,
								GetPgIndexDescriptor(),
								&isnull);
	Assert(!isnull);
	indclass = (oidvector *) DatumGetPointer(indclassDatum);

	/*
	 * Fill the operator and support procedure OID arrays, as well as the info
	 * about opfamilies and opclass input types.  (aminfo and supportinfo are
	 * left as zeroes, and are filled on-the-fly when used)
	 */
	IndexSupportInitialize(indclass,
						   relation->rd_operator, relation->rd_support,
						   relation->rd_opfamily, relation->rd_opcintype,
						   amstrategies, amsupport, natts);

	/*
	 * Similarly extract indoption and copy it to the cache entry
	 */
	indoptionDatum = fastgetattr(relation->rd_indextuple,
								 Anum_pg_index_indoption,
								 GetPgIndexDescriptor(),
								 &isnull);
	Assert(!isnull);
	indoption = (int2vector *) DatumGetPointer(indoptionDatum);
	memcpy(relation->rd_indoption, indoption->values, natts * sizeof(int16));

	/*
	 * expressions and predicate cache will be filled later
	 */
	relation->rd_indexprs = NIL;
	relation->rd_indpred = NIL;
	relation->rd_amcache = NULL;
}

/*
 * IndexSupportInitialize
 *		Initializes an index's cached opclass information,
 *		given the index's pg_index.indclass entry.
 *
 * Data is returned into *indexOperator, *indexSupport, *opFamily, and
 * *opcInType, which are arrays allocated by the caller.
 *
 * The caller also passes maxStrategyNumber, maxSupportNumber, and
 * maxAttributeNumber, since these indicate the size of the arrays
 * it has allocated --- but in practice these numbers must always match
 * those obtainable from the system catalog entries for the index and
 * access method.
 */
void
IndexSupportInitialize(oidvector *indclass,
					   Oid *indexOperator,
					   RegProcedure *indexSupport,
					   Oid *opFamily,
					   Oid *opcInType,
					   StrategyNumber maxStrategyNumber,
					   StrategyNumber maxSupportNumber,
					   AttrNumber maxAttributeNumber)
{
	int			attIndex;

	for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
	{
		OpClassCacheEnt *opcentry;

		if (!OidIsValid(indclass->values[attIndex]))
			elog(ERROR, "bogus pg_index tuple");

		/* look up the info for this opclass, using a cache */
		opcentry = LookupOpclassInfo(indclass->values[attIndex],
									 maxStrategyNumber,
									 maxSupportNumber);

		/* copy cached data into relcache entry */
		opFamily[attIndex] = opcentry->opcfamily;
		opcInType[attIndex] = opcentry->opcintype;
		if (maxStrategyNumber > 0)
			memcpy(&indexOperator[attIndex * maxStrategyNumber],
				   opcentry->operatorOids,
				   maxStrategyNumber * sizeof(Oid));
		if (maxSupportNumber > 0)
			memcpy(&indexSupport[attIndex * maxSupportNumber],
				   opcentry->supportProcs,
				   maxSupportNumber * sizeof(RegProcedure));
	}
}
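
/*
 * Illustrative sketch (not part of the original file): how the flat arrays
 * filled above are indexed afterwards.  For 0-based attribute position
 * attoff and a 1-based strategy number, the operator OID lives at
 * rd_operator[attoff * amstrategies + (strategy - 1)], matching the memcpy
 * offsets used in IndexSupportInitialize().  This example function is
 * hypothetical.
 */
#ifdef NOT_USED
static Oid
IndexStrategyOperatorExample(Relation irel, int attoff, StrategyNumber strategy)
{
	uint16		amstrategies = irel->rd_am->amstrategies;

	Assert(strategy >= 1 && strategy <= amstrategies);
	return irel->rd_operator[attoff * amstrategies + (strategy - 1)];
}
#endif   /* NOT_USED */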

/*
 * LookupOpclassInfo
 *
 * This routine maintains a per-opclass cache of the information needed
 * by IndexSupportInitialize().  This is more efficient than relying on
 * the catalog cache, because we can load all the info about a particular
 * opclass in a single indexscan of pg_amproc or pg_amop.
 *
 * The information from pg_am about expected range of strategy and support
 * numbers is passed in, rather than being looked up, mainly because the
 * caller will have it already.
 *
1632 1633 1634
 * Note there is no provision for flushing the cache.  This is OK at the
 * moment because there is no way to ALTER any interesting properties of an
 * existing opclass --- all you can do is drop it, which will result in
1635
 * a useless but harmless dead entry in the cache.  To support altering
1636 1637 1638
 * opclass membership (not the same as opfamily membership!), we'd need to
 * be able to flush this cache as well as the contents of relcache entries
 * for indexes.
1639 1640 1641 1642 1643 1644 1645 1646
 */
static OpClassCacheEnt *
LookupOpclassInfo(Oid operatorClassOid,
				  StrategyNumber numStrats,
				  StrategyNumber numSupport)
{
	OpClassCacheEnt *opcentry;
	bool		found;
1647 1648
	Relation	rel;
	SysScanDesc scan;
1649
	ScanKeyData skey[3];
1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663
	HeapTuple	htup;
	bool		indexOK;

	if (OpClassCache == NULL)
	{
		/* First time through: initialize the opclass cache */
		HASHCTL		ctl;

		if (!CacheMemoryContext)
			CreateCacheMemoryContext();

		MemSet(&ctl, 0, sizeof(ctl));
		ctl.keysize = sizeof(Oid);
		ctl.entrysize = sizeof(OpClassCacheEnt);
		ctl.hash = oid_hash;
		OpClassCache = hash_create("Operator class cache", 64,
								   &ctl, HASH_ELEM | HASH_FUNCTION);
	}

	opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
											   (void *) &operatorClassOid,
											   HASH_ENTER, &found);

	if (!found)
	{
		/* Need to allocate memory for new entry */
		opcentry->valid = false;	/* until known OK */
		opcentry->numStrats = numStrats;
		opcentry->numSupport = numSupport;

		if (numStrats > 0)
			opcentry->operatorOids = (Oid *)
				MemoryContextAllocZero(CacheMemoryContext,
									   numStrats * sizeof(Oid));
		else
			opcentry->operatorOids = NULL;

		if (numSupport > 0)
			opcentry->supportProcs = (RegProcedure *)
				MemoryContextAllocZero(CacheMemoryContext,
									   numSupport * sizeof(RegProcedure));
		else
			opcentry->supportProcs = NULL;
	}
	else
	{
		Assert(numStrats == opcentry->numStrats);
		Assert(numSupport == opcentry->numSupport);
	}

	/*
	 * When testing for cache-flush hazards, we intentionally disable the
	 * operator class cache and force reloading of the info on each call.
	 * This is helpful because we want to test the case where a cache flush
	 * occurs while we are loading the info, and it's very hard to provoke
	 * that if this happens only once per opclass per backend.
	 */
#if defined(CLOBBER_CACHE_ALWAYS)
	opcentry->valid = false;
#endif

	if (opcentry->valid)
		return opcentry;

	/*
	 * Need to fill in new entry.
	 *
	 * To avoid infinite recursion during startup, force heap scans if we're
	 * looking up info for the opclasses used by the indexes we would like to
	 * reference here.
	 */
	indexOK = criticalRelcachesBuilt ||
		(operatorClassOid != OID_BTREE_OPS_OID &&
		 operatorClassOid != INT2_BTREE_OPS_OID);

	/*
	 * We have to fetch the pg_opclass row to determine its opfamily and
	 * opcintype, which are needed to look up the operators and functions.
	 * It'd be convenient to use the syscache here, but that probably doesn't
	 * work while bootstrapping.
	 */
	ScanKeyInit(&skey[0],
				ObjectIdAttributeNumber,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(operatorClassOid));
	rel = heap_open(OperatorClassRelationId, AccessShareLock);
	scan = systable_beginscan(rel, OpclassOidIndexId, indexOK,
							  SnapshotNow, 1, skey);

	if (HeapTupleIsValid(htup = systable_getnext(scan)))
	{
		Form_pg_opclass opclassform = (Form_pg_opclass) GETSTRUCT(htup);

		opcentry->opcfamily = opclassform->opcfamily;
		opcentry->opcintype = opclassform->opcintype;
	}
	else
		elog(ERROR, "could not find tuple for opclass %u", operatorClassOid);

	systable_endscan(scan);
	heap_close(rel, AccessShareLock);


	/*
	 * Scan pg_amop to obtain operators for the opclass.  We only fetch the
	 * default ones (those with lefttype = righttype = opcintype).
	 */
	if (numStrats > 0)
	{
		ScanKeyInit(&skey[0],
					Anum_pg_amop_amopfamily,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcfamily));
		ScanKeyInit(&skey[1],
					Anum_pg_amop_amoplefttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
		ScanKeyInit(&skey[2],
					Anum_pg_amop_amoprighttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
		rel = heap_open(AccessMethodOperatorRelationId, AccessShareLock);
		scan = systable_beginscan(rel, AccessMethodStrategyIndexId, indexOK,
								  SnapshotNow, 3, skey);

		while (HeapTupleIsValid(htup = systable_getnext(scan)))
		{
			Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(htup);

			if (amopform->amopstrategy <= 0 ||
				(StrategyNumber) amopform->amopstrategy > numStrats)
				elog(ERROR, "invalid amopstrategy number %d for opclass %u",
					 amopform->amopstrategy, operatorClassOid);
			opcentry->operatorOids[amopform->amopstrategy - 1] =
				amopform->amopopr;
		}

		systable_endscan(scan);
		heap_close(rel, AccessShareLock);
	}

	/*
	 * Scan pg_amproc to obtain support procs for the opclass.	We only fetch
	 * the default ones (those with lefttype = righttype = opcintype).
	 */
	if (numSupport > 0)
	{
		ScanKeyInit(&skey[0],
					Anum_pg_amproc_amprocfamily,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcfamily));
		ScanKeyInit(&skey[1],
					Anum_pg_amproc_amproclefttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
		ScanKeyInit(&skey[2],
					Anum_pg_amproc_amprocrighttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
		rel = heap_open(AccessMethodProcedureRelationId, AccessShareLock);
		scan = systable_beginscan(rel, AccessMethodProcedureIndexId, indexOK,
								  SnapshotNow, 3, skey);

		while (HeapTupleIsValid(htup = systable_getnext(scan)))
		{
			Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);

			if (amprocform->amprocnum <= 0 ||
				(StrategyNumber) amprocform->amprocnum > numSupport)
				elog(ERROR, "invalid amproc number %d for opclass %u",
					 amprocform->amprocnum, operatorClassOid);

			opcentry->supportProcs[amprocform->amprocnum - 1] =
				amprocform->amproc;
		}

		systable_endscan(scan);
		heap_close(rel, AccessShareLock);
	}

	opcentry->valid = true;
	return opcentry;
}


/*
 *		formrdesc
 *
 *		This is a special cut-down version of RelationBuildDesc(),
 *		used while initializing the relcache.
 *		The relation descriptor is built just from the supplied parameters,
 *		without actually looking at any system table entries.  We cheat
 *		quite a lot since we only need to work for a few basic system
 *		catalogs.
 *
 * formrdesc is currently used for: pg_database, pg_authid, pg_auth_members,
 * pg_class, pg_attribute, pg_proc, and pg_type
 * (see RelationCacheInitializePhase2/3).
 *
 * Note that these catalogs can't have constraints (except attnotnull),
 * default values, rules, or triggers, since we don't cope with any of that.
 * (Well, actually, this only matters for properties that need to be valid
 * during bootstrap or before RelationCacheInitializePhase3 runs, and none of
 * these properties matter then...)
 *
 * NOTE: we assume we are already switched into CacheMemoryContext.
 */
static void
formrdesc(const char *relationName, Oid relationReltype,
		  bool isshared, bool hasoids,
		  int natts, const FormData_pg_attribute *attrs)
{
	Relation	relation;
	int			i;
	bool		has_not_null;

	/*
	 * allocate new relation desc, clear all fields of reldesc
	 */
	relation = (Relation) palloc0(sizeof(RelationData));
	relation->rd_targblock = InvalidBlockNumber;

	/* make sure relation is marked as having no open file yet */
	relation->rd_smgr = NULL;

	/*
	 * initialize reference count: 1 because it is nailed in cache
	 */
	relation->rd_refcnt = 1;

	/*
	 * all entries built with this routine are nailed-in-cache; none are for
	 * new or temp relations.
	 */
	relation->rd_isnailed = true;
	relation->rd_createSubid = InvalidSubTransactionId;
	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
	relation->rd_istemp = false;
	relation->rd_issyscat = (strncmp(relationName, "pg_", 3) == 0);	/* GP */
    relation->rd_isLocalBuf = false;    /*CDB*/

	/*
	 * initialize relation tuple form
	 *
	 * The data we insert here is pretty incomplete/bogus, but it'll serve to
	 * get us launched.  RelationCacheInitializePhase2() will read the real
	 * data from pg_class and replace what we've done here.  Note in particular
	 * that relowner is left as zero; this cues RelationCacheInitializePhase2
	 * that the real data isn't there yet.
	 */
	relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);

	namestrcpy(&relation->rd_rel->relname, relationName);
	relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
	relation->rd_rel->reltype = relationReltype;

	/*
	 * It's important to distinguish between shared and non-shared relations,
	 * even at bootstrap time, to make sure we know where they are stored.
	 */
	relation->rd_rel->relisshared = isshared;
	if (isshared)
		relation->rd_rel->reltablespace = GLOBALTABLESPACE_OID;

	relation->rd_rel->relpages = 0;
	relation->rd_rel->reltuples = 0;
	relation->rd_rel->relkind = RELKIND_RELATION;
	relation->rd_rel->relstorage = RELSTORAGE_HEAP;
	relation->rd_rel->relhasoids = hasoids;
	relation->rd_rel->relnatts = (int16) natts;

	/*
	 * Physical file-system information.
	 */
	relation->rd_segfile0_relationnodeinfo.isPresent = false;
	relation->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = false;
	
	/*
	 * initialize attribute tuple form
	 *
	 * Unlike the case with the relation tuple, this data had better be right
	 * because it will never be replaced.  The input values must be correctly
	 * defined by macros in src/include/catalog/ headers.
	 */
	relation->rd_att = CreateTemplateTupleDesc(natts, hasoids);
	relation->rd_att->tdrefcount = 1;	/* mark as refcounted */

	relation->rd_att->tdtypeid = relationReltype;
	relation->rd_att->tdtypmod = -1;	/* unnecessary, but... */

	/*
	 * initialize tuple desc info
	 */
	has_not_null = false;
	for (i = 0; i < natts; i++)
	{
		memcpy(relation->rd_att->attrs[i],
			   &attrs[i],
			   ATTRIBUTE_FIXED_PART_SIZE);
		has_not_null |= attrs[i].attnotnull;
		/* make sure attcacheoff is valid */
		relation->rd_att->attrs[i]->attcacheoff = -1;
	}

	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
	relation->rd_att->attrs[0]->attcacheoff = 0;

	/* mark not-null status */
	if (has_not_null)
	{
		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

		constr->has_not_null = true;
		relation->rd_att->constr = constr;
	}

	/*
	 * initialize relation id from info in att array (my, this is ugly)
	 */
	RelationGetRelid(relation) = relation->rd_att->attrs[0]->attrelid;
	relation->rd_rel->relfilenode = RelationGetRelid(relation);

	/*
	 * initialize the relation lock manager information
	 */
	RelationInitLockInfo(relation);		/* see lmgr.c */

	/*
	 * initialize physical addressing information for the relation
	 */
	RelationInitPhysicalAddr(relation);

	/*
	 * initialize the rel-has-index flag, using hardwired knowledge
	 */
	if (IsBootstrapProcessingMode())
	{
		/* In bootstrap mode, we have no indexes */
		relation->rd_rel->relhasindex = false;
	}
	else
	{
		/* Otherwise, all the rels formrdesc is used for have indexes */
		relation->rd_rel->relhasindex = true;
	}

	/*
	 * add new reldesc to relcache
	 */
	RelationCacheInsert(relation);

	/* It's fully valid */
	relation->rd_isvalid = true;
}
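
/*
 * Sketch of a typical formrdesc() call, mirroring the ones issued for the
 * shared catalogs in RelationCacheInitializePhase2() below; the pg_class
 * variant (used when no relcache init file is available) is assumed to look
 * like this:
 *
 *		formrdesc("pg_class", PG_CLASS_RELTYPE_OID, false,
 *				  true, Natts_pg_class, Desc_pg_class);
 *
 * This is illustrative only; see the actual calls in the initialization
 * routines for the authoritative list.
 */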


static void
RelationInitAppendOnlyInfo(Relation relation)
{
	Relation	pg_appendonly_rel;
	HeapTuple	tuple;
	MemoryContext oldcontext;
	SysScanDesc scan;
	ScanKeyData skey;

	/*
	 * Check the pg_appendonly relation to be certain the ao table
	 * is there.
	 */
	pg_appendonly_rel = heap_open(AppendOnlyRelationId, AccessShareLock);

	ScanKeyInit(&skey,
				Anum_pg_appendonly_relid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));
	/* FIXME: isn't there a mode in relcache code to *not* use an index? Should
	 * we do something here to obey it?
	 */
	scan = systable_beginscan(pg_appendonly_rel, AppendOnlyRelidIndexId, true,
							  SnapshotNow, 1, &skey);

	tuple = systable_getnext(scan);
	if (!tuple)
		elog(ERROR, "could not find pg_appendonly tuple for relation \"%s\"",
			 RelationGetRelationName(relation));

	/*
	 * Make a copy of the pg_appendonly entry for the table.
	 */
	oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_aotuple = heap_copytuple(tuple);
	relation->rd_appendonly = (Form_pg_appendonly) GETSTRUCT(relation->rd_aotuple);
	MemoryContextSwitchTo(oldcontext);
	systable_endscan(scan);
	heap_close(pg_appendonly_rel, AccessShareLock);

}
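
/*
 * Sketch only: once RelationInitAppendOnlyInfo() has run, the append-only
 * metadata stays attached to the relcache entry for its lifetime, e.g.
 *
 *		Form_pg_appendonly aoinfo = relation->rd_appendonly;
 *
 * The pointer aliases rd_aotuple (a copy kept in CacheMemoryContext), so it
 * must not be freed on its own; RelationDestroyRelation() releases
 * rd_aotuple along with the rest of the entry.
 */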


/* ----------------------------------------------------------------
 *				 Relation Descriptor Lookup Interface
 * ----------------------------------------------------------------
 */

/*
 *		RelationIdGetRelation
 *
 *		Lookup a reldesc by OID; make one if not already in cache.
 *
 *		Returns NULL if no pg_class row could be found for the given relid
 *		(suggesting we are trying to access a just-deleted relation).
 *		Any other error is reported via elog.
 *
 *		NB: caller should already have at least AccessShareLock on the
 *		relation ID, else there are nasty race conditions.
 *
 *		NB: relation ref count is incremented, or set to 1 if new entry.
 *		Caller should eventually decrement count.  (Usually,
 *		that happens by calling RelationClose().)
 */
Relation
RelationIdGetRelation(Oid relationId)
{
	Relation	rd;

	/*
	 * first try to find reldesc in the cache
	 */
	RelationIdCacheLookup(relationId, rd);

	if (RelationIsValid(rd))
	{
		RelationIncrementReferenceCount(rd);
		/* revalidate cache entry if necessary */
		if (!rd->rd_isvalid)
		{
			/*
			 * Indexes only have a limited number of possible schema changes,
			 * and we don't want to use the full-blown procedure because it's
			 * a headache for indexes that the reload itself depends on.
			 */
			if (rd->rd_rel->relkind == RELKIND_INDEX)
				RelationReloadIndexInfo(rd);
			else
				RelationClearRelation(rd, true);
		}
		return rd;
	}

	/*
	 * no reldesc in the cache, so have RelationBuildDesc() build one and add
	 * it.
	 */
	rd = RelationBuildDesc(relationId, true);
	if (RelationIsValid(rd))
		RelationIncrementReferenceCount(rd);

	return rd;
}
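
/*
 * Sketch-only usage of the lookup interface above.  Real callers normally go
 * through relation_open()/heap_open(), which take the lock and then perform
 * this same sequence; the explicit form, assuming LockRelationOid() and
 * UnlockRelationOid() from storage/lmgr.h, would be roughly:
 *
 *		LockRelationOid(relid, AccessShareLock);
 *		rd = RelationIdGetRelation(relid);
 *		if (rd == NULL)
 *			elog(ERROR, "could not open relation with OID %u", relid);
 *		... use rd ...
 *		RelationClose(rd);
 *		UnlockRelationOid(relid, AccessShareLock);
 */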

/* ----------------------------------------------------------------
 *				cache invalidation support routines
 * ----------------------------------------------------------------
 */

/*
 * RelationIncrementReferenceCount
 *		Increments relation reference count.
 *
 * Note: bootstrap mode has its own weird ideas about relation refcount
 * behavior; we ought to fix it someday, but for now, just disable
 * reference count ownership tracking in bootstrap mode.
 */
void
RelationIncrementReferenceCount(Relation rel)
{
	ResourceOwnerEnlargeRelationRefs(CurrentResourceOwner);
	rel->rd_refcnt += 1;
	if (!IsBootstrapProcessingMode())
		ResourceOwnerRememberRelationRef(CurrentResourceOwner, rel);
}

/*
 * RelationDecrementReferenceCount
 *		Decrements relation reference count.
 */
void
RelationDecrementReferenceCount(Relation rel)
{
	if (rel->rd_refcnt <= 0)
	{
		elog(ERROR,
			 "Relation decrement reference count found relation %u/%u/%u with bad count (reference count %d)",
			 rel->rd_node.spcNode,
			 rel->rd_node.dbNode,
			 rel->rd_node.relNode,
			 rel->rd_refcnt);
	}
	
	rel->rd_refcnt -= 1;
	if (!IsBootstrapProcessingMode())
		ResourceOwnerForgetRelationRef(CurrentResourceOwner, rel);
}

/*
 * RelationClose - close an open relation
 *
 *	Actually, we just decrement the refcount.
 *
 *	NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
 *	will be freed as soon as their refcount goes to zero.  In combination
 *	with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
 *	to catch references to already-released relcache entries.  It slows
 *	things down quite a bit, however.
 */
void
RelationClose(Relation relation)
{
	/* Note: no locking manipulations needed */
	RelationDecrementReferenceCount(relation);

#ifdef RELCACHE_FORCE_RELEASE
	if (RelationHasReferenceCountZero(relation) &&
		relation->rd_createSubid == InvalidSubTransactionId &&
		relation->rd_newRelfilenodeSubid == InvalidSubTransactionId)
		RelationClearRelation(relation, false);
#endif
}

/*
 * RelationReloadIndexInfo - reload minimal information for an open index
 *
 *	This function is used only for indexes.  A relcache inval on an index
 *	can mean that its pg_class or pg_index row changed.  There are only
 *	very limited changes that are allowed to an existing index's schema,
 *	so we can update the relcache entry without a complete rebuild; which
 *	is fortunate because we can't rebuild an index entry that is "nailed"
 *	and/or in active use.  We support full replacement of the pg_class row,
 *	as well as updates of a few simple fields of the pg_index row.
 *
 *	We can't necessarily reread the catalog rows right away; we might be
 *	in a failed transaction when we receive the SI notification.  If so,
 *	RelationClearRelation just marks the entry as invalid by setting
 *	rd_isvalid to false.  This routine is called to fix the entry when it
 *	is next needed.
 *
 *	We assume that at the time we are called, we have at least AccessShareLock
 *	on the target index.  (Note: in the calls from RelationClearRelation,
 *	this is legitimate because we know the rel has positive refcount.)
 *
 *	If the target index is an index on pg_class or pg_index, we'd better have
 *	previously gotten at least AccessShareLock on its underlying catalog,
 *	else we are at risk of deadlock against someone trying to exclusive-lock
 *	the heap and index in that order.  This is ensured in current usage by
 *	only applying this to indexes being opened or having positive refcount.
 */
static void
RelationReloadIndexInfo(Relation relation)
{
	bool		indexOK;
	HeapTuple	pg_class_tuple;
	Form_pg_class relp;

	/* Should be called only for invalidated indexes */
	Assert(relation->rd_rel->relkind == RELKIND_INDEX &&
		   !relation->rd_isvalid);
	/* Should be closed at smgr level */
	Assert(relation->rd_smgr == NULL);

	/* Make sure targblock is reset in case rel was truncated */
	relation->rd_targblock = InvalidBlockNumber;
	/* Must free any AM cached data, too */
	if (relation->rd_amcache)
		pfree(relation->rd_amcache);
	relation->rd_amcache = NULL;

	/*
	 * If it's a shared index, we might be called before backend startup has
	 * finished selecting a database, in which case we have no way to read
	 * pg_class yet.  However, a shared index can never have any significant
	 * schema updates, so it's okay to ignore the invalidation signal.  Just
	 * mark it valid and return without doing anything more.
	 */
	if (relation->rd_rel->relisshared && !criticalRelcachesBuilt)
	{
		relation->rd_isvalid = true;
		return;
	}

	/*
	 * Read the pg_class row
	 *
	 * Don't try to use an indexscan of pg_class_oid_index to reload the info
	 * for pg_class_oid_index ...
	 */
	indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
	pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK, NULL);
	if (!HeapTupleIsValid(pg_class_tuple))
		elog(ERROR, "could not find pg_class tuple for index %u",
			 RelationGetRelid(relation));
	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
	memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
	/* Reload reloptions in case they changed */
	if (relation->rd_options)
		pfree(relation->rd_options);
	RelationParseRelOptions(relation, pg_class_tuple);
	/* done with pg_class tuple */
	heap_freetuple(pg_class_tuple);
	/* We must recalculate physical address in case it changed */
	RelationInitPhysicalAddr(relation);

	/* Forget gp_relation_node information -- it may have changed. */
	MemSet(&relation->rd_segfile0_relationnodeinfo, 0, sizeof(RelationNodeInfo));

	/*
	 * For a non-system index, there are fields of the pg_index row that are
	 * allowed to change, so re-read that row and update the relcache entry.
	 * Most of the info derived from pg_index (such as support function lookup
	 * info) cannot change, and indeed the whole point of this routine is to
	 * update the relcache entry without clobbering that data; so wholesale
	 * replacement is not appropriate.
	 */
	if (!IsSystemRelation(relation))
	{
		HeapTuple	tuple;
		Form_pg_index index;

		tuple = SearchSysCache(INDEXRELID,
							   ObjectIdGetDatum(RelationGetRelid(relation)),
							   0, 0, 0);
		if (!HeapTupleIsValid(tuple))
			elog(ERROR, "cache lookup failed for index %u",
				 RelationGetRelid(relation));
		index = (Form_pg_index) GETSTRUCT(tuple);

		/*
		 * Basically, let's just copy all the bool fields.  There are one or
		 * two of these that can't actually change in the current code, but
		 * it's not worth it to track exactly which ones they are.  None of
		 * the array fields are allowed to change, though.
		 */
		relation->rd_index->indisunique = index->indisunique;
		relation->rd_index->indisprimary = index->indisprimary;
		relation->rd_index->indisclustered = index->indisclustered;
		relation->rd_index->indisvalid = index->indisvalid;
		relation->rd_index->indcheckxmin = index->indcheckxmin;
		relation->rd_index->indisready = index->indisready;

		/* Copy xmin too, as that is needed to make sense of indcheckxmin */
		HeapTupleHeaderSetXmin(relation->rd_indextuple->t_data,
							   HeapTupleHeaderGetXmin(tuple->t_data));

		ReleaseSysCache(tuple);
	}

	/* Okay, now it's valid again */
	relation->rd_isvalid = true;
}

/*
 * RelationDestroyRelation
 *
 *	Physically delete a relation cache entry and all subsidiary data.
 *	Caller must already have unhooked the entry from the hash table.
 */
static void
RelationDestroyRelation(Relation relation)
{
	Assert(RelationHasReferenceCountZero(relation));

	/*
	 * Make sure smgr and lower levels close the relation's files, if they
	 * weren't closed already.  (This was probably done by caller, but let's
	 * just be real sure.)
	 */
	RelationCloseSmgr(relation);

	/*
	 * Free all the subsidiary data structures of the relcache entry,
	 * then the entry itself.
	 */
	if (relation->rd_rel)
		pfree(relation->rd_rel);
	/* can't use DecrTupleDescRefCount here */
	Assert(relation->rd_att->tdrefcount > 0);
	if (--relation->rd_att->tdrefcount == 0)
		FreeTupleDesc(relation->rd_att);
	list_free(relation->rd_indexlist);
	bms_free(relation->rd_indexattr);
	FreeTriggerDesc(relation->trigdesc);
	if (relation->rd_options)
		pfree(relation->rd_options);
	if (relation->rd_indextuple)
		pfree(relation->rd_indextuple);
	if (relation->rd_aotuple)
		pfree(relation->rd_aotuple);
	if (relation->rd_am)
		pfree(relation->rd_am);
	if (relation->rd_indexcxt)
		MemoryContextDelete(relation->rd_indexcxt);
	if (relation->rd_rulescxt)
		MemoryContextDelete(relation->rd_rulescxt);
	if (relation->rd_cdbpolicy)
		pfree(relation->rd_cdbpolicy);

	pfree(relation);
}

/*
 * RelationClearRelation
 *
 *	 Physically blow away a relation cache entry, or reset it and rebuild
 *	 it from scratch (that is, from catalog entries).  The latter path is
 *	 used when we are notified of a change to an open relation (one with
 *	 refcount > 0).
 *
 *	 NB: when rebuilding, we'd better hold some lock on the relation,
 *	 else the catalog data we need to read could be changing under us.
 *	 Also, a rel to be rebuilt had better have refcnt > 0.  This is because
 *	 an sinval reset could happen while we're accessing the catalogs, and
 *	 the rel would get blown away underneath us by RelationCacheInvalidate
 *	 if it has zero refcnt.
 *
 *	 The "rebuild" parameter is redundant in current usage because it has
 *	 to match the relation's refcnt status, but we keep it as a crosscheck
 *	 that we're doing what the caller expects.
 */
static void
RelationClearRelation(Relation relation, bool rebuild)
{
	/*
	 * As per notes above, a rel to be rebuilt MUST have refcnt > 0; while
	 * of course it would be a bad idea to blow away one with nonzero refcnt.
	 */
	Assert(rebuild ?
		   !RelationHasReferenceCountZero(relation) :
		   RelationHasReferenceCountZero(relation));

	/*
	 * Make sure smgr and lower levels close the relation's files, if they
	 * weren't closed already.  If the relation is not getting deleted, the
	 * next smgr access should reopen the files automatically.	This ensures
	 * that the low-level file access state is updated after, say, a vacuum
	 * truncation.
	 */
	RelationCloseSmgr(relation);

	/*
	 * Never, never ever blow away a nailed-in system relation, because we'd
	 * be unable to recover.  However, we must reset rd_targblock, in case we
	 * got called because of a relation cache flush that was triggered by
	 * VACUUM.  Likewise reset the fsm and vm size info.
	 *
	 * If it's a nailed index, then we need to re-read the pg_class row to see
	 * if its relfilenode changed.	We can't necessarily do that here, because
	 * we might be in a failed transaction.  We assume it's okay to do it if
	 * there are open references to the relcache entry (cf notes for
	 * AtEOXact_RelationCache).  Otherwise just mark the entry as possibly
	 * invalid, and it'll be fixed when next opened.
	 */
	if (relation->rd_isnailed)
	{
		relation->rd_targblock = InvalidBlockNumber;
		if (relation->rd_rel->relkind == RELKIND_INDEX)
		{
			relation->rd_isvalid = false;		/* needs to be revalidated */
			if (relation->rd_refcnt > 1)
				RelationReloadIndexInfo(relation);
		}
		return;
	}

	/*
	 * Even non-system indexes should not be blown away if they are open and
	 * have valid index support information.  This avoids problems with active
	 * use of the index support information.  As with nailed indexes, we
	 * re-read the pg_class row to handle possible physical relocation of the
	 * index, and we check for pg_index updates too.
	 */
	if (relation->rd_rel->relkind == RELKIND_INDEX &&
		relation->rd_refcnt > 0 &&
		relation->rd_indexcxt != NULL)
	{
		relation->rd_isvalid = false;	/* needs to be revalidated */
		RelationReloadIndexInfo(relation);
		return;
	}

	/* Mark it invalid until we've finished rebuild */
	relation->rd_isvalid = false;

	/*
	 * If we're really done with the relcache entry, blow it away. But if
	 * someone is still using it, reconstruct the whole deal without moving
	 * the physical RelationData record (so that the someone's pointer is
	 * still valid).
	 */
	if (!rebuild)
	{
		/* Remove it from the hash table */
		RelationCacheDelete(relation);

		/* And release storage */
		RelationDestroyRelation(relation);
	}
	else
	{
		/*
		 * Our strategy for rebuilding an open relcache entry is to build
		 * a new entry from scratch, swap its contents with the old entry,
		 * and finally delete the new entry (along with any infrastructure
		 * swapped over from the old entry).  This is to avoid trouble in case
		 * an error causes us to lose control partway through.  The old entry
		 * will still be marked !rd_isvalid, so we'll try to rebuild it again
		 * on next access.  Meanwhile it's not any less valid than it was
		 * before, so any code that might expect to continue accessing it
		 * isn't hurt by the rebuild failure.  (Consider for example a
		 * subtransaction that ALTERs a table and then gets cancelled partway
		 * through the cache entry rebuild.  The outer transaction should
		 * still see the not-modified cache entry as valid.)  The worst
		 * consequence of an error is leaking the necessarily-unreferenced
		 * new entry, and this shouldn't happen often enough for that to be
		 * a big problem.
		 *
		 * When rebuilding an open relcache entry, we must preserve ref count
		 * and rd_createSubid/rd_newRelfilenodeSubid state.  Also attempt to
		 * preserve the pg_class entry (rd_rel), tupledesc, and rewrite-rule
		 * substructures in place, because various places assume that these
		 * structures won't move while they are working with an open relcache
		 * entry.  (Note: the refcount mechanism for tupledescs might someday
		 * allow us to remove this hack for the tupledesc.)
 		 *
 		 * Note that this process does not touch CurrentResourceOwner; which
 		 * is good because whatever ref counts the entry may have do not
 		 * necessarily belong to that resource owner.
 		 */
		Relation	newrel;
 		Oid			save_relid = RelationGetRelid(relation);
		bool		keep_tupdesc;
		bool		keep_rules;

		/* Build temporary entry, but don't link it into hashtable */
		newrel = RelationBuildDesc(save_relid, false);
		if (newrel == NULL)
 		{
 			/* Should only get here if relation was deleted */
			RelationCacheDelete(relation);
			RelationDestroyRelation(relation);
 			elog(ERROR, "relation %u deleted while still in use", save_relid);
 		}
 
		keep_tupdesc = equalTupleDescs(relation->rd_att, newrel->rd_att, true);
		keep_rules = equalRuleLocks(relation->rd_rules, newrel->rd_rules);

		/*
		 * Perform swapping of the relcache entry contents.  Within this
		 * process the old entry is momentarily invalid, so there *must*
		 * be no possibility of CHECK_FOR_INTERRUPTS within this sequence.
		 * Do it in all-in-line code for safety.
		 *
		 * Since the vast majority of fields should be swapped, our method
		 * is to swap the whole structures and then re-swap those few fields
		 * we didn't want swapped.
		 */
#define SWAPFIELD(fldtype, fldname) \
		do { \
			fldtype _tmp = newrel->fldname; \
			newrel->fldname = relation->fldname; \
			relation->fldname = _tmp; \
		} while (0)

		/* swap all Relation struct fields */
 		{
			RelationData tmpstruct;

			memcpy(&tmpstruct, newrel, sizeof(RelationData));
			memcpy(newrel, relation, sizeof(RelationData));
			memcpy(relation, &tmpstruct, sizeof(RelationData));
		}

		/* rd_smgr must not be swapped, due to back-links from smgr level */
		SWAPFIELD(SMgrRelation, rd_smgr);
		/* rd_refcnt must be preserved */
		SWAPFIELD(int, rd_refcnt);
		/* isnailed shouldn't change */
		Assert(newrel->rd_isnailed == relation->rd_isnailed);
		/* creation sub-XIDs must be preserved */
		SWAPFIELD(SubTransactionId, rd_createSubid);
		SWAPFIELD(SubTransactionId, rd_newRelfilenodeSubid);
		/* un-swap rd_rel pointers, swap contents instead */
		SWAPFIELD(Form_pg_class, rd_rel);
		/* ... but actually, we don't have to update newrel->rd_rel */
		memcpy(relation->rd_rel, newrel->rd_rel, CLASS_TUPLE_SIZE);
		/* preserve old tupledesc and rules if no logical change */
		if (keep_tupdesc)
			SWAPFIELD(TupleDesc, rd_att);
		if (keep_rules)
 		{
			SWAPFIELD(RuleLock *, rd_rules);
			SWAPFIELD(MemoryContext, rd_rulescxt);
 		}
		/* pgstat_info must be preserved */
		SWAPFIELD(struct PgStat_TableStatus *, pgstat_info);

		/* preserve persistent table information for the relation  */
		SWAPFIELD(struct RelationNodeInfo, rd_segfile0_relationnodeinfo);

#undef SWAPFIELD

		/* And now we can throw away the temporary entry */
		RelationDestroyRelation(newrel);
	}
}

/*
 * RelationFlushRelation
 *
 *	 Rebuild the relation if it is open (refcount > 0), else blow it away.
 */
static void
RelationFlushRelation(Relation relation)
{
	if (relation->rd_createSubid != InvalidSubTransactionId ||
		relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
	{
		/*
		 * New relcache entries are always rebuilt, not flushed; else we'd
		 * forget the "new" status of the relation, which is a useful
		 * optimization to have.  Ditto for the new-relfilenode status.
		 *
		 * The rel could have zero refcnt here, so temporarily increment
		 * the refcnt to ensure it's safe to rebuild it.  We can assume that
		 * the current transaction has some lock on the rel already.
		 */
		RelationIncrementReferenceCount(relation);
		RelationClearRelation(relation, true);
		RelationDecrementReferenceCount(relation);
	}
	else
	{
		/*
		 * Pre-existing rels can be dropped from the relcache if not open.
		 */
		bool	rebuild = !RelationHasReferenceCountZero(relation);

		RelationClearRelation(relation, rebuild);
	}
}

/*
 * RelationForgetRelation - unconditionally remove a relcache entry
 *
 *		   External interface for destroying a relcache entry when we
 *		   drop the relation.
 */
void
RelationForgetRelation(Oid rid)
{
	Relation	relation;

	RelationIdCacheLookup(rid, relation);

	if (!PointerIsValid(relation))
		return;					/* not in cache, nothing to do */

	if (!RelationHasReferenceCountZero(relation))
		elog(ERROR, "relation %u is still open", rid);

	/* Unconditionally destroy the relcache entry */
	RelationClearRelation(relation, false);
}

/*
 *		RelationCacheInvalidateEntry
 *
 *		This routine is invoked for SI cache flush messages.
 *
 * Any relcache entry matching the relid must be flushed.  (Note: caller has
 * already determined that the relid belongs to our database or is a shared
 * relation.)
 *
 * We used to skip local relations, on the grounds that they could
 * not be targets of cross-backend SI update messages; but it seems
 * safer to process them, so that our *own* SI update messages will
 * have the same effects during CommandCounterIncrement for both
 * local and nonlocal relations.
 */
void
RelationCacheInvalidateEntry(Oid relationId)
{
	Relation	relation;

	RelationIdCacheLookup(relationId, relation);

	if (PointerIsValid(relation))
	{
		relcacheInvalsReceived++;
		RelationFlushRelation(relation);
	}
}

/*
 * RelationCacheInvalidate
 *	 Blow away cached relation descriptors that have zero reference counts,
 *	 and rebuild those with positive reference counts.	Also reset the smgr
 *	 relation cache.
 *
 *	 This is currently used only to recover from SI message buffer overflow,
 *	 so we do not touch new-in-transaction relations; they cannot be targets
 *	 of cross-backend SI updates (and our own updates now go through a
 *	 separate linked list that isn't limited by the SI message buffer size).
 *	 Likewise, we need not discard new-relfilenode-in-transaction hints,
 *	 since any invalidation of those would be a local event.
 *
 *	 We do this in two phases: the first pass deletes deletable items, and
 *	 the second one rebuilds the rebuildable items.  This is essential for
 *	 safety, because hash_seq_search only copes with concurrent deletion of
 *	 the element it is currently visiting.	If a second SI overflow were to
 *	 occur while we are walking the table, resulting in recursive entry to
 *	 this routine, we could crash because the inner invocation blows away
 *	 the entry next to be visited by the outer scan.  But this way is OK,
 *	 because (a) during the first pass we won't process any more SI messages,
 *	 so hash_seq_search will complete safely; (b) during the second pass we
 *	 only hold onto pointers to nondeletable entries.
 *
 *	 The two-phase approach also makes it easy to ensure that we process
 *	 nailed-in-cache indexes before other nondeletable items, and that we
 *	 process pg_class_oid_index first of all.  In scenarios where a nailed
 *	 index has been given a new relfilenode, we have to detect that update
 *	 before the nailed index is used in reloading any other relcache entry.
 */
void
RelationCacheInvalidate(void)
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;
	Relation	relation;
	List	   *rebuildFirstList = NIL;
	List	   *rebuildList = NIL;
	ListCell   *l;

	/* Phase 1 */
	hash_seq_init(&status, RelationIdCache);

	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
	{
		relation = idhentry->reldesc;

		/* Must close all smgr references to avoid leaving dangling ptrs */
		RelationCloseSmgr(relation);

		/* Ignore new relations, since they are never SI targets */
		if (relation->rd_createSubid != InvalidSubTransactionId)
			continue;

		relcacheInvalsReceived++;

		if (RelationHasReferenceCountZero(relation))
		{
			/* Delete this entry immediately */
			Assert(!relation->rd_isnailed);
			RelationClearRelation(relation, false);
		}
		else
		{
			/*
			 * Add this entry to list of stuff to rebuild in second pass.
			 * pg_class_oid_index goes on the front of rebuildFirstList, other
			 * nailed indexes on the back, and everything else into
			 * rebuildList (in no particular order).
			 */
			if (relation->rd_isnailed &&
				relation->rd_rel->relkind == RELKIND_INDEX)
			{
				if (RelationGetRelid(relation) == ClassOidIndexId)
					rebuildFirstList = lcons(relation, rebuildFirstList);
				else
					rebuildFirstList = lappend(rebuildFirstList, relation);
			}
			else
				rebuildList = lcons(relation, rebuildList);
		}
	}

	/*
	 * Now zap any remaining smgr cache entries.  This must happen before we
	 * start to rebuild entries, since that may involve catalog fetches which
	 * will re-open catalog files.
	 */
	smgrcloseall();

	/* Phase 2: rebuild the items found to need rebuild in phase 1 */
	foreach(l, rebuildFirstList)
	{
		relation = (Relation) lfirst(l);
		RelationClearRelation(relation, true);
	}
	list_free(rebuildFirstList);
	foreach(l, rebuildList)
	{
		relation = (Relation) lfirst(l);
		RelationClearRelation(relation, true);
	}
	list_free(rebuildList);
}

/*
 * AtEOXact_RelationCache
 *
 *	Clean up the relcache at main-transaction commit or abort.
 *
 * Note: this must be called *before* processing invalidation messages.
 * In the case of abort, we don't want to try to rebuild any invalidated
 * cache entries (since we can't safely do database accesses).  Therefore
 * we must reset refcnts before handling pending invalidations.
 *
 * As of PostgreSQL 8.1, relcache refcnts should get released by the
 * ResourceOwner mechanism.  This routine just does a debugging
 * cross-check that no pins remain.  However, we also need to do special
 * cleanup when the current transaction created any relations or made use
 * of forced index lists.
 */
void
AtEOXact_RelationCache(bool isCommit)
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;

	/*
	 * To speed up transaction exit, we want to avoid scanning the relcache
	 * unless there is actually something for this routine to do.  Other than
	 * the debug-only Assert checks, most transactions don't create any work
	 * for us to do here, so we keep a static flag that gets set if there is
	 * anything to do.	(Currently, this means either a relation is created in
	 * the current xact, or one is given a new relfilenode, or an index list
	 * is forced.)  For simplicity, the flag remains set till end of top-level
	 * transaction, even though we could clear it at subtransaction end in
	 * some cases.
	 *
	 * MPP-3333: READERS need to *always* scan, otherwise they will not be able
	 * to maintain a coherent view of the storage layer.
	 */
	if (!need_eoxact_work
		&& DistributedTransactionContext != DTX_CONTEXT_QE_READER
#ifdef USE_ASSERT_CHECKING
		&& !assert_enabled
#endif
		)
		return;

	hash_seq_init(&status, RelationIdCache);

	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
	{
		Relation	relation = idhentry->reldesc;

		/*
		 * The relcache entry's ref count should be back to its normal
		 * not-in-a-transaction state: 0 unless it's nailed in cache.
		 *
		 * In bootstrap mode, this is NOT true, so don't check it --- the
		 * bootstrap code expects relations to stay open across start/commit
		 * transaction calls.  (That seems bogus, but it's not worth fixing.)
		 */
#ifdef USE_ASSERT_CHECKING
		if (!IsBootstrapProcessingMode())
		{
			int			expected_refcnt;

			expected_refcnt = relation->rd_isnailed ? 1 : 0;
			Assert(relation->rd_refcnt == expected_refcnt);
		}
#endif

		/*
		 * QE-readers aren't properly enrolled in transactions; they
		 * just get the snapshot which corresponds to the writer's
		 * transaction -- so here, where
		 * we are maintaining their relcache, we want to just clean
		 * up (almost as if we had aborted). (MPP-3338)
		 */
		if (DistributedTransactionContext == DTX_CONTEXT_QE_ENTRY_DB_SINGLETON ||
			DistributedTransactionContext == DTX_CONTEXT_QE_READER)
		{
			RelationClearRelation(relation, relation->rd_isnailed ? true : false);
			continue;
		}

		/*
		 * Is it a relation created in the current transaction?
		 *
		 * During commit, reset the flag to zero, since we are now out of the
		 * creating transaction.  During abort, simply delete the relcache
		 * entry --- it isn't interesting any longer.  (NOTE: if we have
		 * forgotten the new-ness of a new relation due to a forced cache
		 * flush, the entry will get deleted anyway by shared-cache-inval
		 * processing of the aborted pg_class insertion.)
		 */
		if (relation->rd_createSubid != InvalidSubTransactionId)
		{
			if (isCommit)
				relation->rd_createSubid = InvalidSubTransactionId;
			else
			{
				/*
				 * In abort, delete the error log file before forgetting
				 * this relation.
				 */
				ErrorLogDelete(MyDatabaseId, RelationGetRelid(relation));
				RelationClearRelation(relation, false);
				continue;
			}
		}

		/*
		 * Likewise, reset the hint about the relfilenode being new.
		 */
		relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;

		/*
		 * Flush any temporary index list.
		 */
		if (relation->rd_indexvalid == 2)
		{
			list_free(relation->rd_indexlist);
			relation->rd_indexlist = NIL;
			relation->rd_oidindex = InvalidOid;
			relation->rd_indexvalid = 0;
		}
	}

	/* Once done with the transaction, we can reset need_eoxact_work */
	need_eoxact_work = false;
}

/*
 * AtEOSubXact_RelationCache
 *
 *	Clean up the relcache at sub-transaction commit or abort.
 *
 * Note: this must be called *before* processing invalidation messages.
 */
void
AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
						  SubTransactionId parentSubid)
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;

	/*
	 * Skip the relcache scan if nothing to do --- see notes for
	 * AtEOXact_RelationCache.
	 */
	if (!need_eoxact_work &&
		DistributedTransactionContext != DTX_CONTEXT_QE_READER)
		return;

	hash_seq_init(&status, RelationIdCache);

	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
	{
		Relation	relation = idhentry->reldesc;

		/*
		 * As opposed to AtEOXact_RelationCache, subtransactions
		 * in readers are only caused by internal commands, and
		 * there shouldn't be any interaction with global transactions
		 * (reader gangs commit their transactions independently), so
		 * we must not clear the relcache here.
		 */

		/*
		 * Is it a relation created in the current subtransaction?
		 *
		 * During subcommit, mark it as belonging to the parent, instead.
		 * During subabort, simply delete the relcache entry.
		 */
		if (relation->rd_createSubid == mySubid)
		{
			if (isCommit)
				relation->rd_createSubid = parentSubid;
			else if (RelationHasReferenceCountZero(relation))
			{
				/*
				 * In abort, delete the error log file before forgetting
				 * this relation.
				 */
				ErrorLogDelete(MyDatabaseId, RelationGetRelid(relation));

				RelationClearRelation(relation, false);
				continue;
			}
			else
			{
				/*
				 * Hmm, somewhere there's a (leaked?) reference to the
				 * relation.  We daren't remove the entry for fear of
				 * dereferencing a dangling pointer later.  Bleat, and mark it
				 * as not belonging to the current transaction.  Hopefully
				 * it'll get cleaned up eventually.  This must be just a
				 * WARNING to avoid error-during-error-recovery loops.
				 */
				relation->rd_createSubid = InvalidSubTransactionId;
				elog(WARNING, "cannot remove relcache entry for \"%s\" because it has nonzero refcount",
					 RelationGetRelationName(relation));
			}
		}

		/*
		 * Likewise, update or drop any new-relfilenode-in-subtransaction
		 * hint.
		 */
		if (relation->rd_newRelfilenodeSubid == mySubid)
		{
			if (isCommit)
				relation->rd_newRelfilenodeSubid = parentSubid;
			else
				relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
		}

		/*
		 * Flush any temporary index list.
		 */
		if (relation->rd_indexvalid == 2)
		{
			list_free(relation->rd_indexlist);
			relation->rd_indexlist = NIL;
			relation->rd_oidindex = InvalidOid;
			relation->rd_indexvalid = 0;
		}
	}
}

/*
 * RelationCacheMarkNewRelfilenode
 *
 *	Mark the rel as having been given a new relfilenode in the current
 *	(sub) transaction.	This is a hint that can be used to optimize
 *	later operations on the rel in the same transaction.
 */
void
RelationCacheMarkNewRelfilenode(Relation rel)
{
	/* Mark it... */
	rel->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
	/* ... and now we have eoxact cleanup work to do */
	need_eoxact_work = true;
}


/*
 *		RelationBuildLocalRelation
 *			Build a relcache entry for an about-to-be-created relation,
 *			and enter it into the relcache.
 */
Relation
RelationBuildLocalRelation(const char *relname,
						   Oid relnamespace,
						   TupleDesc tupDesc,
						   Oid relid,
						   Oid reltablespace,
			               char relkind,            /*CDB*/
						   bool shared_relation)
{
	Relation	rel;
	MemoryContext oldcxt;
	int			natts = tupDesc->natts;
	int			i;
	bool		has_not_null;
	bool		nailit;

	AssertArg(natts >= 0);

	/*
	 * check for creation of a rel that must be nailed in cache.
	 *
	 * XXX this list had better match the relations specially handled in
	 * RelationCacheInitializePhase2/3.
	 */
	switch (relid)
	{
		case DatabaseRelationId:
		case AuthIdRelationId:
		case AuthMemRelationId:
		case RelationRelationId:
		case AttributeRelationId:
		case ProcedureRelationId:
		case TypeRelationId:
			nailit = true;
			break;
		default:
			nailit = false;
			break;
	}

	/*
	 * check that hardwired list of shared rels matches what's in the
	 * bootstrap .bki file.  If you get a failure here during initdb, you
	 * probably need to fix IsSharedRelation() to match whatever you've done
	 * to the set of shared relations.
	 */
	if (shared_relation != IsSharedRelation(relid))
		elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
			 relname, relid);

	/*
	 * switch to the cache context to create the relcache entry.
	 */
	if (!CacheMemoryContext)
		CreateCacheMemoryContext();

	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
	 * allocate a new relation descriptor and fill in basic state fields.
	 */
	rel = (Relation) palloc0(sizeof(RelationData));

	rel->rd_targblock = InvalidBlockNumber;

	/* make sure relation is marked as having no open file yet */
	rel->rd_smgr = NULL;

	/* mark it nailed if appropriate */
	rel->rd_isnailed = nailit;

	rel->rd_refcnt = nailit ? 1 : 0;

	/* it's being created in this transaction */
	rel->rd_createSubid = GetCurrentSubTransactionId();
	rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;

	/* must flag that we have rels created in this transaction */
	need_eoxact_work = true;

	/* is it a temporary relation? */
	rel->rd_istemp = isTempOrToastNamespace(relnamespace);

	/* is it a system catalog? */
	rel->rd_issyscat = (strncmp(relname, "pg_", 3) == 0);

    /*
     * CDB: On QEs, temp relations must use shared buffer cache so data
     * will be visible to all segmates.  On QD, sequence objects must
     * use shared buffer cache so data will be visible to sequence server.
     */
    if (rel->rd_istemp &&
        relkind != RELKIND_SEQUENCE &&
        Gp_role != GP_ROLE_EXECUTE)
        rel->rd_isLocalBuf = true;
    else
        rel->rd_isLocalBuf = false;

	/*
	 * create a new tuple descriptor from the one passed in.  We do this
	 * partly to copy it into the cache context, and partly because the new
	 * relation can't have any defaults or constraints yet; they have to be
	 * added in later steps, because they require additions to multiple system
	 * catalogs.  We can copy attnotnull constraints here, however.
	 */
	rel->rd_att = CreateTupleDescCopy(tupDesc);
	rel->rd_att->tdrefcount = 1;	/* mark as refcounted */
	has_not_null = false;
	for (i = 0; i < natts; i++)
	{
		rel->rd_att->attrs[i]->attnotnull = tupDesc->attrs[i]->attnotnull;
		has_not_null |= tupDesc->attrs[i]->attnotnull;
	}

	if (has_not_null)
	{
		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

		constr->has_not_null = true;
		rel->rd_att->constr = constr;
	}

	/*
	 * initialize relation tuple form (caller may add/override data later)
	 */
	rel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);

	namestrcpy(&rel->rd_rel->relname, relname);
	rel->rd_rel->relnamespace = relnamespace;

	rel->rd_rel->relkind = RELKIND_UNCATALOGED;
	rel->rd_rel->relstorage = RELSTORAGE_HEAP;
	rel->rd_rel->relhasoids = rel->rd_att->tdhasoid;
	rel->rd_rel->relnatts = natts;
	rel->rd_rel->reltype = InvalidOid;
	/* needed when bootstrapping: */
	rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;

	/*
	 * Create zeroed-out gp_relation_node data.  It will be filled in when the
	 * disk file is created.
	 */
	rel->rd_segfile0_relationnodeinfo.isPresent = false;
	rel->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = false;

	/*
	 * Insert relation physical and logical identifiers (OIDs) into the right
	 * places.	Note that the physical ID (relfilenode) is initially the same
	 * as the logical ID (OID).
	 */
	rel->rd_rel->relisshared = shared_relation;

	RelationGetRelid(rel) = relid;

	for (i = 0; i < natts; i++)
		rel->rd_att->attrs[i]->attrelid = relid;

	rel->rd_rel->relfilenode = relid;
	rel->rd_rel->reltablespace = reltablespace;

	RelationInitLockInfo(rel);	/* see lmgr.c */

	RelationInitPhysicalAddr(rel);

	/*
	 * Okay to insert into the relcache hash tables.
	 */
	RelationCacheInsert(rel);

	/*
	 * done building relcache entry.
	 */
	MemoryContextSwitchTo(oldcxt);

	/* It's fully valid */
	rel->rd_isvalid = true;

	/*
	 * Caller expects us to pin the returned entry.
	 */
	RelationIncrementReferenceCount(rel);

	return rel;
3187 3188
}

/*
 *		RelationCacheInitialize
 *
 *		This initializes the relation descriptor cache.  At the time
 *		that this is invoked, we can't do database access yet (mainly
 *		because the transaction subsystem is not up); all we are doing
 *		is making an empty cache hashtable.  This must be done before
 *		starting the initialization transaction, because otherwise
 *		AtEOXact_RelationCache would crash if that transaction aborts
 *		before we can get the relcache set up.
 */

#define INITRELCACHESIZE		400

void
RelationCacheInitialize(void)
{
	MemoryContext oldcxt;
	HASHCTL		ctl;

	/*
	 * make sure cache memory context exists
	 */
	if (!CacheMemoryContext)
		CreateCacheMemoryContext();

	/*
	 * switch to cache memory context
	 */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
	 * create hashtable that indexes the relcache
	 */
	MemSet(&ctl, 0, sizeof(ctl));
	ctl.keysize = sizeof(Oid);
	ctl.entrysize = sizeof(RelIdCacheEnt);
	ctl.hash = oid_hash;
	RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
								  &ctl, HASH_ELEM | HASH_FUNCTION);

	MemoryContextSwitchTo(oldcxt);
}

/*
 *		RelationCacheInitializePhase2
 *
 *		This is called to prepare for access to shared catalogs during startup.
 *		We must at least set up nailed reldescs for pg_database, pg_authid,
 *		and pg_auth_members.  Ideally we'd like to have reldescs for their
 *		indexes, too.  We attempt to load this information from the shared
 *		relcache init file.  If that's missing or broken, just make phony
 *		entries for the catalogs themselves.  RelationCacheInitializePhase3
 *		will clean up as needed.
 */
void
RelationCacheInitializePhase2(void)
{
	MemoryContext oldcxt;

	/*
	 * In bootstrap mode, the shared catalogs aren't there yet anyway, so do
	 * nothing.
	 */
	if (IsBootstrapProcessingMode())
		return;

	/*
	 * switch to cache memory context
	 */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
	 * Try to load the shared relcache cache file.	If unsuccessful, bootstrap
	 * the cache with pre-made descriptors for the critical shared catalogs.
	 */
	if (!load_relcache_init_file(true))
	{
		formrdesc("pg_database", PG_DATABASE_RELTYPE_OID, true,
				  true, Natts_pg_database, Desc_pg_database);
		formrdesc("pg_authid", PG_AUTHID_RELTYPE_OID, true,
				  true, Natts_pg_authid, Desc_pg_authid);
		formrdesc("pg_auth_members", PG_AUTH_MEMBERS_RELTYPE_OID, true,
				  false, Natts_pg_auth_members, Desc_pg_auth_members);

#define NUM_CRITICAL_SHARED_RELS	3	/* fix if you change list above */
	}

	MemoryContextSwitchTo(oldcxt);
}

/*
 *		RelationCacheInitializePhase3
 *
 *		This is called as soon as the catcache and transaction system
 *		are functional and we have determined MyDatabaseId.  At this point
 *		we can actually read data from the database's system catalogs.
 *		We first try to read pre-computed relcache entries from the local
 *		relcache init file.  If that's missing or broken, make phony entries
 *		for the minimum set of nailed-in-cache relations.  Then (unless
 *		bootstrapping) make sure we have entries for the critical system
 *		indexes.  Once we've done all this, we have enough infrastructure to
 *		open any system catalog or use any catcache.  The last step is to
 *		rewrite the cache files if needed.
 */
void
RelationCacheInitializePhase3(void)
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;
	MemoryContext oldcxt;
	bool		needNewCacheFile = !criticalSharedRelcachesBuilt;

	/*
	 * switch to cache memory context
	 */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
	 * Try to load the local relcache cache file.  If unsuccessful, bootstrap
	 * the cache with pre-made descriptors for the critical "nailed-in" system
	 * catalogs.
	 */
	if (IsBootstrapProcessingMode() ||
		!load_relcache_init_file(false))
	{
		needNewCacheFile = true;

		formrdesc("pg_class", PG_CLASS_RELTYPE_OID, false,
				  true, Natts_pg_class, Desc_pg_class);
		formrdesc("pg_attribute", PG_ATTRIBUTE_RELTYPE_OID, false,
				  false, Natts_pg_attribute, Desc_pg_attribute);
		formrdesc("pg_proc", PG_PROC_RELTYPE_OID, false,
				  true, Natts_pg_proc, Desc_pg_proc);
		formrdesc("pg_type", PG_TYPE_RELTYPE_OID, false,
				  true, Natts_pg_type, Desc_pg_type);

#define NUM_CRITICAL_LOCAL_RELS 4		/* fix if you change list above */
	}

	MemoryContextSwitchTo(oldcxt);

	/* In bootstrap mode, the faked-up formrdesc info is all we'll have */
	if (IsBootstrapProcessingMode())
		return;

	/*
	 * If we didn't get the critical system indexes loaded into relcache, do
	 * so now.  These are critical because the catcache and/or opclass cache
	 * depend on them for fetches done during relcache load.  Thus, we have an
	 * infinite-recursion problem.  We can break the recursion by doing
	 * heapscans instead of indexscans at certain key spots. To avoid hobbling
	 * performance, we only want to do that until we have the critical indexes
	 * loaded into relcache.  Thus, the flag criticalRelcachesBuilt is used to
	 * decide whether to do heapscan or indexscan at the key spots, and we set
	 * it true after we've loaded the critical indexes.
	 *
	 * The critical indexes are marked as "nailed in cache", partly to make it
	 * easy for load_relcache_init_file to count them, but mainly because we
	 * cannot flush and rebuild them once we've set criticalRelcachesBuilt to
	 * true.  (NOTE: perhaps it would be possible to reload them by
	 * temporarily setting criticalRelcachesBuilt to false again.  For now,
	 * though, we just nail 'em in.)
	 *
	 * RewriteRelRulenameIndexId and TriggerRelidNameIndexId are not critical
	 * in the same way as the others, because the critical catalogs don't
	 * (currently) have any rules or triggers, and so these indexes can be
	 * rebuilt without inducing recursion.  However they are used during
	 * relcache load when a rel does have rules or triggers, so we choose to
	 * nail them for performance reasons.
	 */
	if (!criticalRelcachesBuilt)
	{
		load_critical_index(ClassOidIndexId,
							RelationRelationId);
		load_critical_index(AttributeRelidNumIndexId,
							AttributeRelationId);
		load_critical_index(IndexRelidIndexId,
							IndexRelationId);
		load_critical_index(OpclassOidIndexId,
							OperatorClassRelationId);
		load_critical_index(AccessMethodStrategyIndexId,
							AccessMethodOperatorRelationId);
		load_critical_index(AccessMethodProcedureIndexId,
							AccessMethodProcedureRelationId);
		load_critical_index(OperatorOidIndexId,
							OperatorRelationId);
		load_critical_index(RewriteRelRulenameIndexId,
							RewriteRelationId);
		load_critical_index(TriggerRelidNameIndexId,
							TriggerRelationId);

#define NUM_CRITICAL_LOCAL_INDEXES	9	/* fix if you change list above */

		criticalRelcachesBuilt = true;
	}

	/*
	 * Process critical shared indexes too.
	 *
	 * DatabaseNameIndexId isn't critical for relcache loading, but rather for
	 * initial lookup of MyDatabaseId, without which we'll never find any
	 * non-shared catalogs at all.	Autovacuum calls InitPostgres with a
	 * database OID, so it instead depends on DatabaseOidIndexId.  We also
	 * need to nail up some indexes on pg_authid and pg_auth_members for use
	 * during client authentication.
	 */
	if (!criticalSharedRelcachesBuilt)
	{
		load_critical_index(DatabaseNameIndexId,
							DatabaseRelationId);
		load_critical_index(DatabaseOidIndexId,
							DatabaseRelationId);
		load_critical_index(AuthIdRolnameIndexId,
							AuthIdRelationId);
		load_critical_index(AuthIdOidIndexId,
							AuthIdRelationId);
		load_critical_index(AuthMemMemRoleIndexId,
							AuthMemRelationId);

#define NUM_CRITICAL_SHARED_INDEXES 5	/* fix if you change list above */

		criticalSharedRelcachesBuilt = true;
	}

	/*
	 * Now, scan all the relcache entries and update anything that might be
	 * wrong in the results from formrdesc or the relcache cache file. If we
	 * faked up relcache entries using formrdesc, then read the real pg_class
	 * rows and replace the fake entries with them. Also, if any of the
	 * relcache entries have rules or triggers, load that info the hard way
	 * since it isn't recorded in the cache file.
	 *
	 * Whenever we access the catalogs to read data, there is a possibility
	 * of a shared-inval cache flush causing relcache entries to be removed.
	 * Since hash_seq_search only guarantees to still work after the *current*
	 * entry is removed, it's unsafe to continue the hashtable scan afterward.
	 * We handle this by restarting the scan from scratch after each access.
	 * This is theoretically O(N^2), but the number of entries that actually
	 * need to be fixed is small enough that it doesn't matter.
	 */
	hash_seq_init(&status, RelationIdCache);

	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
	{
		Relation	relation = idhentry->reldesc;
		bool		restart = false;

		/*
		 * Make sure *this* entry doesn't get flushed while we work with it.
		 */
		RelationIncrementReferenceCount(relation);

		/*
		 * If it's a faked-up entry, read the real pg_class tuple.
		 */
		if (relation->rd_rel->relowner == InvalidOid)
		{
			HeapTuple	htup;
			Form_pg_class relp;

			htup = SearchSysCache(RELOID,
							   ObjectIdGetDatum(RelationGetRelid(relation)),
								  0, 0, 0);
			if (!HeapTupleIsValid(htup))
				elog(FATAL, "cache lookup failed for relation %u",
					 RelationGetRelid(relation));
			relp = (Form_pg_class) GETSTRUCT(htup);

			/*
			 * Copy tuple to relation->rd_rel. (See notes in
			 * AllocateRelationDesc())
			 */
			memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);

			/* Update rd_options while we have the tuple */
			if (relation->rd_options)
				pfree(relation->rd_options);
			RelationParseRelOptions(relation, htup);

			/*
			 * Check the values in rd_att were set up correctly.  (We cannot
			 * just copy them over now: formrdesc must have set up the
			 * rd_att data correctly to start with, because it may already
			 * have been copied into one or more catcache entries.)
			 */
			Assert(relation->rd_att->tdtypeid == relp->reltype);
			Assert(relation->rd_att->tdtypmod == -1);
			Assert(relation->rd_att->tdhasoid == relp->relhasoids);

			ReleaseSysCache(htup);

			/* relowner had better be OK now, else we'll loop forever */
			if (relation->rd_rel->relowner == InvalidOid)
				elog(ERROR, "invalid relowner in pg_class entry for \"%s\"",
					 RelationGetRelationName(relation));

			restart = true;
		}

		/*
		 * Fix data that isn't saved in relcache cache file.
		 *
		 * relhasrules or relhastriggers could possibly be wrong or out of
		 * date.  If we don't actually find any rules or triggers, clear the
		 * local copy of the flag so that we don't get into an infinite loop
		 * here.  We don't make any attempt to fix the pg_class entry, though.
		 */
		if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
		{
			RelationBuildRuleLock(relation);
			if (relation->rd_rules == NULL)
				relation->rd_rel->relhasrules = false;
			restart = true;
		}
		if (relation->rd_rel->reltriggers > 0 && relation->trigdesc == NULL)
		{
			RelationBuildTriggers(relation);
			if (relation->trigdesc == NULL)
				relation->rd_rel->reltriggers = 0;
			restart = true;
		}

		/* Release hold on the relation */
		RelationDecrementReferenceCount(relation);

		/* Now, restart the hashtable scan if needed */
		if (restart)
		{
			hash_seq_term(&status);
			hash_seq_init(&status, RelationIdCache);
		}
	}

	/*
	 * Lastly, write out new relcache cache files if needed.  We don't bother
	 * to distinguish cases where only one of the two needs an update.
	 */
	if (needNewCacheFile)
	{
		/*
		 * Force all the catcaches to finish initializing and thereby open the
		 * catalogs and indexes they use.  This will preload the relcache with
		 * entries for all the most important system catalogs and indexes, so
		 * that the init files will be most useful for future backends.
		 */
		InitCatalogCachePhase2();

		/* reset initFileRelationIds list; we'll fill it during write */
		initFileRelationIds = NIL;

		/* now write the files */
		write_relcache_init_file(true);
		write_relcache_init_file(false);
	}
}

/*
 * Load one critical system index into the relcache
 *
 * indexoid is the OID of the target index, heapoid is the OID of the catalog
 * it belongs to.
 */
static void
load_critical_index(Oid indexoid, Oid heapoid)
{
	Relation	ird;

	/*
	 * We must lock the underlying catalog before locking the index to avoid
	 * deadlock, since RelationBuildDesc might well need to read the catalog,
	 * and if anyone else is exclusive-locking this catalog and index they'll
	 * be doing it in that order.
	 */
	LockRelationOid(heapoid, AccessShareLock);
	LockRelationOid(indexoid, AccessShareLock);
	ird = RelationBuildDesc(indexoid, true);
	if (ird == NULL)
		elog(PANIC, "could not open critical system index %u", indexoid);
	ird->rd_isnailed = true;
	ird->rd_refcnt = 1;
	UnlockRelationOid(indexoid, AccessShareLock);
	UnlockRelationOid(heapoid, AccessShareLock);
}

/*
 * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
 * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
 *
 * We need this kluge because we have to be able to access non-fixed-width
 * fields of pg_class and pg_index before we have the standard catalog caches
 * available.  We use predefined data that's set up in just the same way as
 * the bootstrapped reldescs used by formrdesc().  The resulting tupdesc is
 * not 100% kosher: it does not have the correct rowtype OID in tdtypeid, nor
 * does it have a TupleConstr field.  But it's good enough for the purpose of
 * extracting fields.
 */
static TupleDesc
BuildHardcodedDescriptor(int natts, const FormData_pg_attribute *attrs,
						 bool hasoids)
{
	TupleDesc	result;
	MemoryContext oldcxt;
	int			i;

	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	result = CreateTemplateTupleDesc(natts, hasoids);
	result->tdtypeid = RECORDOID;		/* not right, but we don't care */
	result->tdtypmod = -1;

	for (i = 0; i < natts; i++)
	{
		memcpy(result->attrs[i], &attrs[i], ATTRIBUTE_FIXED_PART_SIZE);
		/* make sure attcacheoff is valid */
		result->attrs[i]->attcacheoff = -1;
	}

	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
	result->attrs[0]->attcacheoff = 0;

	/* Note: we don't bother to set up a TupleConstr entry */

	MemoryContextSwitchTo(oldcxt);

	return result;
}

static TupleDesc
GetPgClassDescriptor(void)
{
	static TupleDesc pgclassdesc = NULL;

	/* Already done? */
	if (pgclassdesc == NULL)
		pgclassdesc = BuildHardcodedDescriptor(Natts_pg_class,
											   Desc_pg_class,
											   true);

	return pgclassdesc;
}

static TupleDesc
GetPgIndexDescriptor(void)
{
	static TupleDesc pgindexdesc = NULL;

	/* Already done? */
	if (pgindexdesc == NULL)
		pgindexdesc = BuildHardcodedDescriptor(Natts_pg_index,
											   Desc_pg_index,
											   false);

	return pgindexdesc;
}

/*
 * Load any default attribute value definitions for the relation.
 */
static void
AttrDefaultFetch(Relation relation)
{
	AttrDefault *attrdef = relation->rd_att->constr->defval;
	int			ndef = relation->rd_att->constr->num_defval;
	Relation	adrel;
	SysScanDesc adscan;
	ScanKeyData skey;
	HeapTuple	htup;
	Datum		val;
	bool		isnull;
	int			found;
	int			i;

	ScanKeyInit(&skey,
				Anum_pg_attrdef_adrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));

	adrel = heap_open(AttrDefaultRelationId, AccessShareLock);
	adscan = systable_beginscan(adrel, AttrDefaultIndexId, true,
								SnapshotNow, 1, &skey);
	found = 0;

	while (HeapTupleIsValid(htup = systable_getnext(adscan)))
	{
		Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);

		for (i = 0; i < ndef; i++)
		{
			if (adform->adnum != attrdef[i].adnum)
				continue;
			if (attrdef[i].adbin != NULL)
				elog(WARNING, "multiple attrdef records found for attr %s of rel %s",
				NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
					 RelationGetRelationName(relation));
			else
				found++;

			val = fastgetattr(htup,
							  Anum_pg_attrdef_adbin,
							  adrel->rd_att, &isnull);
			if (isnull)
				elog(WARNING, "null adbin for attr %s of rel %s",
				NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
					 RelationGetRelationName(relation));
			else
				attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext,
												   TextDatumGetCString(val));
			break;
		}

		if (i >= ndef)
			elog(WARNING, "unexpected attrdef record found for attr %d of rel %s",
				 adform->adnum, RelationGetRelationName(relation));
	}

	systable_endscan(adscan);
	heap_close(adrel, AccessShareLock);

	if (found != ndef)
		elog(WARNING, "%d attrdef record(s) missing for rel %s",
			 ndef - found, RelationGetRelationName(relation));
}

/*
 * Load any check constraints for the relation.
 */
static void
CheckConstraintFetch(Relation relation)
{
	ConstrCheck *check = relation->rd_att->constr->check;
	int			ncheck = relation->rd_att->constr->num_check;
	Relation	conrel;
	SysScanDesc conscan;
	ScanKeyData skey[1];
	HeapTuple	htup;
	Datum		val;
	bool		isnull;
	int			found = 0;

	ScanKeyInit(&skey[0],
				Anum_pg_constraint_conrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));

	conrel = heap_open(ConstraintRelationId, AccessShareLock);
	conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
								 SnapshotNow, 1, skey);

	while (HeapTupleIsValid(htup = systable_getnext(conscan)))
	{
		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);

		/* We want check constraints only */
		if (conform->contype != CONSTRAINT_CHECK)
			continue;

		if (found >= ncheck)
			elog(ERROR,
				 "pg_class reports %d constraint record(s) for rel %s, but found extra in pg_constraint",
				 ncheck, RelationGetRelationName(relation));

		check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
												  NameStr(conform->conname));

		/* Grab and test conbin is actually set */
		val = fastgetattr(htup,
						  Anum_pg_constraint_conbin,
						  conrel->rd_att, &isnull);
		if (isnull)
			elog(ERROR, "null conbin for rel %s",
				 RelationGetRelationName(relation));

		check[found].ccbin = MemoryContextStrdup(CacheMemoryContext,
												 TextDatumGetCString(val));
		found++;
	}

	systable_endscan(conscan);
	heap_close(conrel, AccessShareLock);

	if (found != ncheck)
		elog(ERROR,
			 "found %d in pg_constraint, but pg_class reports %d constraint record(s) for rel %s",
			 found, ncheck, RelationGetRelationName(relation));
}


/*
 * RelationGetPartitioningKey -- get GpPolicy struct for distributed relation
 *
 * Returns a copy of the relation's GpPolicy object, palloc'd in
 * the caller's context.  Caller should pfree() it.  If NULL is
 * returned, relation should be accessed locally.
 */
GpPolicy*
RelationGetPartitioningKey(Relation relation)
{
    return GpPolicyCopy(CurrentMemoryContext, relation->rd_cdbpolicy);
}                                       /* RelationGetPartitioningKey */
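
/*
 * Illustrative usage sketch (not part of the original file): the caller
 * receives its own palloc'd copy of the policy and is responsible for
 * freeing it, e.g.
 *
 *		GpPolicy   *policy = RelationGetPartitioningKey(rel);
 *
 *		if (policy != NULL)
 *		{
 *			... use the distribution policy here ...
 *			pfree(policy);
 *		}
 *		else
 *			... no policy: access the relation locally ...
 */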


/*
 * RelationGetIndexList -- get a list of OIDs of indexes on this relation
 *
 * The index list is created only if someone requests it.  We scan pg_index
 * to find relevant indexes, and add the list to the relcache entry so that
 * we won't have to compute it again.  Note that shared cache inval of a
 * relcache entry will delete the old list and set rd_indexvalid to 0,
 * so that we must recompute the index list on next request.  This handles
 * creation or deletion of an index.
 *
 * The returned list is guaranteed to be sorted in order by OID.  This is
 * needed by the executor, since for index types that we obtain exclusive
 * locks on when updating the index, all backends must lock the indexes in
 * the same order or we will get deadlocks (see ExecOpenIndices()).  Any
 * consistent ordering would do, but ordering by OID is easy.
 *
 * Since shared cache inval causes the relcache's copy of the list to go away,
 * we return a copy of the list palloc'd in the caller's context.  The caller
 * may list_free() the returned list after scanning it. This is necessary
 * since the caller will typically be doing syscache lookups on the relevant
 * indexes, and syscache lookup could cause SI messages to be processed!
 *
 * We also update rd_oidindex, which this module treats as effectively part
 * of the index list.  rd_oidindex is valid when rd_indexvalid isn't zero;
 * it is the pg_class OID of a unique index on OID when the relation has one,
 * and InvalidOid if there is no such index.
 */
List *
RelationGetIndexList(Relation relation)
{
	Relation	indrel;
	SysScanDesc indscan;
	ScanKeyData skey;
	HeapTuple	htup;
	List	   *result;
	Oid			oidIndex;
	MemoryContext oldcxt;

	/* Quick exit if we already computed the list. */
	if (relation->rd_indexvalid != 0)
		return list_copy(relation->rd_indexlist);

	/*
	 * We build the list we intend to return (in the caller's context) while
	 * doing the scan.  After successfully completing the scan, we copy that
	 * list into the relcache entry.  This avoids cache-context memory leakage
	 * if we get some sort of error partway through.
	 */
	result = NIL;
	oidIndex = InvalidOid;

	/* Prepare to scan pg_index for entries having indrelid = this rel. */
	ScanKeyInit(&skey,
				Anum_pg_index_indrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));

	indrel = heap_open(IndexRelationId, AccessShareLock);
	indscan = systable_beginscan(indrel, IndexIndrelidIndexId, true,
								 SnapshotNow, 1, &skey);

	while (HeapTupleIsValid(htup = systable_getnext(indscan)))
	{
		Form_pg_index index = (Form_pg_index) GETSTRUCT(htup);

		/* Add index's OID to result list in the proper order */
		result = insert_ordered_oid(result, index->indexrelid);

		/* Check to see if it is a unique, non-partial btree index on OID */
		if (IndexIsValid(index) &&
			index->indnatts == 1 &&
			index->indisunique &&
			index->indkey.values[0] == ObjectIdAttributeNumber &&
			index->indclass.values[0] == OID_BTREE_OPS_OID &&
			heap_attisnull(htup, Anum_pg_index_indpred))
			oidIndex = index->indexrelid;
	}

	systable_endscan(indscan);
	heap_close(indrel, AccessShareLock);

	/* Now save a copy of the completed list in the relcache entry. */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indexlist = list_copy(result);
	relation->rd_oidindex = oidIndex;
	relation->rd_indexvalid = 1;
	MemoryContextSwitchTo(oldcxt);

	return result;
}
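
/*
 * Illustrative usage sketch (not part of the original file): since the list
 * is returned in the caller's context, a typical caller scans it and then
 * frees it, e.g.
 *
 *		List	   *indexoidlist = RelationGetIndexList(relation);
 *		ListCell   *l;
 *
 *		foreach(l, indexoidlist)
 *		{
 *			Oid			indexoid = lfirst_oid(l);
 *
 *			... look up or open the index here ...
 *		}
 *		list_free(indexoidlist);
 */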

/*
 * insert_ordered_oid
 *		Insert a new Oid into a sorted list of Oids, preserving ordering
 *
 * Building the ordered list this way is O(N^2), but with a pretty small
 * constant, so for the number of entries we expect it will probably be
 * faster than trying to apply qsort().  Most tables don't have very many
 * indexes...
 */
static List *
insert_ordered_oid(List *list, Oid datum)
{
	ListCell   *prev;

	/* Does the datum belong at the front? */
	if (list == NIL || datum < linitial_oid(list))
		return lcons_oid(datum, list);
	/* No, so find the entry it belongs after */
	prev = list_head(list);
	for (;;)
	{
		ListCell   *curr = lnext(prev);

		if (curr == NULL || datum < lfirst_oid(curr))
			break;				/* it belongs after 'prev', before 'curr' */

		prev = curr;
	}
	/* Insert datum into list after 'prev' */
	lappend_cell_oid(list, prev, datum);
	return list;
}
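
/*
 * Illustrative example (not part of the original file): starting from NIL,
 *
 *		List	   *l = NIL;
 *
 *		l = insert_ordered_oid(l, 30);		... list is (30)
 *		l = insert_ordered_oid(l, 10);		... list is (10 30)
 *		l = insert_ordered_oid(l, 20);		... list is (10 20 30)
 *
 * which is how RelationGetIndexList keeps its result sorted by OID without
 * a separate qsort() pass.
 */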

/*
 * RelationSetIndexList -- externally force the index list contents
 *
 * This is used to temporarily override what we think the set of valid
 * indexes is (including the presence or absence of an OID index).
 * The forcing will be valid only until transaction commit or abort.
 *
 * This should only be applied to nailed relations, because in a non-nailed
 * relation the hacked index list could be lost at any time due to SI
 * messages.  In practice it is only used on pg_class (see REINDEX).
 *
 * It is up to the caller to make sure the given list is correctly ordered.
 *
 * We deliberately do not change rd_indexattr here: even when operating
 * with a temporary partial index list, HOT-update decisions must be made
 * correctly with respect to the full index set.  It is up to the caller
 * to ensure that a correct rd_indexattr set has been cached before first
 * calling RelationSetIndexList; else a subsequent inquiry might cause a
 * wrong rd_indexattr set to get computed and cached.
 */
void
RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex)
{
	MemoryContext oldcxt;

	Assert(relation->rd_isnailed);
	/* Copy the list into the cache context (could fail for lack of mem) */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
	indexIds = list_copy(indexIds);
	MemoryContextSwitchTo(oldcxt);
	/* Okay to replace old list */
	list_free(relation->rd_indexlist);
	relation->rd_indexlist = indexIds;
	relation->rd_oidindex = oidIndex;
	relation->rd_indexvalid = 2;	/* mark list as forced */
	/* must flag that we have a forced index list */
	need_eoxact_work = true;
}

/*
 * RelationGetOidIndex -- get the pg_class OID of the relation's OID index
 *
 * Returns InvalidOid if there is no such index.
 */
Oid
RelationGetOidIndex(Relation relation)
{
	List	   *ilist;

	/*
	 * If relation doesn't have OIDs at all, caller is probably confused. (We
	 * could just silently return InvalidOid, but it seems better to throw an
	 * assertion.)
	 */
	Assert(relation->rd_rel->relhasoids);

	if (relation->rd_indexvalid == 0)
	{
		/* RelationGetIndexList does the heavy lifting. */
		ilist = RelationGetIndexList(relation);
		list_free(ilist);
		Assert(relation->rd_indexvalid != 0);
	}

	return relation->rd_oidindex;
}

/*
 * RelationGetIndexExpressions -- get the index expressions for an index
 *
 * We cache the result of transforming pg_index.indexprs into a node tree.
 * If the rel is not an index or has no expressional columns, we return NIL.
 * Otherwise, the returned tree is copied into the caller's memory context.
 * (We don't want to return a pointer to the relcache copy, since it could
 * disappear due to relcache invalidation.)
 */
List *
RelationGetIndexExpressions(Relation relation)
{
	List	   *result;
	Datum		exprsDatum;
	bool		isnull;
	char	   *exprsString;
	MemoryContext oldcxt;

	/* Quick exit if we already computed the result. */
	if (relation->rd_indexprs)
		return (List *) copyObject(relation->rd_indexprs);

	/* Quick exit if there is nothing to do. */
	if (relation->rd_indextuple == NULL ||
		heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs))
		return NIL;

	/*
	 * We build the tree we intend to return in the caller's context. After
	 * successfully completing the work, we copy it into the relcache entry.
	 * This avoids problems if we get some sort of error partway through.
	 */
	exprsDatum = heap_getattr(relation->rd_indextuple,
							  Anum_pg_index_indexprs,
							  GetPgIndexDescriptor(),
							  &isnull);
	Assert(!isnull);
	exprsString = TextDatumGetCString(exprsDatum);
	result = (List *) stringToNode(exprsString);
	pfree(exprsString);

	/*
	 * Run the expressions through eval_const_expressions. This is not just an
	 * optimization, but is necessary, because the planner will be comparing
	 * them to similarly-processed qual clauses, and may fail to detect valid
	 * matches without this.  We don't bother with canonicalize_qual, however.
	 */
	result = (List *) eval_const_expressions(NULL, (Node *) result);

	/*
	 * Also mark any coercion format fields as "don't care", so that the
	 * planner can match to both explicit and implicit coercions.
	 */
	set_coercionform_dontcare((Node *) result);

	/* May as well fix opfuncids too */
	fix_opfuncids((Node *) result);

	/* Now save a copy of the completed tree in the relcache entry. */
	oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
	relation->rd_indexprs = (List *) copyObject(result);
	MemoryContextSwitchTo(oldcxt);

	return result;
}

/*
 * RelationGetIndexPredicate -- get the index predicate for an index
 *
 * We cache the result of transforming pg_index.indpred into an implicit-AND
 * node tree (suitable for ExecQual).
 * If the rel is not an index or has no predicate, we return NIL.
 * Otherwise, the returned tree is copied into the caller's memory context.
 * (We don't want to return a pointer to the relcache copy, since it could
 * disappear due to relcache invalidation.)
 */
List *
RelationGetIndexPredicate(Relation relation)
{
	List	   *result;
	Datum		predDatum;
	bool		isnull;
	char	   *predString;
	MemoryContext oldcxt;

	/* Quick exit if we already computed the result. */
	if (relation->rd_indpred)
		return (List *) copyObject(relation->rd_indpred);

	/* Quick exit if there is nothing to do. */
	if (relation->rd_indextuple == NULL ||
		heap_attisnull(relation->rd_indextuple, Anum_pg_index_indpred))
		return NIL;

	/*
	 * We build the tree we intend to return in the caller's context. After
	 * successfully completing the work, we copy it into the relcache entry.
	 * This avoids problems if we get some sort of error partway through.
	 */
	predDatum = heap_getattr(relation->rd_indextuple,
							 Anum_pg_index_indpred,
							 GetPgIndexDescriptor(),
							 &isnull);
	Assert(!isnull);
	predString = TextDatumGetCString(predDatum);
	result = (List *) stringToNode(predString);
	pfree(predString);

	/*
	 * Run the expression through const-simplification and canonicalization.
	 * This is not just an optimization, but is necessary, because the planner
	 * will be comparing it to similarly-processed qual clauses, and may fail
	 * to detect valid matches without this.  This must match the processing
	 * done to qual clauses in preprocess_expression()!  (We can skip the
	 * stuff involving subqueries, however, since we don't allow any in index
	 * predicates.)
	 */
	result = (List *) eval_const_expressions(NULL, (Node *) result);

	result = (List *) canonicalize_qual((Expr *) result);

	/*
	 * Also mark any coercion format fields as "don't care", so that the
	 * planner can match to both explicit and implicit coercions.
	 */
	set_coercionform_dontcare((Node *) result);

	/* Also convert to implicit-AND format */
	result = make_ands_implicit((Expr *) result);

	/* May as well fix opfuncids too */
	fix_opfuncids((Node *) result);

	/* Now save a copy of the completed tree in the relcache entry. */
	oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
	relation->rd_indpred = (List *) copyObject(result);
	MemoryContextSwitchTo(oldcxt);

	return result;
}

/*
 * RelationGetIndexAttrBitmap -- get a bitmap of index attribute numbers
 *
 * The result has a bit set for each attribute used anywhere in the index
 * definitions of all the indexes on this relation.  (This includes not only
 * simple index keys, but attributes used in expressions and partial-index
 * predicates.)
 *
 * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
 * we can include system attributes (e.g., OID) in the bitmap representation.
 *
 * Caller had better hold at least RowExclusiveLock on the target relation
 * to ensure that it has a stable set of indexes.  This also makes it safe
 * (deadlock-free) for us to take locks on the relation's indexes.
 *
 * The returned result is palloc'd in the caller's memory context and should
 * be bms_free'd when not needed anymore.
 */
Bitmapset *
RelationGetIndexAttrBitmap(Relation relation)
{
	Bitmapset  *indexattrs;
	List	   *indexoidlist;
	ListCell   *l;
	MemoryContext oldcxt;

	/* Quick exit if we already computed the result. */
	if (relation->rd_indexattr != NULL)
		return bms_copy(relation->rd_indexattr);

	/* Fast path if definitely no indexes */
	if (!RelationGetForm(relation)->relhasindex)
		return NULL;

	/*
	 * Get cached list of index OIDs
	 */
	indexoidlist = RelationGetIndexList(relation);

	/* Fall out if no indexes (but relhasindex was set) */
	if (indexoidlist == NIL)
		return NULL;

	/*
	 * For each index, add referenced attributes to indexattrs.
	 *
	 * Note: we consider all indexes returned by RelationGetIndexList, even if
	 * they are not indisready or indisvalid.  This is important because an
	 * index for which CREATE INDEX CONCURRENTLY has just started must be
	 * included in HOT-safety decisions (see README.HOT).
	 */
	indexattrs = NULL;
	foreach(l, indexoidlist)
	{
		Oid			indexOid = lfirst_oid(l);
		Relation	indexDesc;
		IndexInfo  *indexInfo;
		int			i;

		indexDesc = index_open(indexOid, AccessShareLock);

		/* Extract index key information from the index's pg_index row */
		indexInfo = BuildIndexInfo(indexDesc);

		/* Collect simple attribute references */
		for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
		{
			int			attrnum = indexInfo->ii_KeyAttrNumbers[i];

			if (attrnum != 0)
				indexattrs = bms_add_member(indexattrs,
							   attrnum - FirstLowInvalidHeapAttributeNumber);
		}

		/* Collect all attributes used in expressions, too */
		pull_varattnos((Node *) indexInfo->ii_Expressions, &indexattrs);

		/* Collect all attributes in the index predicate, too */
		pull_varattnos((Node *) indexInfo->ii_Predicate, &indexattrs);

		index_close(indexDesc, AccessShareLock);
	}

	list_free(indexoidlist);

	/* Now save a copy of the bitmap in the relcache entry. */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indexattr = bms_copy(indexattrs);
	MemoryContextSwitchTo(oldcxt);

	/* We return our original working copy for caller to play with */
	return indexattrs;
}
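
/*
 * Illustrative usage sketch (not part of the original file): callers test
 * membership with the same FirstLowInvalidHeapAttributeNumber offset used
 * above, e.g. to see whether attribute attnum is used by any index:
 *
 *		Bitmapset  *indexattrs = RelationGetIndexAttrBitmap(relation);
 *		bool		is_indexed;
 *
 *		is_indexed = bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
 *								   indexattrs);
 *		bms_free(indexattrs);
 */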

/*
 *	load_relcache_init_file, write_relcache_init_file
 *
 *		In late 1992, we started regularly having databases with more than
 *		a thousand classes in them.  With this number of classes, it became
 *		critical to do indexed lookups on the system catalogs.
 *
 *		Bootstrapping these lookups is very hard.  We want to be able to
 *		use an index on pg_attribute, for example, but in order to do so,
 *		we must have read pg_attribute for the attributes in the index,
 *		which implies that we need to use the index.
 *
 *		In order to get around the problem, we do the following:
 *
 *		   +  When the database system is initialized (at initdb time), we
 *			  don't use indexes.  We do sequential scans.
 *
 *		   +  When the backend is started up in normal mode, we load an image
 *			  of the appropriate relation descriptors, in internal format,
 *			  from an initialization file in the data/base/... directory.
 *
 *		   +  If the initialization file isn't there, then we create the
 *			  relation descriptors using sequential scans and write 'em to
 *			  the initialization file for use by subsequent backends.
 *
 *		We could dispense with the initialization files and just build the
 *		critical reldescs the hard way on every backend startup, but that
 *		slows down backend startup noticeably.
 *
 *		We can in fact go further, and save more relcache entries than
 *		just the ones that are absolutely critical; this allows us to speed
 *		up backend startup by not having to build such entries the hard way.
 *		Presently, all the catalog and index entries that are referred to
 *		by catcaches are stored in the initialization files.
 *
 *		The same mechanism that detects when catcache and relcache entries
 *		need to be invalidated (due to catalog updates) also arranges to
 *		unlink the initialization files when the contents may be out of date.
 *		The files will then be rebuilt during the next backend startup.
 */

/*
 * load_relcache_init_file -- attempt to load cache from the init file
 *
 * If successful, return TRUE and set criticalRelcachesBuilt to true.
 * If not successful, return FALSE.
 *
 * NOTE: we assume we are already switched into CacheMemoryContext.
 */
static bool
load_relcache_init_file(bool shared)
{
	FILE	   *fp;
	char		initfilename[MAXPGPATH];
	Relation   *rels;
	int			relno,
				num_rels,
				max_rels,
				nailed_rels,
				nailed_indexes,
				magic;
	int			i;

	if (shared)
		snprintf(initfilename, sizeof(initfilename), "global/%s",
				 RELCACHE_INIT_FILENAME);
	else
		snprintf(initfilename, sizeof(initfilename), "%s/%s",
				 DatabasePath, RELCACHE_INIT_FILENAME);

	fp = AllocateFile(initfilename, PG_BINARY_R);
	if (fp == NULL)
		return false;

	/*
	 * Read the index relcache entries from the file.  Note we will not enter
	 * any of them into the cache if the read fails partway through; this
	 * helps to guard against broken init files.
	 */
	max_rels = 100;
	rels = (Relation *) palloc(max_rels * sizeof(Relation));
	num_rels = 0;
	nailed_rels = nailed_indexes = 0;
	initFileRelationIds = NIL;

	/* check for correct magic number (compatible version) */
	if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
		goto read_failed;
	if (magic != RELCACHE_INIT_FILEMAGIC)
		goto read_failed;

	for (relno = 0;; relno++)
	{
		Size		len;
		size_t		nread;
		Relation	rel;
		Form_pg_class relform;
		bool		has_not_null;

		/* first read the relation descriptor length */
		if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
		{
			if (nread == 0)
				break;			/* end of file */
			goto read_failed;
		}

		/* safety check for incompatible relcache layout */
		if (len != sizeof(RelationData))
			goto read_failed;

		/* allocate another relcache header */
		if (num_rels >= max_rels)
		{
			max_rels *= 2;
			rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
		}

		rel = rels[num_rels++] = (Relation) palloc(len);

		/* then, read the Relation structure */
		if ((nread = fread(rel, 1, len, fp)) != len)
			goto read_failed;

		/* next read the relation tuple form */
		if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
			goto read_failed;

		relform = (Form_pg_class) palloc(len);
		if ((nread = fread(relform, 1, len, fp)) != len)
			goto read_failed;

		rel->rd_rel = relform;

		/* initialize attribute tuple forms */
		rel->rd_att = CreateTemplateTupleDesc(relform->relnatts,
											  relform->relhasoids);
		rel->rd_att->tdrefcount = 1;	/* mark as refcounted */

		rel->rd_att->tdtypeid = relform->reltype;
		rel->rd_att->tdtypmod = -1;		/* unnecessary, but... */

		/* next read all the attribute tuple form data entries */
		has_not_null = false;
		for (i = 0; i < relform->relnatts; i++)
		{
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;
			if (len != ATTRIBUTE_FIXED_PART_SIZE)
				goto read_failed;
			if ((nread = fread(rel->rd_att->attrs[i], 1, len, fp)) != len)
				goto read_failed;

			has_not_null |= rel->rd_att->attrs[i]->attnotnull;
		}

		/* next read the access method specific field */
		if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
			goto read_failed;
		if (len > 0)
		{
			rel->rd_options = palloc(len);
			if ((nread = fread(rel->rd_options, 1, len, fp)) != len)
				goto read_failed;
			if (len != VARSIZE(rel->rd_options))
				goto read_failed;		/* sanity check */
		}
		else
		{
			rel->rd_options = NULL;
		}

		/* mark not-null status */
		if (has_not_null)
		{
			TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

			constr->has_not_null = true;
			rel->rd_att->constr = constr;
		}

		/* If it's an index, there's more to do */
		if (rel->rd_rel->relkind == RELKIND_INDEX)
		{
			Form_pg_am	am;
			MemoryContext indexcxt;
			Oid		   *opfamily;
			Oid		   *opcintype;
			Oid		   *operator;
			RegProcedure *support;
			int			nsupport;
			int16	   *indoption;

			/* Count nailed indexes to ensure we have 'em all */
			if (rel->rd_isnailed)
				nailed_indexes++;

			/* next, read the pg_index tuple */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			rel->rd_indextuple = (HeapTuple) palloc(len);
			if ((nread = fread(rel->rd_indextuple, 1, len, fp)) != len)
				goto read_failed;

			/* Fix up internal pointers in the tuple -- see heap_copytuple */
			rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
			rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);

			/* next, read the access method tuple form */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			am = (Form_pg_am) palloc(len);
			if ((nread = fread(am, 1, len, fp)) != len)
				goto read_failed;
			rel->rd_am = am;

			/*
			 * prepare index info context --- parameters should match
			 * RelationInitIndexAccessInfo
			 */
			indexcxt = AllocSetContextCreate(CacheMemoryContext,
											 RelationGetRelationName(rel),
											 ALLOCSET_SMALL_MINSIZE,
											 ALLOCSET_SMALL_INITSIZE,
											 ALLOCSET_SMALL_MAXSIZE);
			rel->rd_indexcxt = indexcxt;

			/* next, read the vector of opfamily OIDs */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			opfamily = (Oid *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(opfamily, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_opfamily = opfamily;

			/* next, read the vector of opcintype OIDs */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			opcintype = (Oid *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(opcintype, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_opcintype = opcintype;

			/* next, read the vector of operator OIDs */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			operator = (Oid *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(operator, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_operator = operator;

			/* next, read the vector of support procedures */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;
			support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(support, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_support = support;

			/* finally, read the vector of indoption values */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			indoption = (int16 *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(indoption, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_indoption = indoption;

			/* set up zeroed fmgr-info vectors */
			rel->rd_aminfo = (RelationAmInfo *)
				MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));
			nsupport = relform->relnatts * am->amsupport;
			rel->rd_supportinfo = (FmgrInfo *)
				MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
		}
		else
		{
			/* Count nailed rels to ensure we have 'em all */
			if (rel->rd_isnailed)
				nailed_rels++;

			Assert(rel->rd_index == NULL);
			Assert(rel->rd_indextuple == NULL);
			Assert(rel->rd_am == NULL);
			Assert(rel->rd_indexcxt == NULL);
			Assert(rel->rd_aminfo == NULL);
			Assert(rel->rd_opfamily == NULL);
			Assert(rel->rd_opcintype == NULL);
			Assert(rel->rd_operator == NULL);
			Assert(rel->rd_support == NULL);
			Assert(rel->rd_supportinfo == NULL);
			Assert(rel->rd_indoption == NULL);
		}

		/*
		 * Rules and triggers are not saved (mainly because the internal
		 * format is complex and subject to change).  They must be rebuilt if
		 * needed by RelationCacheInitializePhase3.  This is not expected to
		 * be a big performance hit since few system catalogs have such. Ditto
		 * for index expressions and predicates.
		 */
		rel->rd_rules = NULL;
		rel->rd_rulescxt = NULL;
		rel->trigdesc = NULL;
		rel->rd_indexprs = NIL;
		rel->rd_indpred = NIL;

		/*
		 * Reset transient-state fields in the relcache entry
		 */
		rel->rd_smgr = NULL;
		rel->rd_targblock = InvalidBlockNumber;
		if (rel->rd_isnailed)
			rel->rd_refcnt = 1;
		else
			rel->rd_refcnt = 0;
		rel->rd_indexvalid = 0;
		rel->rd_indexlist = NIL;
		rel->rd_indexattr = NULL;
		rel->rd_oidindex = InvalidOid;
		rel->rd_createSubid = InvalidSubTransactionId;
		rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
		rel->rd_amcache = NULL;
		MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
        rel->rd_cdbpolicy = NULL;
        rel->rd_cdbDefaultStatsWarningIssued = false;

		/*
		 * Recompute lock and physical addressing info.  This is needed in
		 * case the pg_internal.init file was copied from some other database
		 * by CREATE DATABASE.
		 */
		RelationInitLockInfo(rel);
		RelationInitPhysicalAddr(rel);
	}

	/*
	 * We reached the end of the init file without apparent problem. Did we
	 * get the right number of nailed items?  (This is a useful crosscheck in
	 * case the set of critical rels or indexes changes.)
	 */
	if (shared)
	{
		if (nailed_rels != NUM_CRITICAL_SHARED_RELS ||
			nailed_indexes != NUM_CRITICAL_SHARED_INDEXES)
			goto read_failed;
	}
	else
	{
		if (nailed_rels != NUM_CRITICAL_LOCAL_RELS ||
			nailed_indexes != NUM_CRITICAL_LOCAL_INDEXES)
			goto read_failed;
	}

	/*
	 * OK, all appears well.
	 *
	 * Now insert all the new relcache entries into the cache.
	 */
	for (relno = 0; relno < num_rels; relno++)
	{
		RelationCacheInsert(rels[relno]);
		/* also make a list of their OIDs, for RelationIdIsInInitFile */
		if (!shared)
			initFileRelationIds = lcons_oid(RelationGetRelid(rels[relno]),
											initFileRelationIds);
	}

	pfree(rels);
	FreeFile(fp);

	if (shared)
		criticalSharedRelcachesBuilt = true;
	else
		criticalRelcachesBuilt = true;
	return true;

	/*
	 * init file is broken, so do it the hard way.  We don't bother trying to
	 * free the clutter we just allocated; it's not in the relcache so it
	 * won't hurt.
	 */
read_failed:
	pfree(rels);
	FreeFile(fp);

	return false;
}

4618 4619 4620 4621
/*
 * Write out a new initialization file with the current contents
 * of the relcache.
 */
4622
static void
4623
write_relcache_init_file(bool shared)
4624
{
4625
	FILE	   *fp;
4626 4627
	char		tempfilename[MAXPGPATH];
	char		finalfilename[MAXPGPATH];
4628
	int			magic;
4629
	HASH_SEQ_STATUS status;
4630
	RelIdCacheEnt *idhentry;
4631 4632
	MemoryContext oldcxt;
	int			i;
4633 4634

	/*
4635
	 * We must write a temporary file and rename it into place. Otherwise,
B
Bruce Momjian 已提交
4636 4637
	 * another backend starting at about the same time might crash trying to
	 * read the partially-complete file.
4638
	 */
4639 4640 4641 4642 4643 4644 4645 4646 4647 4648 4649 4650 4651 4652
	if (shared)
	{
		snprintf(tempfilename, sizeof(tempfilename), "global/%s.%d",
				 RELCACHE_INIT_FILENAME, MyProcPid);
		snprintf(finalfilename, sizeof(finalfilename), "global/%s",
				 RELCACHE_INIT_FILENAME);
	}
	else
	{
		snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
				 DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
		snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
				 DatabasePath, RELCACHE_INIT_FILENAME);
	}
4653

4654 4655 4656 4657
	unlink(tempfilename);		/* in case it exists w/wrong permissions */

	fp = AllocateFile(tempfilename, PG_BINARY_W);
	if (fp == NULL)
4658 4659 4660 4661 4662
	{
		/*
		 * We used to consider this a fatal error, but we might as well
		 * continue with backend startup ...
		 */
4663 4664
		ereport(WARNING,
				(errcode_for_file_access(),
4665
				 errmsg("could not create relation-cache initialization file \"%s\": %m",
4666
						tempfilename),
B
Bruce Momjian 已提交
4667
			  errdetail("Continuing anyway, but there's something wrong.")));
4668 4669
		return;
	}
4670

4671
	/*
B
Bruce Momjian 已提交
4672
	 * Write a magic number to serve as a file version identifier.	We can
4673 4674 4675 4676 4677 4678
	 * change the magic number whenever the relcache layout changes.
	 */
	magic = RELCACHE_INIT_FILEMAGIC;
	if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic))
		elog(FATAL, "could not write init file");

4679
	/*
4680
	 * Write all the reldescs (in no particular order).
H
Hiroshi Inoue 已提交
4681
	 */
4682
	hash_seq_init(&status, RelationIdCache);
4683

4684
	initFileRelationIds = NIL;
4685

4686
	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
4687
	{
4688
		Relation	rel = idhentry->reldesc;
4689
		Form_pg_class relform = rel->rd_rel;
4690

4691 4692 4693 4694
		/* ignore if not correct group */
		if (relform->relisshared != shared)
			continue;

B
Bruce Momjian 已提交
4695 4696
		/* first write the relcache entry proper */
		write_item(rel, sizeof(RelationData), fp);
4697 4698

		/* next write the relation tuple form */
B
Bruce Momjian 已提交
4699
		write_item(relform, CLASS_TUPLE_SIZE, fp);
4700 4701 4702 4703

		/* next, do all the attribute tuple form data entries */
		for (i = 0; i < relform->relnatts; i++)
		{
4704
			write_item(rel->rd_att->attrs[i], ATTRIBUTE_FIXED_PART_SIZE, fp);
4705 4706
		}

B
Bruce Momjian 已提交
4707 4708
		/* next, do the access method specific field */
		write_item(rel->rd_options,
4709
				   (rel->rd_options ? VARSIZE(rel->rd_options) : 0),
4710
				   fp);
B
Bruce Momjian 已提交
4711

4712 4713 4714 4715
		/* If it's an index, there's more to do */
		if (rel->rd_rel->relkind == RELKIND_INDEX)
		{
			Form_pg_am	am = rel->rd_am;
4716

4717 4718
			/* write the pg_index tuple */
			/* we assume this was created by heap_copytuple! */
B
Bruce Momjian 已提交
4719
			write_item(rel->rd_indextuple,
4720 4721
					   HEAPTUPLESIZE + rel->rd_indextuple->t_len,
					   fp);

			/* next, write the access method tuple form */
			write_item(am, sizeof(FormData_pg_am), fp);

			/* next, write the vector of opfamily OIDs */
			write_item(rel->rd_opfamily,
					   relform->relnatts * sizeof(Oid),
					   fp);

			/* next, write the vector of opcintype OIDs */
			write_item(rel->rd_opcintype,
					   relform->relnatts * sizeof(Oid),
					   fp);

			/* next, write the vector of operator OIDs */
			write_item(rel->rd_operator,
					   relform->relnatts * (am->amstrategies * sizeof(Oid)),
					   fp);

			/* next, write the vector of support procedures */
			write_item(rel->rd_support,
				  relform->relnatts * (am->amsupport * sizeof(RegProcedure)),
					   fp);

			/* finally, write the vector of indoption values */
			write_item(rel->rd_indoption,
					   relform->relnatts * sizeof(int16),
					   fp);
		}

		/* also make a list of their OIDs, for RelationIdIsInInitFile */
		if (!shared)
		{
			oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
			initFileRelationIds = lcons_oid(RelationGetRelid(rel),
											initFileRelationIds);
			MemoryContextSwitchTo(oldcxt);
		}
	}

	if (FreeFile(fp))
		elog(FATAL, "could not write init file");

	/*
	 * Now we have to check whether the data we've so painstakingly
	 * accumulated is already obsolete due to someone else's just-committed
	 * catalog changes.  If so, we just delete the temp file and leave it to
	 * the next backend to try again.  (Our own relcache entries will be
	 * updated by SI message processing, but we can't be sure whether what we
	 * wrote out was up-to-date.)
	 *
	 * This mustn't run concurrently with the code that unlinks an init file
	 * and sends SI messages, so grab a serialization lock for the duration.
	 */
	LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);

	/* Make sure we have seen all incoming SI messages */
	AcceptInvalidationMessages();

	/*
	 * If we have received any SI relcache invals since backend start, assume
	 * we may have written out-of-date data.
	 */
	if (relcacheInvalsReceived == 0L)
	{
		/*
		 * OK, rename the temp file to its final name, deleting any
		 * previously-existing init file.
		 *
		 * Note: a failure here is possible under Cygwin, if some other
		 * backend is holding open an unlinked-but-not-yet-gone init file. So
		 * treat this as a noncritical failure; just remove the useless temp
		 * file on failure.
		 */
		if (rename(tempfilename, finalfilename) < 0)
			unlink(tempfilename);
	}
	else
	{
		/* Delete the already-obsolete temp file */
		unlink(tempfilename);
	}

	LWLockRelease(RelCacheInitLock);
}
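
/*
 * For reference, the file layout produced above is: the raw magic number
 * (no length word), then for each relation a series of write_item records
 * (a length word followed by that many bytes): the RelationData struct, the
 * pg_class tuple form, one entry per attribute, rd_options, and -- for
 * indexes only -- the pg_index tuple, the pg_am form, and the opfamily,
 * opcintype, operator, support-proc, and indoption vectors.
 */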

/* write a chunk of data preceded by its length */
static void
write_item(const void *data, Size len, FILE *fp)
{
	if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
		elog(FATAL, "could not write init file");
	if (fwrite(data, 1, len, fp) != len)
		elog(FATAL, "could not write init file");
}
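
/*
 * Illustrative sketch only (an assumption, not part of the original file):
 * whatever loads this init file must read each item back the same way,
 * i.e. a length word followed by that many bytes of payload.  The helper
 * name read_item and the use of palloc() for the buffer are inventions for
 * this example; the actual loader is expected to do the equivalent inline.
 */
static void *
read_item(Size *len, FILE *fp)
{
	void	   *data;

	if (fread(len, 1, sizeof(*len), fp) != sizeof(*len))
		elog(FATAL, "could not read init file");
	data = palloc(*len);
	if (fread(data, 1, *len, fp) != *len)
		elog(FATAL, "could not read init file");
	return data;
}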

/*
 * Detect whether a given relation (identified by OID) is one of the ones
 * we store in the init file.
 *
 * Note that we effectively assume that all backends running in a database
 * would choose to store the same set of relations in the init file;
 * otherwise there are cases where we'd fail to detect the need for an init
 * file invalidation.  This does not seem likely to be a problem in practice.
 */
bool
RelationIdIsInInitFile(Oid relationId)
{
	return list_member_oid(initFileRelationIds, relationId);
}

/*
 * Invalidate (remove) the init file during commit of a transaction that
 * changed one or more of the relation cache entries that are kept in the
 * local init file.
 *
 * To be safe against concurrent inspection or rewriting of the init file,
 * we must take RelCacheInitLock, then remove the old init file, then send
 * the SI messages that include relcache inval for such relations, and then
 * release RelCacheInitLock.  This serializes the whole affair against
 * write_relcache_init_file, so that we can be sure that any other process
 * that's concurrently trying to create a new init file won't move an
 * already-stale version into place after we unlink.  Also, because we unlink
 * before sending the SI messages, a backend that's currently starting cannot
 * read the now-obsolete init file and then miss the SI messages that will
 * force it to update its relcache entries.  (This works because the backend
 * startup sequence gets into the sinval array before trying to load the init
 * file.)
 *
 * We take the lock and do the unlink in RelationCacheInitFilePreInvalidate,
 * then release the lock in RelationCacheInitFilePostInvalidate.  Caller must
 * send any pending SI messages between those calls.
 *
 * Notice this deals only with the local init file, not the shared init file.
 * The reason is that there can never be a "significant" change to the
 * relcache entry of a shared relation; the most that could happen is
 * updates of noncritical fields such as relpages/reltuples.  So, while
 * it's worth updating the shared init file from time to time, it can never
 * be invalid enough to make it necessary to remove it.
 */
void
RelationCacheInitFilePreInvalidate(void)
{
	char		initfilename[MAXPGPATH];

	snprintf(initfilename, sizeof(initfilename), "%s/%s",
			 DatabasePath, RELCACHE_INIT_FILENAME);

	LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);

	if (unlink(initfilename) < 0)
	{
		/*
		 * The file might not be there if no backend has been started since
		 * the last removal.  But complain about failures other than ENOENT.
		 * Fortunately, it's not too late to abort the transaction if we
		 * can't get rid of the would-be-obsolete init file.
		 */
		if (errno != ENOENT)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not remove cache file \"%s\": %m",
							initfilename)));
	}
}

void
RelationCacheInitFilePostInvalidate(void)
{
	LWLockRelease(RelCacheInitLock);
}
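
/*
 * Illustrative sketch only: per the comments above, commit-time invalidation
 * code is expected to bracket its SI-message broadcast with these two calls,
 * roughly
 *
 *		RelationCacheInitFilePreInvalidate();
 *		SendSharedInvalidMessages(msgs, nmsgs);
 *		RelationCacheInitFilePostInvalidate();
 *
 * (SendSharedInvalidMessages is assumed here to stand for whatever routine
 * actually sends the pending messages), so that no backend can install a
 * stale init file after the unlink but before the invalidations are visible.
 */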

/*
 * Remove the init files during postmaster startup.
 *
 * We used to keep the init files across restarts, but that is unsafe even in
 * simple crash-recovery cases, because there are windows during which an init
 * file can become out of sync with the database.  So now we simply remove the
 * files during startup and expect the first backend launch to rebuild them.
 * Of course, this has to happen in each database of the cluster.
 */
void
RelationCacheInitFileRemove(void)
{
	char		path[MAXPGPATH];

	/*
	 * We zap the shared cache file too.  In theory it can't get out of sync
	 * enough to be a problem, but in data-corruption cases, who knows ...
	 */
	snprintf(path, sizeof(path), "global/%s",
			 RELCACHE_INIT_FILENAME);
	unlink_initfile(path);

	/* Scan everything in the default tablespace */
	RelationCacheInitFileRemoveInDir("base");
}

/* Process one per-tablespace directory for RelationCacheInitFileRemove */
static void
RelationCacheInitFileRemoveInDir(const char *tblspcpath)
{
	DIR		   *dir;
	struct dirent *de;
	char		initfilename[MAXPGPATH];

	/* Scan the tablespace directory to find per-database directories */
	dir = AllocateDir(tblspcpath);
	if (dir == NULL)
	{
		elog(LOG, "could not open tablespace directory \"%s\": %m",
			 tblspcpath);
		return;
	}

	while ((de = ReadDir(dir, tblspcpath)) != NULL)
	{
		if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
		{
			/* Try to remove the init file in each database */
			snprintf(initfilename, sizeof(initfilename), "%s/%s/%s",
					 tblspcpath, de->d_name, RELCACHE_INIT_FILENAME);
			unlink_initfile(initfilename);
		}
	}

	FreeDir(dir);
}

static void
unlink_initfile(const char *initfilename)
{
	if (unlink(initfilename) < 0)
	{
		/* It might not be there, but log any error other than ENOENT */
		if (errno != ENOENT)
			elog(LOG, "could not remove cache file \"%s\": %m", initfilename);
	}
}

void
RelationGetPTInfo(Relation rel,
	ItemPointer persistentTid,
	int64 *persistentSerialNum)
{
	if (! GpPersistent_SkipXLogInfo(rel->rd_node.relNode) &&
		! rel->rd_segfile0_relationnodeinfo.isPresent)
	{
		elog(ERROR,
			 "required Persistent Table information missing for relation %u/%u/%u",
			 rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode);
	}

	*persistentTid = rel->rd_segfile0_relationnodeinfo.persistentTid;
	*persistentSerialNum = rel->rd_segfile0_relationnodeinfo.persistentSerialNum;
}