/*-------------------------------------------------------------------------
 *
 * relcache.c
 *	  POSTGRES relation descriptor cache code
 *
 * Portions Copyright (c) 2005-2009, Greenplum inc.
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/utils/cache/relcache.c,v 1.266.2.10 2010/09/02 03:17:06 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
/*
 * INTERFACE ROUTINES
 *		RelationCacheInitialize			- initialize relcache (to empty)
 *		RelationCacheInitializePhase2	- initialize shared-catalog entries
 *		RelationCacheInitializePhase3	- finish initializing relcache
 *		RelationIdGetRelation			- get a reldesc by relation id
 *		RelationClose					- close an open relation
 *
 * NOTES
 *		The following code contains many undocumented hacks.  Please be
 *		careful....
 */
28 29
#include "postgres.h"

30
#include <sys/file.h>
31
#include <fcntl.h>
32
#include <unistd.h>
33

34 35
#include "access/genam.h"
#include "access/heapam.h"
36
#include "access/reloptions.h"
37
#include "access/sysattr.h"
38
#include "access/xact.h"
39
#include "catalog/catalog.h"
40
#include "catalog/index.h"
B
Bruce Momjian 已提交
41
#include "catalog/indexing.h"
42
#include "catalog/namespace.h"
43 44
#include "catalog/pg_amop.h"
#include "catalog/pg_amproc.h"
B
Bruce Momjian 已提交
45
#include "catalog/pg_attrdef.h"
46
#include "catalog/pg_authid.h"
47
#include "catalog/pg_auth_members.h"
48
#include "catalog/pg_constraint.h"
49
#include "catalog/pg_database.h"
50
#include "catalog/pg_namespace.h"
51
#include "catalog/pg_opclass.h"
52
#include "catalog/pg_operator.h"
B
Bruce Momjian 已提交
53
#include "catalog/pg_proc.h"
54
#include "catalog/pg_rewrite.h"
55 56
#include "catalog/pg_tablespace.h"
#include "catalog/pg_trigger.h"
57
#include "catalog/pg_type.h"
58
#include "commands/trigger.h"
B
Bruce Momjian 已提交
59
#include "miscadmin.h"
60 61
#include "optimizer/clauses.h"
#include "optimizer/planmain.h"
62
#include "optimizer/prep.h"
63
#include "optimizer/var.h"
64
#include "rewrite/rewriteDefine.h"
65
#include "storage/fd.h"
B
Bruce Momjian 已提交
66
#include "storage/smgr.h"
67
#include "utils/builtins.h"
68
#include "utils/fmgroids.h"
69
#include "utils/inval.h"
70
#include "utils/memutils.h"
B
Bruce Momjian 已提交
71
#include "utils/relcache.h"
72
#include "utils/relationnode.h"
73
#include "utils/resowner.h"
74
#include "utils/syscache.h"
B
Bruce Momjian 已提交
75

76
#include "catalog/gp_policy.h"         /* GpPolicy */
77 78 79 80 81 82
#include "cdb/cdbtm.h"
#include "cdb/cdbvars.h"        /* Gp_role */
#include "cdb/cdbmirroredflatfile.h"
#include "cdb/cdbpersistentfilesysobj.h"
#include "cdb/cdbsreh.h"

83

84 85 86 87 88
/*
 * name of relcache init file, used to speed up backend startup
 */
#define RELCACHE_INIT_FILENAME	"pg_internal.init"

89
#define RELCACHE_INIT_FILEMAGIC		0x773264	/* version ID value */
90

91
/*
92
 *		hardcoded tuple descriptors.  see include/catalog/pg_attribute.h
93
 */
94 95 96 97 98 99 100 101 102
static const FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
static const FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
static const FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
static const FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
static const FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};

static const FormData_pg_attribute Desc_pg_database[Natts_pg_database] = {Schema_pg_database};
static const FormData_pg_attribute Desc_pg_authid[Natts_pg_authid] = {Schema_pg_authid};
static const FormData_pg_attribute Desc_pg_auth_members[Natts_pg_auth_members] = {Schema_pg_auth_members};
103

104
/*
105
 *		Hash tables that index the relation cache
106
 *
107 108
 *		We used to index the cache by both name and OID, but now there
 *		is only an index by OID.
109
 */
110 111 112 113 114 115
/*
 * Entry in the relation-cache hash table: maps a relation OID to its
 * in-memory relation descriptor.
 */
typedef struct relidcacheent
{
	Oid			reloid;			/* hash key: relation OID */
	Relation	reldesc;		/* cached relation descriptor */
} RelIdCacheEnt;

116
static HTAB *RelationIdCache;
117

118 119 120 121
/*
 * This flag is false until we have prepared the critical relcache entries
 * that are needed to do indexscans on the tables read by relcache building.
 */
B
Bruce Momjian 已提交
122
bool		criticalRelcachesBuilt = false;
123

124 125 126 127 128 129
/*
 * This flag is false until we have prepared the critical relcache entries
 * for shared catalogs (which are the tables needed for login).
 */
bool		criticalSharedRelcachesBuilt = false;

130 131
/*
 * This counter counts relcache inval events received since backend startup
B
Bruce Momjian 已提交
132
 * (but only for rels that are actually in cache).	Presently, we use it only
133 134 135 136
 * to detect whether data about to be written by write_relcache_init_file()
 * might already be obsolete.
 */
static long relcacheInvalsReceived = 0L;
137

138
/*
139 140 141 142
 * This list remembers the OIDs of the non-shared relations cached in the
 * database's local relcache init file.  Note that there is no corresponding
 * list for the shared relcache init file, for reasons explained in the
 * comments for RelationCacheInitFileRemove.
143 144
 */
static List *initFileRelationIds = NIL;
145

146
/*
147
 * This flag lets us optimize away work in AtEO(Sub)Xact_RelationCache().
148
 */
149
static bool need_eoxact_work = false;
150

151

152
/*
153
 *		macros to manipulate the lookup hashtables
154 155
 */
/*
 * RelationCacheInsert: enter RELATION into RelationIdCache, keyed by its
 * rd_id.  If an entry for that OID already exists it is silently
 * overwritten.
 */
#define RelationCacheInsert(RELATION)	\
do { \
	RelIdCacheEnt *idhentry; bool found; \
	idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
										   (void *) &(RELATION->rd_id), \
										   HASH_ENTER, &found); \
	/* used to give notice if found -- now just keep quiet */ \
	idhentry->reldesc = RELATION; \
} while(0)

/*
 * RelationIdCacheLookup: set RELATION to the cached descriptor for OID ID,
 * or to NULL if no entry exists.
 */
#define RelationIdCacheLookup(ID, RELATION) \
do { \
	RelIdCacheEnt *hentry; \
	hentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
										 (void *) &(ID), \
										 HASH_FIND, NULL); \
	if (hentry) \
		RELATION = hentry->reldesc; \
	else \
		RELATION = NULL; \
} while(0)

/*
 * RelationCacheDelete: remove RELATION's entry from RelationIdCache.
 * Missing entries only elicit a WARNING, not an error.
 */
#define RelationCacheDelete(RELATION) \
do { \
	RelIdCacheEnt *idhentry; \
	idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
										   (void *) &(RELATION->rd_id), \
										   HASH_REMOVE, NULL); \
	if (idhentry == NULL) \
		elog(WARNING, "trying to delete a rd_id reldesc that does not exist"); \
} while(0)
186

187 188 189

/*
 * Special cache for opclass-related information
190
 *
191 192
 * Note: only default operators and support procs get cached, ie, those with
 * lefttype = righttype = opcintype.
193 194 195 196 197 198 199
 */
/*
 * Entry in the opclass info cache (see LookupOpclassInfo), keyed by
 * opclass OID.
 */
typedef struct opclasscacheent
{
	Oid			opclassoid;		/* lookup key: OID of opclass */
	bool		valid;			/* set TRUE after successful fill-in */
	StrategyNumber numStrats;	/* max # of strategies (from pg_am) */
	StrategyNumber numSupport;	/* max # of support procs (from pg_am) */
	Oid			opcfamily;		/* OID of opclass's family */
	Oid			opcintype;		/* OID of opclass's declared input type */
	Oid		   *operatorOids;	/* strategy operators' OIDs */
	RegProcedure *supportProcs; /* support procs */
} OpClassCacheEnt;

static HTAB *OpClassCache = NULL;


209
/* non-export function prototypes */
210

211
static void RelationDestroyRelation(Relation relation);
212
static void RelationClearRelation(Relation relation, bool rebuild);
B
Bruce Momjian 已提交
213

214
static void RelationReloadIndexInfo(Relation relation);
215
static void RelationFlushRelation(Relation relation);
216 217
static bool load_relcache_init_file(bool shared);
static void write_relcache_init_file(bool shared);
B
Bruce Momjian 已提交
218
static void write_item(const void *data, Size len, FILE *fp);
219

220
static void formrdesc(const char *relationName, Oid relationReltype,
221 222
		  bool isshared, bool hasoids,
		  int natts, const FormData_pg_attribute *att);
223

224 225
static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK, Relation *pg_class_relation);
static Relation AllocateRelationDesc(Form_pg_class relp);
226
static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
227
static void RelationBuildTupleDesc(Relation relation);
228
static Relation RelationBuildDesc(Oid targetRelId, bool insertIt);
229
static void RelationInitPhysicalAddr(Relation relation);
230
static void RelationInitAppendOnlyInfo(Relation relation);
231
static void load_critical_index(Oid indexoid, Oid heapoid);
232
static TupleDesc GetPgClassDescriptor(void);
233
static TupleDesc GetPgIndexDescriptor(void);
234
static void AttrDefaultFetch(Relation relation);
235
static void CheckConstraintFetch(Relation relation);
236
static List *insert_ordered_oid(List *list, Oid datum);
237
static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
B
Bruce Momjian 已提交
238 239
				  StrategyNumber numStrats,
				  StrategyNumber numSupport);
240 241
static void RelationCacheInitFileRemoveInDir(const char *tblspcpath);
static void unlink_initfile(const char *initfilename);
242

243

244
/*
 *		ScanPgRelation
 *
 *		This is used by RelationBuildDesc to find a pg_class
 *		tuple matching targetRelId.  The caller must hold at least
 *		AccessShareLock on the target relid to prevent concurrent-update
 *		scenarios --- else our SnapshotNow scan might fail to find any
 *		version that it thinks is live.
 *
 *		NB: the returned tuple has been copied into palloc'd storage
 *		and must eventually be freed with heap_freetuple.
 *
 *		If pg_class_relation is non-NULL, the opened pg_class relation is
 *		handed back through it still locked, and the caller becomes
 *		responsible for closing it; otherwise it is closed here.
 */
static HeapTuple
ScanPgRelation(Oid targetRelId, bool indexOK, Relation *pg_class_relation)
{
	HeapTuple	pg_class_tuple;
	Relation	pg_class_desc;
	SysScanDesc pg_class_scan;
	ScanKeyData key[1];

	/*
	 * If something goes wrong during backend startup, we might find ourselves
	 * trying to read pg_class before we've selected a database.  That ain't
	 * gonna work, so bail out with a useful error message.  If this happens,
	 * it probably means a relcache entry that needs to be nailed isn't.
	 */
	if (!OidIsValid(MyDatabaseId))
		elog(FATAL, "cannot read pg_class without having selected a database");

	/*
	 * form a scan key
	 */
	ScanKeyInit(&key[0],
				ObjectIdAttributeNumber,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(targetRelId));

	/*
	 * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
	 * built the critical relcache entries (this includes initdb and startup
	 * without a pg_internal.init file).  The caller can also force a heap
	 * scan by setting indexOK == false.
	 */
	pg_class_desc = heap_open(RelationRelationId, AccessShareLock);
	pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId,
									   indexOK && criticalRelcachesBuilt,
									   SnapshotNow,
									   1, key);

	pg_class_tuple = systable_getnext(pg_class_scan);

	/*
	 * Must copy tuple before releasing buffer.
	 */
	if (HeapTupleIsValid(pg_class_tuple))
		pg_class_tuple = heap_copytuple(pg_class_tuple);

	/* all done */
	systable_endscan(pg_class_scan);
	if (pg_class_relation == NULL)
		heap_close(pg_class_desc, AccessShareLock);
	else
		*pg_class_relation = pg_class_desc;

	return pg_class_tuple;
}

/*
 * GpRelationNodeBeginScan
 *		Set up an index scan over gp_relation_node for all entries with
 *		the given tablespace OID and relfilenode.
 *
 * The caller supplies the snapshot and an already-open gp_relation_node
 * relation; scan state is written into *gpRelationNodeScan.  Use
 * GpRelationNodeGetNext to fetch rows and GpRelationNodeEndScan to clean up.
 */
void
GpRelationNodeBeginScan(
	Snapshot	snapshot,
	Relation 	gp_relation_node,
	Oid		relationId,
	Oid 		tablespaceOid,
	Oid 		relfilenode,
	GpRelationNodeScan 	*gpRelationNodeScan)
{
	Assert (relfilenode != 0);

	MemSet(gpRelationNodeScan, 0, sizeof(GpRelationNodeScan));

	/*
	 * form a scan key
	 */
	ScanKeyInit(&gpRelationNodeScan->scankey[0],
				Anum_gp_relation_node_tablespace_oid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(tablespaceOid));

	ScanKeyInit(&gpRelationNodeScan->scankey[1],
				Anum_gp_relation_node_relfilenode_oid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(relfilenode));

	/*
	 * Open gp_relation_node and fetch a tuple.  Force heap scan if we haven't yet
	 * built the critical relcache entries (this includes initdb and startup
	 * without a pg_internal.init file).  The caller can also force a heap
	 * scan by setting indexOK == false.
	 */
	gpRelationNodeScan->scan = \
		systable_beginscan(gp_relation_node, GpRelationNodeOidIndexId,
						   /* indexOK */ true,
						   snapshot,
						   /* nKeys */ 2,
						   gpRelationNodeScan->scankey);

	gpRelationNodeScan->gp_relation_node = gp_relation_node;
	gpRelationNodeScan->relationId = relationId;
	gpRelationNodeScan->tablespaceOid = tablespaceOid;
	gpRelationNodeScan->relfilenode = relfilenode;
}

/*
 * GpRelationNodeGetNext
 *		Fetch the next gp_relation_node tuple from a scan started with
 *		GpRelationNodeBeginScan.
 *
 * On success, returns the tuple (owned by the scan machinery; do not free)
 * and fills in *segmentFileNum, *persistentTid and *persistentSerialNum
 * from it.  Returns NULL, zeroing the TID and serial-number outputs, when
 * the scan is exhausted.
 *
 * A tuple whose relfilenode disagrees with the scan key indicates a broken
 * index on gp_relation_node and raises FATAL.
 */
HeapTuple
GpRelationNodeGetNext(
	GpRelationNodeScan 	*gpRelationNodeScan,
	int32				*segmentFileNum,
	ItemPointer			persistentTid,
	int64				*persistentSerialNum)
{
	HeapTuple tuple;

	bool			nulls[Natts_gp_relation_node];
	Datum			values[Natts_gp_relation_node];

	Oid tablespace;
	Oid actualRelationNode;

	int64 createMirrorDataLossTrackingSessionNum;

	tuple = systable_getnext((SysScanDesc)gpRelationNodeScan->scan);

	/*
	 * if no such tuple exists, return NULL
	 */
	if (!HeapTupleIsValid(tuple))
	{
		MemSet(persistentTid, 0, sizeof(ItemPointerData));
		*persistentSerialNum = 0;
		return tuple;
	}
	
	heap_deform_tuple(tuple, RelationGetDescr(gpRelationNodeScan->gp_relation_node), values, nulls);
		
	GpRelationNode_GetValues(
						values,
						&tablespace,
						&actualRelationNode,
						segmentFileNum,
						&createMirrorDataLossTrackingSessionNum,
						persistentTid,
						persistentSerialNum);
	if (actualRelationNode != gpRelationNodeScan->relfilenode)
		/*
		 * Note: a space is required after "broken." because adjacent string
		 * literals are concatenated with no separator.
		 */
		elog(FATAL, "Index on gp_relation_node broken. "
			   "Mismatch in node tuple for gp_relation_node for relation %u, tablespace %u, relfilenode %u, relation node %u",
			 gpRelationNodeScan->relationId,
			 gpRelationNodeScan->tablespaceOid,
			 gpRelationNodeScan->relfilenode,
			 actualRelationNode);

	return tuple;
}


/*
 * GpRelationNodeEndScan
 *		Release the scan state set up by GpRelationNodeBeginScan.
 *		(Closing the gp_relation_node relation remains the caller's job.)
 */
void
GpRelationNodeEndScan(
	GpRelationNodeScan 	*gpRelationNodeScan)
{
	/* all done */
	systable_endscan((SysScanDesc)gpRelationNodeScan->scan);
}

415
/*
 * ScanGpRelationNodeTuple
 *		Look up the single gp_relation_node tuple matching the given
 *		(tablespaceOid, relfilenode, segmentFileNum) via the index.
 *
 * Returns a palloc'd copy of the tuple (free with heap_freetuple), or
 * NULL if no matching row exists.  The scan uses SnapshotNow.
 */
static HeapTuple
ScanGpRelationNodeTuple(
	Relation 	gp_relation_node,
	Oid 		tablespaceOid,
	Oid 		relfilenode,
	int32		segmentFileNum)
{
	HeapTuple	tuple;
	SysScanDesc scan;
	ScanKeyData key[3];

	Assert (tablespaceOid != MyDatabaseTableSpace);
	Assert (relfilenode != 0);

	/*
	 * form a scan key
	 */
	ScanKeyInit(&key[0],
				Anum_gp_relation_node_tablespace_oid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(tablespaceOid));
	ScanKeyInit(&key[1],
				Anum_gp_relation_node_relfilenode_oid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(relfilenode));
	ScanKeyInit(&key[2],
				Anum_gp_relation_node_segment_file_num,
				BTEqualStrategyNumber, F_INT4EQ,
				Int32GetDatum(segmentFileNum));

	/*
	 * Open gp_relation_node and fetch a tuple.  Force heap scan if we haven't yet
	 * built the critical relcache entries (this includes initdb and startup
	 * without a pg_internal.init file).  The caller can also force a heap
	 * scan by setting indexOK == false.
	 */
	scan = systable_beginscan(gp_relation_node, GpRelationNodeOidIndexId,
									   /* indexOK */ true,
									   SnapshotNow,
									   3, key);

	tuple = systable_getnext(scan);

	/*
	 * Must copy tuple before releasing buffer.
	 */
	if (HeapTupleIsValid(tuple))
		tuple = heap_copytuple(tuple);

	/* all done */
	systable_endscan(scan);

	return tuple;
}

HeapTuple
FetchGpRelationNodeTuple(
	Relation 		gp_relation_node,
473
	Oid 			tablespaceOid,
474 475 476 477 478 479 480 481 482
	Oid 			relfilenode,
	int32			segmentFileNum,
	ItemPointer		persistentTid,
	int64			*persistentSerialNum)
{
	HeapTuple tuple;
	
	bool			nulls[Natts_gp_relation_node];
	Datum			values[Natts_gp_relation_node];
483 484

	Oid tablespace;
485 486 487 488 489
	Oid actualRelationNode;
	int32 actualSegmentFileNum;

	int64 createMirrorDataLossTrackingSessionNum;

490 491 492 493 494
	/*
	 * gp_relation_node stores tablespaceOId in pg_class fashion, hence need
	 * to fetch the similar way.
	 */
	Assert (tablespaceOid != MyDatabaseTableSpace);
495 496 497 498
	Assert (relfilenode != 0);
	
	tuple = ScanGpRelationNodeTuple(
					gp_relation_node,
499
					tablespaceOid,
500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516
					relfilenode,
					segmentFileNum);
	
	/*
	 * if no such tuple exists, return NULL
	 */
	if (!HeapTupleIsValid(tuple))
	{
		MemSet(persistentTid, 0, sizeof(ItemPointerData));
		*persistentSerialNum = 0;
		return tuple;
	}
	
	heap_deform_tuple(tuple, RelationGetDescr(gp_relation_node), values, nulls);
		
	GpRelationNode_GetValues(
						values,
517
						&tablespace,
518 519 520 521 522 523
						&actualRelationNode,
						&actualSegmentFileNum,
						&createMirrorDataLossTrackingSessionNum,
						persistentTid,
						persistentSerialNum);
	
524 525 526 527 528 529 530 531
	if (actualRelationNode != relfilenode)
	{
		elog(ERROR, "Index on gp_relation_node broken."
			   "Mismatch in node tuple for gp_relation_node intended relfilenode %u, fetched relfilenode %u",
			 relfilenode,
			 actualRelationNode);
	}

532 533 534 535 536 537 538 539 540 541 542 543 544 545
	return tuple;
}

/*
 * Deletes the gp relation node entry for the
 * given segment file.
 *
 * Opens gp_relation_node with RowExclusiveLock for the duration of the
 * delete; raises ERROR if no matching entry exists.
 */ 
void
DeleteGpRelationNodeTuple(
	Relation 	relation,
	int32		segmentFileNum)
{
	Relation	gp_relation_node;
	HeapTuple	tuple;
	ItemPointerData     persistentTid;
	int64               persistentSerialNum;

	gp_relation_node = heap_open(GpRelationNodeRelationId, RowExclusiveLock);

	tuple = FetchGpRelationNodeTuple(gp_relation_node,
									 relation->rd_rel->reltablespace,
									 relation->rd_rel->relfilenode,
									 segmentFileNum,
									 &persistentTid,
									 &persistentSerialNum);

	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "could not find node tuple for relation %u, tablespace %u, relation file node %u, segment file #%d",
			 RelationGetRelid(relation),
			 relation->rd_rel->reltablespace,
			 relation->rd_rel->relfilenode,
			 segmentFileNum);

	/* delete the relation tuple from gp_relation_node, and finish up */
	simple_heap_delete(gp_relation_node, &tuple->t_self);
	heap_freetuple(tuple);

	heap_close(gp_relation_node, RowExclusiveLock);
}

/*
 * ReadGpRelationNode
 *		Read the persistent TID and serial number recorded in
 *		gp_relation_node for the given tablespace/relfilenode/segment file.
 *
 * Returns true and fills *persistentTid / *persistentSerialNum when an
 * entry is found; returns false (outputs zeroed) otherwise.  Opens and
 * closes gp_relation_node itself, under AccessShareLock.
 */
bool
ReadGpRelationNode(
	Oid 			tablespaceOid,
	Oid 			relfilenode,
	int32			segmentFileNum,
	ItemPointer		persistentTid,
	int64			*persistentSerialNum)
{
	Relation gp_relation_node;
	HeapTuple tuple;
	bool found;

	MemSet(persistentTid, 0, sizeof(ItemPointerData));
	*persistentSerialNum = 0;

	gp_relation_node = heap_open(GpRelationNodeRelationId, AccessShareLock);

	tuple = FetchGpRelationNodeTuple(
						gp_relation_node,
						tablespaceOid,
						relfilenode,
						segmentFileNum,
						persistentTid,
						persistentSerialNum);

	/*
	 * if no such tuple exists, return NULL
	 */
	if (!HeapTupleIsValid(tuple))
	{
		found = false;
	}
	else
	{
		/* optional tracing of the tuple's visibility state */
		if (Debug_persistent_print)
		{
			TupleVisibilitySummary tupleVisibilitySummary;
			char *tupleVisibilitySummaryString;
			
			GetTupleVisibilitySummary(
									tuple,
									&tupleVisibilitySummary);
			tupleVisibilitySummaryString = GetTupleVisibilitySummaryString(&tupleVisibilitySummary);
			
			elog(Persistent_DebugPrintLevel(), 
				 "ReadGpRelationNode: For tablespace %u relfilenode %u, segment file #%d found persistent serial number " INT64_FORMAT ", TID %s (gp_relation_node tuple visibility: %s)",
				 tablespaceOid,
				 relfilenode,
				 segmentFileNum,
				 *persistentSerialNum,
				 ItemPointerToString(persistentTid),
				 tupleVisibilitySummaryString);
			pfree(tupleVisibilitySummaryString);
		}

		found = true;
		heap_freetuple(tuple);
	}

	heap_close(gp_relation_node, AccessShareLock);

	return found;
}

/*
 * RelationFetchSegFile0GpRelationNode
 *		Ensure the relcache entry carries the persistent TID and serial
 *		number for segment file 0, fetching them from gp_relation_node on
 *		first use.
 *
 * Before persistence work (bootstrap) or during recovery the values are
 * simply zeroed and marked present.  If the info is already present and
 * gp_validate_pt_info_relcache is set, the cached values are re-read and
 * cross-checked against the catalog, erroring out on any mismatch.
 */
void
RelationFetchSegFile0GpRelationNode(
	Relation relation)
{
	if (!relation->rd_segfile0_relationnodeinfo.isPresent)
	{
		if (Persistent_BeforePersistenceWork() || InRecovery)
		{
			MemSet(&relation->rd_segfile0_relationnodeinfo.persistentTid, 0, sizeof(ItemPointerData));
			relation->rd_segfile0_relationnodeinfo.persistentSerialNum = 0;
		
			relation->rd_segfile0_relationnodeinfo.isPresent = true;
			relation->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = true;
			
			return; // The initdb process will load the persistent table once we out of bootstrap mode.
		}

		if (!ReadGpRelationNode(
				relation->rd_rel->reltablespace,
				relation->rd_rel->relfilenode,
				/* segmentFileNum */ 0,
				&relation->rd_segfile0_relationnodeinfo.persistentTid,
				&relation->rd_segfile0_relationnodeinfo.persistentSerialNum))
		{
			elog(ERROR, "Did not find gp_relation_node entry for relation name %s, relation id %u, tablespaceOid %u, relfilenode %u",
				 relation->rd_rel->relname.data,
				 relation->rd_id,
				 relation->rd_rel->reltablespace,
				 relation->rd_rel->relfilenode);
		}

		Assert(!Persistent_BeforePersistenceWork());
		/* a zero TID here would mean the persistent info is unusable */
		if (PersistentStore_IsZeroTid(&relation->rd_segfile0_relationnodeinfo.persistentTid))
		{	
			elog(ERROR, 
				 "RelationFetchSegFile0GpRelationNode has invalid TID (0,0) into relation %u/%u/%u '%s', serial number " INT64_FORMAT,
				 relation->rd_node.spcNode,
				 relation->rd_node.dbNode,
				 relation->rd_node.relNode,
				 NameStr(relation->rd_rel->relname),
				 relation->rd_segfile0_relationnodeinfo.persistentSerialNum);
		}

		relation->rd_segfile0_relationnodeinfo.isPresent = true;
		
	}
	else if (gp_validate_pt_info_relcache &&
		     !(relation->rd_index &&
			   relation->rd_index->indrelid == GpRelationNodeRelationId))
	{
		/*
		 * bypass the check for gp_relation_node_index because
		 * ReadGpRelationNode() uses the same index to probe relfile node.
		 */

		ItemPointerData persistentTid;
		int64			persistentSerialNum;

		if (!ReadGpRelationNode(
				relation->rd_rel->reltablespace,
				relation->rd_rel->relfilenode,
				/* segmentFileNum */ 0,
				&persistentTid,
				&persistentSerialNum))
		{
			elog(ERROR,
				 "did not find gp_relation_node entry for relation name %s, "
				 "relation id %u, tablespace %u, relfilenode %u",
				 relation->rd_rel->relname.data,
				 relation->rd_id,
				 relation->rd_rel->reltablespace,
				 relation->rd_rel->relfilenode);
		}

		/* cached info must agree exactly with what the catalog says now */
		if (ItemPointerCompare(&persistentTid,
							   &relation->rd_segfile0_relationnodeinfo.persistentTid) ||
			(persistentSerialNum != relation->rd_segfile0_relationnodeinfo.persistentSerialNum))
		{
			ereport(ERROR,
					(errmsg("invalid persistent TID and/or serial number in "
							"relcache entry"),
					 errdetail("relation name %s, relation id %u, relfilenode %u "
							   "contains invalid persistent TID %s and/or serial "
							   "number " INT64_FORMAT ".  Expected TID is %s and "
							   "serial number " INT64_FORMAT,
							   relation->rd_rel->relname.data, relation->rd_id,
							   relation->rd_node.relNode,
							   ItemPointerToString(
								   &relation->rd_segfile0_relationnodeinfo.persistentTid),
							   relation->rd_segfile0_relationnodeinfo.persistentSerialNum,
							   ItemPointerToString2(&persistentTid),
							   persistentSerialNum)));
		}
	}

}

// UNDONE: Temporary
/*
 * RelationFetchGpRelationNodeForXLog_Index
 *		Debug wrapper around RelationFetchSegFile0GpRelationNode that
 *		errors out on re-entrant calls (static 'deep' recursion counter).
 *
 * NOTE(review): if RelationFetchSegFile0GpRelationNode raises ERROR, the
 * longjmp skips the final deep-- so the counter stays elevated for the
 * rest of this backend — presumably acceptable for this temporary
 * instrumentation, but worth confirming.
 */
void
RelationFetchGpRelationNodeForXLog_Index(
	Relation relation)
{
	static int countInThisBackend = 0;
	static int deep = 0;
	
	deep++;

	countInThisBackend++;

	if (deep >= 2)
	{
		elog(ERROR, "RelationFetchGpRelationNodeForXLog_Index [%d] for non-heap %u/%u/%u (deep %d)",
			 countInThisBackend,
			 relation->rd_node.spcNode,
			 relation->rd_node.dbNode,
			 relation->rd_node.relNode,
			 deep);
	}

	RelationFetchSegFile0GpRelationNode(relation);

	deep--;
}

760
/*
 *		AllocateRelationDesc
 *
 *		This is used to allocate memory for a new relation descriptor
 *		and initialize the rd_rel field from the given pg_class tuple.
 *
 *		The descriptor and its rd_rel / rd_att sub-structures are all
 *		allocated in CacheMemoryContext so they survive across commands.
 */
static Relation
AllocateRelationDesc(Form_pg_class relp)
{
	Relation	relation;
	MemoryContext oldcxt;
	Form_pg_class relationForm;

	/* Relcache entries must live in CacheMemoryContext */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
	 * allocate and zero space for new relation descriptor
	 */
	relation = (Relation) palloc0(sizeof(RelationData));

	/*
	 * clear fields of reldesc that should initialize to something non-zero
	 */
	relation->rd_targblock = InvalidBlockNumber;

	/* make sure relation is marked as having no open file yet */
	relation->rd_smgr = NULL;

	/*
	 * Copy the relation tuple form
	 *
	 * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE. The
	 * variable-length fields (relacl, reloptions) are NOT stored in the
	 * relcache --- there'd be little point in it, since we don't copy the
	 * tuple's nulls bitmap and hence wouldn't know if the values are valid.
	 * Bottom line is that relacl *cannot* be retrieved from the relcache. Get
	 * it from the syscache if you need it.  The same goes for the original
	 * form of reloptions (however, we do store the parsed form of reloptions
	 * in rd_options).
	 */
	relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);

	memcpy(relationForm, relp, CLASS_TUPLE_SIZE);

	/* initialize relation tuple form */
	relation->rd_rel = relationForm;

	/*
	 * This part MUST be remain as a fetch on demand, otherwise you end up
	 * needing it to open pg_class and then relation_open does infinite recursion...
	 */
	relation->rd_segfile0_relationnodeinfo.isPresent = false;
	relation->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = false;

	/* and allocate attribute tuple form storage */
	relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts,
											   relationForm->relhasoids);
	/* which we mark as a reference-counted tupdesc */
	relation->rd_att->tdrefcount = 1;

	MemoryContextSwitchTo(oldcxt);

	return relation;
}

B
Bruce Momjian 已提交
826
/*
 * RelationParseRelOptions
 *		Convert pg_class.reloptions into pre-parsed rd_options
 *
 * tuple is the real pg_class tuple (not rd_rel!) for relation
 *
 * Note: rd_rel and (if an index) rd_am must be valid already
 *
 * Leaves rd_options NULL when the relkind takes no options or none are
 * set; otherwise stores the parsed bytea, copied into CacheMemoryContext.
 */
static void
RelationParseRelOptions(Relation relation, HeapTuple tuple)
{
	Datum		datum;
	bool		isnull;
	bytea	   *options;

	relation->rd_options = NULL;

	/* Fall out if relkind should not have options */
	switch (relation->rd_rel->relkind)
	{
		case RELKIND_RELATION:
		case RELKIND_TOASTVALUE:
		case RELKIND_AOSEGMENTS:
		case RELKIND_AOBLOCKDIR:
		case RELKIND_AOVISIMAP:
		case RELKIND_INDEX:
			break;
		default:
			return;
	}

	/*
	 * Fetch reloptions from tuple; have to use a hardwired descriptor because
	 * we might not have any other for pg_class yet (consider executing this
	 * code for pg_class itself)
	 */
	datum = fastgetattr(tuple,
						Anum_pg_class_reloptions,
						GetPgClassDescriptor(),
						&isnull);
	if (isnull)
		return;

	/* Parse into appropriate format; don't error out here */
	switch (relation->rd_rel->relkind)
	{
		case RELKIND_RELATION:
		case RELKIND_TOASTVALUE:
		case RELKIND_AOSEGMENTS:
		case RELKIND_AOBLOCKDIR:
		case RELKIND_AOVISIMAP:
		case RELKIND_UNCATALOGED:
			options = heap_reloptions(relation->rd_rel->relkind, datum,
									  false);
			break;
		case RELKIND_INDEX:
			options = index_reloptions(relation->rd_am->amoptions, datum,
									   false);
			break;
		default:
			Assert(false);		/* can't get here */
			options = NULL;		/* keep compiler quiet */
			break;
	}

	/*
	 * Copy parsed data into CacheMemoryContext.  To guard against the
	 * possibility of leaks in the reloptions code, we want to do the actual
	 * parsing in the caller's memory context and copy the results into
	 * CacheMemoryContext after the fact.
	 */
	if (options)
	{
		relation->rd_options = MemoryContextAlloc(CacheMemoryContext,
												  VARSIZE(options));
		memcpy(relation->rd_options, options, VARSIZE(options));
		pfree(options);
	}
}

906
/*
907
 *		RelationBuildTupleDesc
908
 *
909
 *		Form the relation's tuple descriptor from information in
910
 *		the pg_attribute, pg_attrdef & pg_constraint system catalogs.
911 912
 */
static void
913
RelationBuildTupleDesc(Relation relation)
914
{
915 916
	HeapTuple	pg_attribute_tuple;
	Relation	pg_attribute_desc;
917 918
	SysScanDesc pg_attribute_scan;
	ScanKeyData skey[2];
919
	int			need;
920
	TupleConstr *constr;
H
Hiroshi Inoue 已提交
921
	AttrDefault *attrdef = NULL;
922
	int			ndef = 0;
923

924 925 926 927
	/* copy some fields from pg_class row to rd_att */
	relation->rd_att->tdtypeid = relation->rd_rel->reltype;
	relation->rd_att->tdtypmod = -1;	/* unnecessary, but... */
	relation->rd_att->tdhasoid = relation->rd_rel->relhasoids;
928

929 930
	constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
												sizeof(TupleConstr));
H
Hiroshi Inoue 已提交
931
	constr->has_not_null = false;
932

933
	/*
934
	 * Form a scan key that selects only user attributes (attnum > 0).
B
Bruce Momjian 已提交
935 936
	 * (Eliminating system attribute rows at the index level is lots faster
	 * than fetching them.)
937
	 */
938 939 940 941 942 943 944 945
	ScanKeyInit(&skey[0],
				Anum_pg_attribute_attrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));
	ScanKeyInit(&skey[1],
				Anum_pg_attribute_attnum,
				BTGreaterStrategyNumber, F_INT2GT,
				Int16GetDatum(0));
946

947
	/*
B
Bruce Momjian 已提交
948 949 950
	 * Open pg_attribute and begin a scan.	Force heap scan if we haven't yet
	 * built the critical relcache entries (this includes initdb and startup
	 * without a pg_internal.init file).
951
	 */
952
	pg_attribute_desc = heap_open(AttributeRelationId, AccessShareLock);
953 954 955 956 957
	pg_attribute_scan = systable_beginscan(pg_attribute_desc,
										   AttributeRelidNumIndexId,
										   criticalRelcachesBuilt,
										   SnapshotNow,
										   2, skey);
958

959
	/*
B
Bruce Momjian 已提交
960
	 * add attribute data to relation->rd_att
961
	 */
962
	need = relation->rd_rel->relnatts;
963

964
	while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
965
	{
966 967
		Form_pg_attribute attp;

968
		attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);
969

970 971
		if (attp->attnum <= 0 ||
			attp->attnum > relation->rd_rel->relnatts)
972
			elog(ERROR, "invalid attribute number %d for %s",
973 974
				 attp->attnum, RelationGetRelationName(relation));

975 976
		memcpy(relation->rd_att->attrs[attp->attnum - 1],
			   attp,
977
			   ATTRIBUTE_FIXED_PART_SIZE);
978

979 980
		/* Update constraint/default info */
		if (attp->attnotnull)
981
			constr->has_not_null = true;
H
Hiroshi Inoue 已提交
982

983 984 985 986
		if (attp->atthasdef)
		{
			if (attrdef == NULL)
				attrdef = (AttrDefault *)
987 988 989
					MemoryContextAllocZero(CacheMemoryContext,
										   relation->rd_rel->relnatts *
										   sizeof(AttrDefault));
990 991 992
			attrdef[ndef].adnum = attp->attnum;
			attrdef[ndef].adbin = NULL;
			ndef++;
993
		}
994 995 996
		need--;
		if (need == 0)
			break;
997
	}
998

999
	/*
B
Bruce Momjian 已提交
1000
	 * end the scan and close the attribute relation
1001
	 */
1002
	systable_endscan(pg_attribute_scan);
1003
	heap_close(pg_attribute_desc, AccessShareLock);
H
Hiroshi Inoue 已提交
1004

1005 1006 1007 1008
	if (need != 0)
		elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
			 need, RelationGetRelid(relation));

1009
	/*
B
Bruce Momjian 已提交
1010 1011 1012
	 * The attcacheoff values we read from pg_attribute should all be -1
	 * ("unknown").  Verify this if assert checking is on.	They will be
	 * computed when and if needed during tuple access.
1013 1014 1015
	 */
#ifdef USE_ASSERT_CHECKING
	{
B
Bruce Momjian 已提交
1016
		int			i;
1017 1018 1019 1020 1021 1022

		for (i = 0; i < relation->rd_rel->relnatts; i++)
			Assert(relation->rd_att->attrs[i]->attcacheoff == -1);
	}
#endif

1023
	/*
B
Bruce Momjian 已提交
1024
	 * However, we can easily set the attcacheoff value for the first
B
Bruce Momjian 已提交
1025 1026
	 * attribute: it must be zero.	This eliminates the need for special cases
	 * for attnum=1 that used to exist in fastgetattr() and index_getattr().
1027
	 */
1028 1029
	if (relation->rd_rel->relnatts > 0)
		relation->rd_att->attrs[0]->attcacheoff = 0;
1030

1031 1032 1033 1034
	/*
	 * Set up constraint/default info
	 */
	if (constr->has_not_null || ndef > 0 || relation->rd_rel->relchecks)
1035
	{
1036
		relation->rd_att->constr = constr;
1037

1038
		if (ndef > 0)			/* DEFAULTs */
1039
		{
1040 1041 1042 1043 1044 1045 1046
			if (ndef < relation->rd_rel->relnatts)
				constr->defval = (AttrDefault *)
					repalloc(attrdef, ndef * sizeof(AttrDefault));
			else
				constr->defval = attrdef;
			constr->num_defval = ndef;
			AttrDefaultFetch(relation);
1047
		}
1048 1049
		else
			constr->num_defval = 0;
1050

1051
		if (relation->rd_rel->relchecks > 0)	/* CHECKs */
1052
		{
1053 1054
			constr->num_check = relation->rd_rel->relchecks;
			constr->check = (ConstrCheck *)
1055
				MemoryContextAllocZero(CacheMemoryContext,
B
Bruce Momjian 已提交
1056
									constr->num_check * sizeof(ConstrCheck));
1057
			CheckConstraintFetch(relation);
1058
		}
1059 1060 1061 1062 1063 1064 1065
		else
			constr->num_check = 0;
	}
	else
	{
		pfree(constr);
		relation->rd_att->constr = NULL;
1066
	}
1067 1068
}

1069
/*
1070
 *		RelationBuildRuleLock
1071
 *
1072 1073
 *		Form the relation's rewrite rules from information in
 *		the pg_rewrite system catalog.
1074 1075 1076 1077 1078 1079 1080
 *
 * Note: The rule parsetrees are potentially very complex node structures.
 * To allow these trees to be freed when the relcache entry is flushed,
 * we make a private memory context to hold the RuleLock information for
 * each relcache entry that has associated rules.  The context is used
 * just for rule info, not for any other subsidiary data of the relcache
 * entry, because that keeps the update logic in RelationClearRelation()
B
Bruce Momjian 已提交
1081
 * manageable.	The other subsidiary data structures are simple enough
1082
 * to be easy to free explicitly, anyway.
1083 1084 1085 1086
 */
static void
RelationBuildRuleLock(Relation relation)
{
1087 1088
	MemoryContext rulescxt;
	MemoryContext oldcxt;
1089 1090 1091
	HeapTuple	rewrite_tuple;
	Relation	rewrite_desc;
	TupleDesc	rewrite_tupdesc;
1092 1093
	SysScanDesc rewrite_scan;
	ScanKeyData key;
1094 1095 1096 1097
	RuleLock   *rulelock;
	int			numlocks;
	RewriteRule **rules;
	int			maxlocks;
1098

1099
	/*
B
Bruce Momjian 已提交
1100 1101
	 * Make the private context.  Parameters are set on the assumption that
	 * it'll probably not contain much data.
1102 1103 1104
	 */
	rulescxt = AllocSetContextCreate(CacheMemoryContext,
									 RelationGetRelationName(relation),
1105 1106 1107
									 ALLOCSET_SMALL_MINSIZE,
									 ALLOCSET_SMALL_INITSIZE,
									 ALLOCSET_SMALL_MAXSIZE);
1108 1109
	relation->rd_rulescxt = rulescxt;

1110
	/*
B
Bruce Momjian 已提交
1111 1112
	 * allocate an array to hold the rewrite rules (the array is extended if
	 * necessary)
1113 1114
	 */
	maxlocks = 4;
1115 1116
	rules = (RewriteRule **)
		MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
1117 1118
	numlocks = 0;

1119 1120 1121 1122 1123 1124 1125 1126
	/*
	 * form a scan key
	 */
	ScanKeyInit(&key,
				Anum_pg_rewrite_ev_class,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));

1127
	/*
B
Bruce Momjian 已提交
1128
	 * open pg_rewrite and begin a scan
1129
	 *
1130 1131
	 * Note: since we scan the rules using RewriteRelRulenameIndexId, we will
	 * be reading the rules in name order, except possibly during
B
Bruce Momjian 已提交
1132 1133
	 * emergency-recovery operations (ie, IgnoreSystemIndexes). This in turn
	 * ensures that rules will be fired in name order.
1134
	 */
1135
	rewrite_desc = heap_open(RewriteRelationId, AccessShareLock);
1136 1137
	rewrite_tupdesc = RelationGetDescr(rewrite_desc);

1138 1139 1140 1141
	rewrite_scan = systable_beginscan(rewrite_desc,
									  RewriteRelRulenameIndexId,
									  true, SnapshotNow,
									  1, &key);
1142

1143
	while (HeapTupleIsValid(rewrite_tuple = systable_getnext(rewrite_scan)))
1144
	{
1145
		Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
1146
		bool		isnull;
1147 1148
		Datum		rule_datum;
		char	   *rule_str;
1149
		RewriteRule *rule;
1150

1151 1152
		rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
												  sizeof(RewriteRule));
1153

1154
		rule->ruleId = HeapTupleGetOid(rewrite_tuple);
1155

1156 1157
		rule->event = rewrite_form->ev_type - '0';
		rule->attrno = rewrite_form->ev_attr;
1158
		rule->enabled = rewrite_form->ev_enabled;
1159 1160
		rule->isInstead = rewrite_form->is_instead;

1161
		/*
B
Bruce Momjian 已提交
1162 1163 1164 1165
		 * Must use heap_getattr to fetch ev_action and ev_qual.  Also, the
		 * rule strings are often large enough to be toasted.  To avoid
		 * leaking memory in the caller's context, do the detoasting here so
		 * we can free the detoasted version.
1166 1167
		 */
		rule_datum = heap_getattr(rewrite_tuple,
1168
								  Anum_pg_rewrite_ev_action,
1169
								  rewrite_tupdesc,
B
Bruce Momjian 已提交
1170
								  &isnull);
B
Bruce Momjian 已提交
1171
		Assert(!isnull);
1172
		rule_str = TextDatumGetCString(rule_datum);
1173
		oldcxt = MemoryContextSwitchTo(rulescxt);
1174
		rule->actions = (List *) stringToNode(rule_str);
1175
		MemoryContextSwitchTo(oldcxt);
1176
		pfree(rule_str);
1177

1178 1179 1180 1181
		rule_datum = heap_getattr(rewrite_tuple,
								  Anum_pg_rewrite_ev_qual,
								  rewrite_tupdesc,
								  &isnull);
B
Bruce Momjian 已提交
1182
		Assert(!isnull);
1183
		rule_str = TextDatumGetCString(rule_datum);
1184
		oldcxt = MemoryContextSwitchTo(rulescxt);
1185
		rule->qual = (Node *) stringToNode(rule_str);
1186
		MemoryContextSwitchTo(oldcxt);
1187
		pfree(rule_str);
1188

1189 1190
		/*
		 * We want the rule's table references to be checked as though by the
B
Bruce Momjian 已提交
1191
		 * table owner, not the user referencing the rule.	Therefore, scan
1192
		 * through the rule's actions and set the checkAsUser field on all
B
Bruce Momjian 已提交
1193
		 * rtable entries.	We have to look at the qual as well, in case it
1194 1195
		 * contains sublinks.
		 *
B
Bruce Momjian 已提交
1196 1197 1198 1199 1200
		 * The reason for doing this when the rule is loaded, rather than when
		 * it is stored, is that otherwise ALTER TABLE OWNER would have to
		 * grovel through stored rules to update checkAsUser fields. Scanning
		 * the rule tree during load is relatively cheap (compared to
		 * constructing it in the first place), so we do it here.
1201 1202 1203 1204
		 */
		setRuleCheckAsUser((Node *) rule->actions, relation->rd_rel->relowner);
		setRuleCheckAsUser(rule->qual, relation->rd_rel->relowner);

1205
		if (numlocks >= maxlocks)
1206 1207
		{
			maxlocks *= 2;
1208 1209
			rules = (RewriteRule **)
				repalloc(rules, sizeof(RewriteRule *) * maxlocks);
1210
		}
1211
		rules[numlocks++] = rule;
1212
	}
1213

1214
	/*
B
Bruce Momjian 已提交
1215
	 * end the scan and close the attribute relation
1216
	 */
1217
	systable_endscan(rewrite_scan);
1218
	heap_close(rewrite_desc, AccessShareLock);
1219

1220
	/*
B
Bruce Momjian 已提交
1221
	 * form a RuleLock and insert into relation
1222
	 */
1223
	rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
1224 1225 1226 1227
	rulelock->numLocks = numlocks;
	rulelock->rules = rules;

	relation->rd_rules = rulelock;
1228 1229
}

1230
/*
1231 1232 1233 1234 1235 1236 1237 1238 1239
 *		equalRuleLocks
 *
 *		Determine whether two RuleLocks are equivalent
 *
 *		Probably this should be in the rules code someplace...
 */
static bool
equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
{
1240
	int			i;
1241

1242
	/*
B
Bruce Momjian 已提交
1243
	 * As of 7.3 we assume the rule ordering is repeatable, because
B
Bruce Momjian 已提交
1244 1245
	 * RelationBuildRuleLock should read 'em in a consistent order.  So just
	 * compare corresponding slots.
1246
	 */
1247 1248 1249 1250 1251 1252 1253 1254 1255
	if (rlock1 != NULL)
	{
		if (rlock2 == NULL)
			return false;
		if (rlock1->numLocks != rlock2->numLocks)
			return false;
		for (i = 0; i < rlock1->numLocks; i++)
		{
			RewriteRule *rule1 = rlock1->rules[i];
1256 1257 1258
			RewriteRule *rule2 = rlock2->rules[i];

			if (rule1->ruleId != rule2->ruleId)
1259 1260 1261 1262 1263
				return false;
			if (rule1->event != rule2->event)
				return false;
			if (rule1->attrno != rule2->attrno)
				return false;
1264 1265
			if (rule1->enabled != rule2->enabled)
				return false;
1266 1267
			if (rule1->isInstead != rule2->isInstead)
				return false;
1268
			if (!equal(rule1->qual, rule2->qual))
1269
				return false;
1270
			if (!equal(rule1->actions, rule2->actions))
1271 1272 1273 1274 1275 1276
				return false;
		}
	}
	else if (rlock2 != NULL)
		return false;
	return true;
1277 1278 1279
}


1280
/*
1281 1282
 *		RelationBuildDesc
 *
1283 1284 1285 1286
 *		Build a relation descriptor.  The caller must hold at least
 *		AccessShareLock on the target relid.
 *
 *		The new descriptor is inserted into the hash table if insertIt is true.
1287 1288 1289 1290
 *
 *		Returns NULL if no pg_class row could be found for the given relid
 *		(suggesting we are trying to access a just-deleted relation).
 *		Any other error is reported via elog.
1291
 */
1292
static Relation
1293
RelationBuildDesc(Oid targetRelId, bool insertIt)
1294
{
1295 1296
	Relation	relation;
	Oid			relid;
1297
	Relation    pg_class_relation;
1298
	HeapTuple	pg_class_tuple;
1299
	Form_pg_class relp;
1300

1301
	/*
B
Bruce Momjian 已提交
1302
	 * find the tuple in pg_class corresponding to the given relation id
1303
	 */
1304
	pg_class_tuple = ScanPgRelation(targetRelId, true, &pg_class_relation);
1305

1306
	/*
B
Bruce Momjian 已提交
1307
	 * if no such tuple exists, return NULL
1308 1309 1310 1311
	 */
	if (!HeapTupleIsValid(pg_class_tuple))
		return NULL;

1312
	/*
B
Bruce Momjian 已提交
1313
	 * get information from the pg_class_tuple
1314
	 */
1315
	relid = HeapTupleGetOid(pg_class_tuple);
1316
	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
1317
	heap_close(pg_class_relation, AccessShareLock);
1318

1319
	/*
B
Bruce Momjian 已提交
1320
	 * allocate storage for the relation descriptor, and copy pg_class_tuple
1321
	 * to relation->rd_rel and new fields into relation->rd_newfields.
1322
	 */
1323
	relation = AllocateRelationDesc(relp);
1324

1325
	/*
B
Bruce Momjian 已提交
1326
	 * initialize the relation's relation id (relation->rd_id)
1327
	 */
1328
	RelationGetRelid(relation) = relid;
1329

1330
	/*
B
Bruce Momjian 已提交
1331 1332 1333
	 * normal relations are not nailed into the cache; nor can a pre-existing
	 * relation be new.  It could be temp though.  (Actually, it could be new
	 * too, but it's okay to forget that fact if forced to flush the entry.)
1334
	 */
1335
	relation->rd_refcnt = 0;
1336
	relation->rd_isnailed = false;
1337
	relation->rd_createSubid = InvalidSubTransactionId;
1338
	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
1339
	relation->rd_istemp = isTempOrToastNamespace(relation->rd_rel->relnamespace);
1340 1341
	relation->rd_issyscat = (strncmp(relation->rd_rel->relname.data, "pg_", 3) == 0);

1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352
	/*
	 * CDB: On QEs, temp relations must use shared buffer cache so data
	 * will be visible to all segmates.  On QD, sequence objects must
	 * use shared buffer cache so data will be visible to sequence server.
	 */
	if (relation->rd_istemp &&
		relation->rd_rel->relkind != RELKIND_SEQUENCE &&
		Gp_role != GP_ROLE_EXECUTE)
		relation->rd_isLocalBuf = true;
	else
		relation->rd_isLocalBuf = false;
1353

1354
	/*
B
Bruce Momjian 已提交
1355
	 * initialize the tuple descriptor (relation->rd_att).
1356
	 */
1357
	RelationBuildTupleDesc(relation);
1358

1359
	/*
B
Bruce Momjian 已提交
1360
	 * Fetch rules and triggers that affect this relation
1361
	 */
1362
	if (relation->rd_rel->relhasrules)
1363 1364
		RelationBuildRuleLock(relation);
	else
1365
	{
1366
		relation->rd_rules = NULL;
1367 1368
		relation->rd_rulescxt = NULL;
	}
1369

1370
	if (relation->rd_rel->reltriggers > 0)
1371 1372 1373 1374
		RelationBuildTriggers(relation);
	else
		relation->trigdesc = NULL;

1375
	/*
1376
	 * if it's an index, initialize index-related information
1377
	 */
1378
	if (OidIsValid(relation->rd_rel->relam))
1379
		RelationInitIndexAccessInfo(relation);
1380

1381 1382 1383 1384 1385 1386 1387 1388 1389
	/*
	 * if it's an append-only table, get information from pg_appendonly
	 */
	if (relation->rd_rel->relstorage == RELSTORAGE_AOROWS ||
		relation->rd_rel->relstorage == RELSTORAGE_AOCOLS)
	{
		RelationInitAppendOnlyInfo(relation);
	}

1390 1391 1392
	/* extract reloptions if any */
	RelationParseRelOptions(relation, pg_class_tuple);

1393
	/*
B
Bruce Momjian 已提交
1394
	 * initialize the relation lock manager information
1395 1396 1397
	 */
	RelationInitLockInfo(relation);		/* see lmgr.c */

1398 1399 1400 1401
	/*
	 * initialize physical addressing information for the relation
	 */
	RelationInitPhysicalAddr(relation);
1402

1403
	/* make sure relation is marked as having no open file yet */
1404
	relation->rd_smgr = NULL;
1405

1406 1407 1408 1409 1410 1411 1412 1413 1414
    /*
     * initialize Greenplum Database partitioning info
     */
    if (relation->rd_rel->relkind == RELKIND_RELATION &&
        !IsSystemRelation(relation))
        relation->rd_cdbpolicy = GpPolicyFetch(CacheMemoryContext, targetRelId);

    relation->rd_cdbDefaultStatsWarningIssued = false;

B
Bruce Momjian 已提交
1415 1416 1417 1418 1419
	/*
	 * now we can free the memory allocated for pg_class_tuple
	 */
	heap_freetuple(pg_class_tuple);

1420
	/*
1421
	 * Insert newly created relation into relcache hash table, if requested.
1422
	 */
1423 1424
	if (insertIt)
		RelationCacheInsert(relation);
1425

1426 1427 1428
	/* It's fully valid */
	relation->rd_isvalid = true;

1429
	return relation;
1430 1431
}

1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448
/*
 * Initialize the physical addressing info (RelFileNode) for a relcache entry
 */
static void
RelationInitPhysicalAddr(Relation relation)
{
	if (relation->rd_rel->reltablespace)
		relation->rd_node.spcNode = relation->rd_rel->reltablespace;
	else
		relation->rd_node.spcNode = MyDatabaseTableSpace;
	if (relation->rd_rel->relisshared)
		relation->rd_node.dbNode = InvalidOid;
	else
		relation->rd_node.dbNode = MyDatabaseId;
	relation->rd_node.relNode = relation->rd_rel->relfilenode;
}

1449 1450 1451 1452 1453
/*
 * Initialize index-access-method support data for an index relation
 */
void
RelationInitIndexAccessInfo(Relation relation)
1454
{
1455 1456
	HeapTuple	tuple;
	Form_pg_am	aform;
1457
	Datum		indclassDatum;
1458
	Datum		indoptionDatum;
1459
	bool		isnull;
1460
	oidvector  *indclass;
B
Bruce Momjian 已提交
1461
	int2vector *indoption;
1462
	MemoryContext indexcxt;
1463
	MemoryContext oldcontext;
1464
	int			natts;
1465 1466
	uint16		amstrategies;
	uint16		amsupport;
1467 1468

	/*
1469
	 * Make a copy of the pg_index entry for the index.  Since pg_index
B
Bruce Momjian 已提交
1470 1471
	 * contains variable-length and possibly-null fields, we have to do this
	 * honestly rather than just treating it as a Form_pg_index struct.
1472 1473 1474 1475 1476
	 */
	tuple = SearchSysCache(INDEXRELID,
						   ObjectIdGetDatum(RelationGetRelid(relation)),
						   0, 0, 0);
	if (!HeapTupleIsValid(tuple))
1477
		elog(ERROR, "cache lookup failed for index %u",
1478
			 RelationGetRelid(relation));
1479 1480 1481 1482
	oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indextuple = heap_copytuple(tuple);
	relation->rd_index = (Form_pg_index) GETSTRUCT(relation->rd_indextuple);
	MemoryContextSwitchTo(oldcontext);
1483 1484 1485 1486 1487 1488 1489 1490 1491
	ReleaseSysCache(tuple);

	/*
	 * Make a copy of the pg_am entry for the index's access method
	 */
	tuple = SearchSysCache(AMOID,
						   ObjectIdGetDatum(relation->rd_rel->relam),
						   0, 0, 0);
	if (!HeapTupleIsValid(tuple))
1492
		elog(ERROR, "cache lookup failed for access method %u",
1493 1494 1495 1496 1497
			 relation->rd_rel->relam);
	aform = (Form_pg_am) MemoryContextAlloc(CacheMemoryContext, sizeof *aform);
	memcpy(aform, GETSTRUCT(tuple), sizeof *aform);
	ReleaseSysCache(tuple);
	relation->rd_am = aform;
1498 1499

	natts = relation->rd_rel->relnatts;
1500
	if (natts != relation->rd_index->indnatts)
1501
		elog(ERROR, "relnatts disagrees with indnatts for index %u",
1502
			 RelationGetRelid(relation));
1503 1504
	amstrategies = aform->amstrategies;
	amsupport = aform->amsupport;
1505

1506
	/*
B
Bruce Momjian 已提交
1507 1508 1509
	 * Make the private context to hold index access info.	The reason we need
	 * a context, and not just a couple of pallocs, is so that we won't leak
	 * any subsidiary info attached to fmgr lookup records.
1510 1511 1512 1513 1514 1515
	 *
	 * Context parameters are set on the assumption that it'll probably not
	 * contain much data.
	 */
	indexcxt = AllocSetContextCreate(CacheMemoryContext,
									 RelationGetRelationName(relation),
1516 1517 1518
									 ALLOCSET_SMALL_MINSIZE,
									 ALLOCSET_SMALL_INITSIZE,
									 ALLOCSET_SMALL_MAXSIZE);
1519 1520 1521 1522 1523
	relation->rd_indexcxt = indexcxt;

	/*
	 * Allocate arrays to hold data
	 */
1524 1525 1526
	relation->rd_aminfo = (RelationAmInfo *)
		MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));

1527 1528 1529 1530 1531
	relation->rd_opfamily = (Oid *)
		MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
	relation->rd_opcintype = (Oid *)
		MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));

1532
	if (amstrategies > 0)
1533
		relation->rd_operator = (Oid *)
1534 1535
			MemoryContextAllocZero(indexcxt,
								   natts * amstrategies * sizeof(Oid));
1536
	else
1537
		relation->rd_operator = NULL;
1538

1539
	if (amsupport > 0)
1540
	{
1541
		int			nsupport = natts * amsupport;
1542

1543
		relation->rd_support = (RegProcedure *)
1544
			MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
1545
		relation->rd_supportinfo = (FmgrInfo *)
1546
			MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
1547 1548
	}
	else
1549
	{
1550 1551
		relation->rd_support = NULL;
		relation->rd_supportinfo = NULL;
1552
	}
1553

1554 1555 1556
	relation->rd_indoption = (int16 *)
		MemoryContextAllocZero(indexcxt, natts * sizeof(int16));

1557 1558
	/*
	 * indclass cannot be referenced directly through the C struct, because it
B
Bruce Momjian 已提交
1559 1560
	 * comes after the variable-width indkey field.  Must extract the datum
	 * the hard way...
1561 1562 1563 1564 1565 1566 1567
	 */
	indclassDatum = fastgetattr(relation->rd_indextuple,
								Anum_pg_index_indclass,
								GetPgIndexDescriptor(),
								&isnull);
	Assert(!isnull);
	indclass = (oidvector *) DatumGetPointer(indclassDatum);
1568

1569
	/*
B
Bruce Momjian 已提交
1570 1571 1572
	 * Fill the operator and support procedure OID arrays, as well as the info
	 * about opfamilies and opclass input types.  (aminfo and supportinfo are
	 * left as zeroes, and are filled on-the-fly when used)
1573
	 */
1574 1575 1576
	IndexSupportInitialize(indclass,
						   relation->rd_operator, relation->rd_support,
						   relation->rd_opfamily, relation->rd_opcintype,
1577
						   amstrategies, amsupport, natts);
1578

1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589
	/*
	 * Similarly extract indoption and copy it to the cache entry
	 */
	indoptionDatum = fastgetattr(relation->rd_indextuple,
								 Anum_pg_index_indoption,
								 GetPgIndexDescriptor(),
								 &isnull);
	Assert(!isnull);
	indoption = (int2vector *) DatumGetPointer(indoptionDatum);
	memcpy(relation->rd_indoption, indoption->values, natts * sizeof(int16));

1590 1591 1592 1593 1594
	/*
	 * expressions and predicate cache will be filled later
	 */
	relation->rd_indexprs = NIL;
	relation->rd_indpred = NIL;
1595
	relation->rd_amcache = NULL;
1596 1597
}

1598
/*
1599
 * IndexSupportInitialize
1600
 *		Initializes an index's cached opclass information,
1601
 *		given the index's pg_index.indclass entry.
1602
 *
1603 1604
 * Data is returned into *indexOperator, *indexSupport, *opFamily, and
 * *opcInType, which are arrays allocated by the caller.
1605 1606 1607 1608 1609 1610 1611
 *
 * The caller also passes maxStrategyNumber, maxSupportNumber, and
 * maxAttributeNumber, since these indicate the size of the arrays
 * it has allocated --- but in practice these numbers must always match
 * those obtainable from the system catalog entries for the index and
 * access method.
 */
1612
void
1613
IndexSupportInitialize(oidvector *indclass,
1614 1615
					   Oid *indexOperator,
					   RegProcedure *indexSupport,
1616 1617
					   Oid *opFamily,
					   Oid *opcInType,
1618 1619 1620 1621 1622 1623 1624 1625 1626 1627
					   StrategyNumber maxStrategyNumber,
					   StrategyNumber maxSupportNumber,
					   AttrNumber maxAttributeNumber)
{
	int			attIndex;

	for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
	{
		OpClassCacheEnt *opcentry;

1628
		if (!OidIsValid(indclass->values[attIndex]))
1629
			elog(ERROR, "bogus pg_index tuple");
1630 1631

		/* look up the info for this opclass, using a cache */
1632
		opcentry = LookupOpclassInfo(indclass->values[attIndex],
1633 1634 1635
									 maxStrategyNumber,
									 maxSupportNumber);

1636
		/* copy cached data into relcache entry */
1637 1638
		opFamily[attIndex] = opcentry->opcfamily;
		opcInType[attIndex] = opcentry->opcintype;
1639
		if (maxStrategyNumber > 0)
1640 1641 1642
			memcpy(&indexOperator[attIndex * maxStrategyNumber],
				   opcentry->operatorOids,
				   maxStrategyNumber * sizeof(Oid));
1643
		if (maxSupportNumber > 0)
1644 1645 1646
			memcpy(&indexSupport[attIndex * maxSupportNumber],
				   opcentry->supportProcs,
				   maxSupportNumber * sizeof(RegProcedure));
1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661
	}
}

/*
 * LookupOpclassInfo
 *
 * This routine maintains a per-opclass cache of the information needed
 * by IndexSupportInitialize().  This is more efficient than relying on
 * the catalog cache, because we can load all the info about a particular
 * opclass in a single indexscan of pg_amproc or pg_amop.
 *
 * The information from pg_am about expected range of strategy and support
 * numbers is passed in, rather than being looked up, mainly because the
 * caller will have it already.
 *
1662 1663 1664
 * Note there is no provision for flushing the cache.  This is OK at the
 * moment because there is no way to ALTER any interesting properties of an
 * existing opclass --- all you can do is drop it, which will result in
1665
 * a useless but harmless dead entry in the cache.  To support altering
1666 1667 1668
 * opclass membership (not the same as opfamily membership!), we'd need to
 * be able to flush this cache as well as the contents of relcache entries
 * for indexes.
1669 1670 1671 1672 1673 1674 1675 1676
 */
static OpClassCacheEnt *
LookupOpclassInfo(Oid operatorClassOid,
				  StrategyNumber numStrats,
				  StrategyNumber numSupport)
{
	OpClassCacheEnt *opcentry;
	bool		found;
1677 1678
	Relation	rel;
	SysScanDesc scan;
1679
	ScanKeyData skey[3];
1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693
	HeapTuple	htup;
	bool		indexOK;

	if (OpClassCache == NULL)
	{
		/* First time through: initialize the opclass cache */
		HASHCTL		ctl;

		if (!CacheMemoryContext)
			CreateCacheMemoryContext();

		MemSet(&ctl, 0, sizeof(ctl));
		ctl.keysize = sizeof(Oid);
		ctl.entrysize = sizeof(OpClassCacheEnt);
1694
		ctl.hash = oid_hash;
1695 1696 1697 1698 1699 1700 1701 1702
		OpClassCache = hash_create("Operator class cache", 64,
								   &ctl, HASH_ELEM | HASH_FUNCTION);
	}

	opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
											   (void *) &operatorClassOid,
											   HASH_ENTER, &found);

1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724
	if (!found)
	{
		/* Need to allocate memory for new entry */
		opcentry->valid = false;	/* until known OK */
		opcentry->numStrats = numStrats;
		opcentry->numSupport = numSupport;

		if (numStrats > 0)
			opcentry->operatorOids = (Oid *)
				MemoryContextAllocZero(CacheMemoryContext,
									   numStrats * sizeof(Oid));
		else
			opcentry->operatorOids = NULL;

		if (numSupport > 0)
			opcentry->supportProcs = (RegProcedure *)
				MemoryContextAllocZero(CacheMemoryContext,
									   numSupport * sizeof(RegProcedure));
		else
			opcentry->supportProcs = NULL;
	}
	else
1725 1726 1727 1728 1729
	{
		Assert(numStrats == opcentry->numStrats);
		Assert(numSupport == opcentry->numSupport);
	}

1730 1731 1732 1733 1734 1735 1736 1737 1738 1739
	/*
	 * When testing for cache-flush hazards, we intentionally disable the
	 * operator class cache and force reloading of the info on each call.
	 * This is helpful because we want to test the case where a cache flush
	 * occurs while we are loading the info, and it's very hard to provoke
	 * that if this happens only once per opclass per backend.
	 */
#if defined(CLOBBER_CACHE_ALWAYS)
	opcentry->valid = false;
#endif
1740

1741 1742
	if (opcentry->valid)
		return opcentry;
1743 1744

	/*
1745 1746
	 * Need to fill in new entry.
	 *
B
Bruce Momjian 已提交
1747 1748 1749
	 * To avoid infinite recursion during startup, force heap scans if we're
	 * looking up info for the opclasses used by the indexes we would like to
	 * reference here.
1750 1751 1752 1753 1754
	 */
	indexOK = criticalRelcachesBuilt ||
		(operatorClassOid != OID_BTREE_OPS_OID &&
		 operatorClassOid != INT2_BTREE_OPS_OID);

1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782
	/*
	 * We have to fetch the pg_opclass row to determine its opfamily and
	 * opcintype, which are needed to look up the operators and functions.
	 * It'd be convenient to use the syscache here, but that probably doesn't
	 * work while bootstrapping.
	 */
	ScanKeyInit(&skey[0],
				ObjectIdAttributeNumber,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(operatorClassOid));
	rel = heap_open(OperatorClassRelationId, AccessShareLock);
	scan = systable_beginscan(rel, OpclassOidIndexId, indexOK,
							  SnapshotNow, 1, skey);

	if (HeapTupleIsValid(htup = systable_getnext(scan)))
	{
		Form_pg_opclass opclassform = (Form_pg_opclass) GETSTRUCT(htup);

		opcentry->opcfamily = opclassform->opcfamily;
		opcentry->opcintype = opclassform->opcintype;
	}
	else
		elog(ERROR, "could not find tuple for opclass %u", operatorClassOid);

	systable_endscan(scan);
	heap_close(rel, AccessShareLock);


1783
	/*
B
Bruce Momjian 已提交
1784
	 * Scan pg_amop to obtain operators for the opclass.  We only fetch the
1785
	 * default ones (those with lefttype = righttype = opcintype).
1786 1787 1788
	 */
	if (numStrats > 0)
	{
1789
		ScanKeyInit(&skey[0],
1790
					Anum_pg_amop_amopfamily,
1791
					BTEqualStrategyNumber, F_OIDEQ,
1792
					ObjectIdGetDatum(opcentry->opcfamily));
1793
		ScanKeyInit(&skey[1],
1794 1795 1796 1797 1798
					Anum_pg_amop_amoplefttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
		ScanKeyInit(&skey[2],
					Anum_pg_amop_amoprighttype,
1799
					BTEqualStrategyNumber, F_OIDEQ,
1800
					ObjectIdGetDatum(opcentry->opcintype));
1801 1802
		rel = heap_open(AccessMethodOperatorRelationId, AccessShareLock);
		scan = systable_beginscan(rel, AccessMethodStrategyIndexId, indexOK,
1803
								  SnapshotNow, 3, skey);
1804 1805

		while (HeapTupleIsValid(htup = systable_getnext(scan)))
1806 1807 1808 1809 1810
		{
			Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(htup);

			if (amopform->amopstrategy <= 0 ||
				(StrategyNumber) amopform->amopstrategy > numStrats)
1811
				elog(ERROR, "invalid amopstrategy number %d for opclass %u",
1812 1813 1814 1815 1816
					 amopform->amopstrategy, operatorClassOid);
			opcentry->operatorOids[amopform->amopstrategy - 1] =
				amopform->amopopr;
		}

1817 1818
		systable_endscan(scan);
		heap_close(rel, AccessShareLock);
1819 1820 1821
	}

	/*
B
Bruce Momjian 已提交
1822
	 * Scan pg_amproc to obtain support procs for the opclass.	We only fetch
1823
	 * the default ones (those with lefttype = righttype = opcintype).
1824 1825 1826
	 */
	if (numSupport > 0)
	{
1827
		ScanKeyInit(&skey[0],
1828
					Anum_pg_amproc_amprocfamily,
1829
					BTEqualStrategyNumber, F_OIDEQ,
1830
					ObjectIdGetDatum(opcentry->opcfamily));
1831
		ScanKeyInit(&skey[1],
1832
					Anum_pg_amproc_amproclefttype,
1833
					BTEqualStrategyNumber, F_OIDEQ,
1834 1835 1836 1837 1838
					ObjectIdGetDatum(opcentry->opcintype));
		ScanKeyInit(&skey[2],
					Anum_pg_amproc_amprocrighttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
1839 1840
		rel = heap_open(AccessMethodProcedureRelationId, AccessShareLock);
		scan = systable_beginscan(rel, AccessMethodProcedureIndexId, indexOK,
1841
								  SnapshotNow, 3, skey);
1842 1843

		while (HeapTupleIsValid(htup = systable_getnext(scan)))
1844 1845 1846 1847 1848
		{
			Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);

			if (amprocform->amprocnum <= 0 ||
				(StrategyNumber) amprocform->amprocnum > numSupport)
1849
				elog(ERROR, "invalid amproc number %d for opclass %u",
1850 1851 1852 1853 1854 1855
					 amprocform->amprocnum, operatorClassOid);

			opcentry->supportProcs[amprocform->amprocnum - 1] =
				amprocform->amproc;
		}

1856 1857
		systable_endscan(scan);
		heap_close(rel, AccessShareLock);
1858 1859 1860 1861 1862 1863 1864 1865 1866 1867
	}

	opcentry->valid = true;
	return opcentry;
}


/*
 *		formrdesc
 *
 *		This is a special cut-down version of RelationBuildDesc(),
 *		used while initializing the relcache.
 *		The relation descriptor is built just from the supplied parameters,
 *		without actually looking at any system table entries.  We cheat
 *		quite a lot since we only need to work for a few basic system
 *		catalogs.
 *
 * formrdesc is currently used for: pg_database, pg_authid, pg_auth_members,
 * pg_class, pg_attribute, pg_proc, and pg_type
 * (see RelationCacheInitializePhase2/3).
 *
 * Note that these catalogs can't have constraints (except attnotnull),
 * default values, rules, or triggers, since we don't cope with any of that.
 * (Well, actually, this only matters for properties that need to be valid
 * during bootstrap or before RelationCacheInitializePhase3 runs, and none of
 * these properties matter then...)
 *
 * NOTE: we assume we are already switched into CacheMemoryContext.
 */
static void
formrdesc(const char *relationName, Oid relationReltype,
		  bool isshared, bool hasoids,
		  int natts, const FormData_pg_attribute *attrs)
{
	Relation	relation;
	int			i;
	bool		has_not_null;

	/*
	 * allocate new relation desc, clear all fields of reldesc
	 */
	relation = (Relation) palloc0(sizeof(RelationData));
	relation->rd_targblock = InvalidBlockNumber;

	/* make sure relation is marked as having no open file yet */
	relation->rd_smgr = NULL;

	/*
	 * initialize reference count: 1 because it is nailed in cache
	 */
	relation->rd_refcnt = 1;

	/*
	 * all entries built with this routine are nailed-in-cache; none are for
	 * new or temp relations.
	 */
	relation->rd_isnailed = true;
	relation->rd_createSubid = InvalidSubTransactionId;
	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
	relation->rd_istemp = false;
	relation->rd_issyscat = (strncmp(relationName, "pg_", 3) == 0);	/* GP */
    relation->rd_isLocalBuf = false;    /*CDB*/

	/*
	 * initialize relation tuple form
	 *
	 * The data we insert here is pretty incomplete/bogus, but it'll serve to
	 * get us launched.  RelationCacheInitializePhase2() will read the real
	 * data from pg_class and replace what we've done here.  Note in particular
	 * that relowner is left as zero; this cues RelationCacheInitializePhase2
	 * that the real data isn't there yet.
	 */
	relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);

	namestrcpy(&relation->rd_rel->relname, relationName);
	relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
	relation->rd_rel->reltype = relationReltype;

	/*
	 * It's important to distinguish between shared and non-shared relations,
	 * even at bootstrap time, to make sure we know where they are stored.
	 */
	relation->rd_rel->relisshared = isshared;
	if (isshared)
		relation->rd_rel->reltablespace = GLOBALTABLESPACE_OID;

	relation->rd_rel->relpages = 0;
	relation->rd_rel->reltuples = 0;
	relation->rd_rel->relkind = RELKIND_RELATION;
	relation->rd_rel->relstorage = RELSTORAGE_HEAP;
	relation->rd_rel->relhasoids = hasoids;
	relation->rd_rel->relnatts = (int16) natts;

	/*
	 * Physical file-system information.
	 */
	relation->rd_segfile0_relationnodeinfo.isPresent = false;
	relation->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = false;

	/*
	 * initialize attribute tuple form
	 *
	 * Unlike the case with the relation tuple, this data had better be right
	 * because it will never be replaced.  The input values must be correctly
	 * defined by macros in src/include/catalog/ headers.
	 */
	relation->rd_att = CreateTemplateTupleDesc(natts, hasoids);
	relation->rd_att->tdrefcount = 1;	/* mark as refcounted */

	relation->rd_att->tdtypeid = relationReltype;
	relation->rd_att->tdtypmod = -1;	/* unnecessary, but... */

	/*
	 * initialize tuple desc info
	 */
	has_not_null = false;
	for (i = 0; i < natts; i++)
	{
		/* only the fixed-size part of each pg_attribute row is copied */
		memcpy(relation->rd_att->attrs[i],
			   &attrs[i],
			   ATTRIBUTE_FIXED_PART_SIZE);
		has_not_null |= attrs[i].attnotnull;
		/* make sure attcacheoff is valid */
		relation->rd_att->attrs[i]->attcacheoff = -1;
	}

	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
	relation->rd_att->attrs[0]->attcacheoff = 0;

	/* mark not-null status */
	if (has_not_null)
	{
		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

		constr->has_not_null = true;
		relation->rd_att->constr = constr;
	}

	/*
	 * initialize relation id from info in att array (my, this is ugly)
	 */
	RelationGetRelid(relation) = relation->rd_att->attrs[0]->attrelid;
	relation->rd_rel->relfilenode = RelationGetRelid(relation);

	/*
	 * initialize the relation lock manager information
	 */
	RelationInitLockInfo(relation);		/* see lmgr.c */

	/*
	 * initialize physical addressing information for the relation
	 */
	RelationInitPhysicalAddr(relation);

	/*
	 * initialize the rel-has-index flag, using hardwired knowledge
	 */
	if (IsBootstrapProcessingMode())
	{
		/* In bootstrap mode, we have no indexes */
		relation->rd_rel->relhasindex = false;
	}
	else
	{
		/* Otherwise, all the rels formrdesc is used for have indexes */
		relation->rd_rel->relhasindex = true;
	}

	/*
	 * add new reldesc to relcache
	 */
	RelationCacheInsert(relation);

	/* It's fully valid */
	relation->rd_isvalid = true;
}


2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078
/*
 * RelationInitAppendOnlyInfo
 *		Load and cache the pg_appendonly entry for an append-only relation.
 *
 * Scans pg_appendonly for the row whose relid matches the given relation,
 * copies the tuple into CacheMemoryContext, and attaches the copy to the
 * relcache entry (rd_aotuple / rd_appendonly).  Raises an error if no
 * matching tuple exists.
 */
static void
RelationInitAppendOnlyInfo(Relation relation)
{
	Relation	pg_appendonly_rel;
	HeapTuple	tuple;
	MemoryContext oldcontext;
	SysScanDesc scan;
	ScanKeyData skey;

	/*
	 * Check the pg_appendonly relation to be certain the ao table
	 * is there.
	 */
	pg_appendonly_rel = heap_open(AppendOnlyRelationId, AccessShareLock);

	ScanKeyInit(&skey,
				Anum_pg_appendonly_relid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));
	/* FIXME: isn't there a mode in relcache code to *not* use an index? Should
	 * we do something here to obey it?
	 */
	scan = systable_beginscan(pg_appendonly_rel, AppendOnlyRelidIndexId, true,
							  SnapshotNow, 1, &skey);

	tuple = systable_getnext(scan);
	if (!tuple)
		elog(ERROR, "could not find pg_appendonly tuple for relation \"%s\"",
			 RelationGetRelationName(relation));

	/*
	 * Make a copy of the pg_appendonly entry for the table.  The copy must
	 * live in CacheMemoryContext so it survives transaction end.
	 */
	oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_aotuple = heap_copytuple(tuple);
	relation->rd_appendonly = (Form_pg_appendonly) GETSTRUCT(relation->rd_aotuple);
	MemoryContextSwitchTo(oldcontext);
	systable_endscan(scan);
	heap_close(pg_appendonly_rel, AccessShareLock);

}


/* ----------------------------------------------------------------
 *				 Relation Descriptor Lookup Interface
 * ----------------------------------------------------------------
 */

2084
/*
 *		RelationIdGetRelation
 *
 *		Lookup a reldesc by OID; make one if not already in cache.
 *
 *		Returns NULL if no pg_class row could be found for the given relid
 *		(suggesting we are trying to access a just-deleted relation).
 *		Any other error is reported via elog.
 *
 *		NB: caller should already have at least AccessShareLock on the
 *		relation ID, else there are nasty race conditions.
 *
 *		NB: relation ref count is incremented, or set to 1 if new entry.
 *		Caller should eventually decrement count.  (Usually,
 *		that happens by calling RelationClose().)
 */
Relation
RelationIdGetRelation(Oid relationId)
{
	Relation	rd;

	/*
	 * first try to find reldesc in the cache
	 */
	RelationIdCacheLookup(relationId, rd);

	if (RelationIsValid(rd))
	{
		RelationIncrementReferenceCount(rd);
		/* revalidate cache entry if necessary */
		if (!rd->rd_isvalid)
		{
			/*
			 * Indexes only have a limited number of possible schema changes,
			 * and we don't want to use the full-blown procedure because it's
			 * a headache for indexes that reload itself depends on.
			 */
			if (rd->rd_rel->relkind == RELKIND_INDEX)
				RelationReloadIndexInfo(rd);
			else
				RelationClearRelation(rd, true);
		}
		return rd;
	}

	/*
	 * no reldesc in the cache, so have RelationBuildDesc() build one and add
	 * it.
	 */
	rd = RelationBuildDesc(relationId, true);
	if (RelationIsValid(rd))
		RelationIncrementReferenceCount(rd);

	return rd;
}

/* ----------------------------------------------------------------
 *				cache invalidation support routines
 * ----------------------------------------------------------------
 */

2145 2146 2147 2148 2149 2150 2151 2152 2153 2154 2155 2156 2157 2158 2159 2160 2161 2162 2163 2164 2165 2166 2167 2168
/*
 * RelationIncrementReferenceCount
 *		Increments relation reference count.
 *
 * Note: bootstrap mode has its own weird ideas about relation refcount
 * behavior; we ought to fix it someday, but for now, just disable
 * reference count ownership tracking in bootstrap mode.
 */
void
RelationIncrementReferenceCount(Relation rel)
{
	/* Reserve resource-owner tracking space before bumping the count. */
	ResourceOwnerEnlargeRelationRefs(CurrentResourceOwner);
	rel->rd_refcnt++;
	/* Ownership of the reference is not tracked during bootstrap. */
	if (!IsBootstrapProcessingMode())
		ResourceOwnerRememberRelationRef(CurrentResourceOwner, rel);
}

/*
 * RelationDecrementReferenceCount
 *		Decrements relation reference count.
 */
void
RelationDecrementReferenceCount(Relation rel)
{
	/* A non-positive count here means the entry was already over-released. */
	if (rel->rd_refcnt <= 0)
		elog(ERROR,
			 "Relation decrement reference count found relation %u/%u/%u with bad count (reference count %d)",
			 rel->rd_node.spcNode,
			 rel->rd_node.dbNode,
			 rel->rd_node.relNode,
			 rel->rd_refcnt);

	rel->rd_refcnt--;
	/* Ownership of the reference is not tracked during bootstrap. */
	if (!IsBootstrapProcessingMode())
		ResourceOwnerForgetRelationRef(CurrentResourceOwner, rel);
}

2184
/*
 * RelationClose - close an open relation
 *
 *	Actually, we just decrement the refcount.
 *
 *	NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
 *	will be freed as soon as their refcount goes to zero.  In combination
 *	with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
 *	to catch references to already-released relcache entries.  It slows
 *	things down quite a bit, however.
 */
void
RelationClose(Relation relation)
{
	/* Note: no locking manipulations needed */
	RelationDecrementReferenceCount(relation);

#ifdef RELCACHE_FORCE_RELEASE
	/* Don't discard entries for relations created in the current (sub)xact */
	if (RelationHasReferenceCountZero(relation) &&
		relation->rd_createSubid == InvalidSubTransactionId &&
		relation->rd_newRelfilenodeSubid == InvalidSubTransactionId)
		RelationClearRelation(relation, false);
#endif
}

2209
/*
 * RelationReloadIndexInfo - reload minimal information for an open index
 *
 *	This function is used only for indexes.  A relcache inval on an index
 *	can mean that its pg_class or pg_index row changed.  There are only
 *	very limited changes that are allowed to an existing index's schema,
 *	so we can update the relcache entry without a complete rebuild; which
 *	is fortunate because we can't rebuild an index entry that is "nailed"
 *	and/or in active use.  We support full replacement of the pg_class row,
 *	as well as updates of a few simple fields of the pg_index row.
 *
 *	We can't necessarily reread the catalog rows right away; we might be
 *	in a failed transaction when we receive the SI notification.  If so,
 *	RelationClearRelation just marks the entry as invalid by setting
 *	rd_isvalid to false.  This routine is called to fix the entry when it
 *	is next needed.
 *
 *	We assume that at the time we are called, we have at least AccessShareLock
 *	on the target index.  (Note: in the calls from RelationClearRelation,
 *	this is legitimate because we know the rel has positive refcount.)
 *
 *	If the target index is an index on pg_class or pg_index, we'd better have
 *	previously gotten at least AccessShareLock on its underlying catalog,
 *	else we are at risk of deadlock against someone trying to exclusive-lock
 *	the heap and index in that order.  This is ensured in current usage by
 *	only applying this to indexes being opened or having positive refcount.
 */
static void
RelationReloadIndexInfo(Relation relation)
{
	bool		indexOK;
	HeapTuple	pg_class_tuple;
	Form_pg_class relp;

	/* Should be called only for invalidated indexes */
	Assert(relation->rd_rel->relkind == RELKIND_INDEX &&
		   !relation->rd_isvalid);
	/* Should be closed at smgr level */
	Assert(relation->rd_smgr == NULL);

	/* Make sure targblock is reset in case rel was truncated */
	relation->rd_targblock = InvalidBlockNumber;
	/* Must free any AM cached data, too */
	if (relation->rd_amcache)
		pfree(relation->rd_amcache);
	relation->rd_amcache = NULL;

	/*
	 * If it's a shared index, we might be called before backend startup has
	 * finished selecting a database, in which case we have no way to read
	 * pg_class yet.  However, a shared index can never have any significant
	 * schema updates, so it's okay to ignore the invalidation signal.  Just
	 * mark it valid and return without doing anything more.
	 */
	if (relation->rd_rel->relisshared && !criticalRelcachesBuilt)
	{
		relation->rd_isvalid = true;
		return;
	}

	/*
	 * Read the pg_class row
	 *
	 * Don't try to use an indexscan of pg_class_oid_index to reload the info
	 * for pg_class_oid_index ...
	 */
	indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
	pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK, NULL);
	if (!HeapTupleIsValid(pg_class_tuple))
		elog(ERROR, "could not find pg_class tuple for index %u",
			 RelationGetRelid(relation));
	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
	memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
	/* Reload reloptions in case they changed */
	if (relation->rd_options)
		pfree(relation->rd_options);
	RelationParseRelOptions(relation, pg_class_tuple);
	/* done with pg_class tuple */
	heap_freetuple(pg_class_tuple);
	/* We must recalculate physical address in case it changed */
	RelationInitPhysicalAddr(relation);

	/* Forget gp_relation_node information -- it may have changed. */
	MemSet(&relation->rd_segfile0_relationnodeinfo, 0, sizeof(RelationNodeInfo));

	/*
	 * For a non-system index, there are fields of the pg_index row that are
	 * allowed to change, so re-read that row and update the relcache entry.
	 * Most of the info derived from pg_index (such as support function lookup
	 * info) cannot change, and indeed the whole point of this routine is to
	 * update the relcache entry without clobbering that data; so wholesale
	 * replacement is not appropriate.
	 */
	if (!IsSystemRelation(relation))
	{
		HeapTuple	tuple;
		Form_pg_index index;

		tuple = SearchSysCache(INDEXRELID,
							   ObjectIdGetDatum(RelationGetRelid(relation)),
							   0, 0, 0);
		if (!HeapTupleIsValid(tuple))
			elog(ERROR, "cache lookup failed for index %u",
				 RelationGetRelid(relation));
		index = (Form_pg_index) GETSTRUCT(tuple);

		/*
		 * Basically, let's just copy all the bool fields.  There are one or
		 * two of these that can't actually change in the current code, but
		 * it's not worth it to track exactly which ones they are.  None of
		 * the array fields are allowed to change, though.
		 */
		relation->rd_index->indisunique = index->indisunique;
		relation->rd_index->indisprimary = index->indisprimary;
		relation->rd_index->indisclustered = index->indisclustered;
		relation->rd_index->indisvalid = index->indisvalid;
		relation->rd_index->indcheckxmin = index->indcheckxmin;
		relation->rd_index->indisready = index->indisready;

		/* Copy xmin too, as that is needed to make sense of indcheckxmin */
		HeapTupleHeaderSetXmin(relation->rd_indextuple->t_data,
							   HeapTupleHeaderGetXmin(tuple->t_data));

		ReleaseSysCache(tuple);
	}

	/* Okay, now it's valid again */
	relation->rd_isvalid = true;
}
2338

2339 2340 2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367
/*
 * RelationDestroyRelation
 *
 *	Physically delete a relation cache entry and all subsidiary data.
 *	Caller must already have unhooked the entry from the hash table.
 */
static void
RelationDestroyRelation(Relation relation)
{
	Assert(RelationHasReferenceCountZero(relation));

	/*
	 * Make sure smgr and lower levels close the relation's files, if they
	 * weren't closed already.  (This was probably done by caller, but let's
	 * just be real sure.)
	 */
	RelationCloseSmgr(relation);

	/*
	 * Free all the subsidiary data structures of the relcache entry,
	 * then the entry itself.
	 */
	if (relation->rd_rel)
		pfree(relation->rd_rel);
	/* can't use DecrTupleDescRefCount here */
	Assert(relation->rd_att->tdrefcount > 0);
	if (--relation->rd_att->tdrefcount == 0)
		FreeTupleDesc(relation->rd_att);
	list_free(relation->rd_indexlist);
	bms_free(relation->rd_indexattr);
	FreeTriggerDesc(relation->trigdesc);
	if (relation->rd_options)
		pfree(relation->rd_options);
	if (relation->rd_indextuple)
		pfree(relation->rd_indextuple);
	/* GPDB: release the cached pg_appendonly tuple, if any */
	if (relation->rd_aotuple)
		pfree(relation->rd_aotuple);
	if (relation->rd_am)
		pfree(relation->rd_am);
	if (relation->rd_indexcxt)
		MemoryContextDelete(relation->rd_indexcxt);
	if (relation->rd_rulescxt)
		MemoryContextDelete(relation->rd_rulescxt);
	if (relation->rd_cdbpolicy)
		pfree(relation->rd_cdbpolicy);

	pfree(relation);
}

2388
/*
 * RelationClearRelation
 *
 *	 Physically blow away a relation cache entry, or reset it and rebuild
 *	 it from scratch (that is, from catalog entries).  The latter path is
 *	 used when we are notified of a change to an open relation (one with
 *	 refcount > 0).
 *
 *	 NB: when rebuilding, we'd better hold some lock on the relation,
 *	 else the catalog data we need to read could be changing under us.
 *	 Also, a rel to be rebuilt had better have refcnt > 0.  This is because
 *	 an sinval reset could happen while we're accessing the catalogs, and
 *	 the rel would get blown away underneath us by RelationCacheInvalidate
 *	 if it has zero refcnt.
 *
 *	 The "rebuild" parameter is redundant in current usage because it has
 *	 to match the relation's refcnt status, but we keep it as a crosscheck
 *	 that we're doing what the caller expects.
 */
static void
RelationClearRelation(Relation relation, bool rebuild)
{
	/*
	 * As per notes above, a rel to be rebuilt MUST have refcnt > 0; while
	 * of course it would be a bad idea to blow away one with nonzero refcnt.
	 */
	Assert(rebuild ?
		   !RelationHasReferenceCountZero(relation) :
		   RelationHasReferenceCountZero(relation));

	/*
	 * Make sure smgr and lower levels close the relation's files, if they
	 * weren't closed already.  If the relation is not getting deleted, the
	 * next smgr access should reopen the files automatically.	This ensures
	 * that the low-level file access state is updated after, say, a vacuum
	 * truncation.
	 */
	RelationCloseSmgr(relation);

	/*
	 * Never, never ever blow away a nailed-in system relation, because we'd
	 * be unable to recover.  However, we must reset rd_targblock, in case we
	 * got called because of a relation cache flush that was triggered by
	 * VACUUM.  Likewise reset the fsm and vm size info.
	 *
	 * If it's a nailed index, then we need to re-read the pg_class row to see
	 * if its relfilenode changed.	We can't necessarily do that here, because
	 * we might be in a failed transaction.  We assume it's okay to do it if
	 * there are open references to the relcache entry (cf notes for
	 * AtEOXact_RelationCache).  Otherwise just mark the entry as possibly
	 * invalid, and it'll be fixed when next opened.
	 */
	if (relation->rd_isnailed)
	{
		relation->rd_targblock = InvalidBlockNumber;
		if (relation->rd_rel->relkind == RELKIND_INDEX)
		{
			relation->rd_isvalid = false;		/* needs to be revalidated */
			if (relation->rd_refcnt > 1)
				RelationReloadIndexInfo(relation);
		}
		return;
	}

	/*
	 * Even non-system indexes should not be blown away if they are open and
	 * have valid index support information.  This avoids problems with active
	 * use of the index support information.  As with nailed indexes, we
	 * re-read the pg_class row to handle possible physical relocation of the
	 * index, and we check for pg_index updates too.
	 */
	if (relation->rd_rel->relkind == RELKIND_INDEX &&
		relation->rd_refcnt > 0 &&
		relation->rd_indexcxt != NULL)
	{
		relation->rd_isvalid = false;	/* needs to be revalidated */
		RelationReloadIndexInfo(relation);
		return;
	}

	/* Mark it invalid until we've finished rebuild */
	relation->rd_isvalid = false;

	/*
	 * If we're really done with the relcache entry, blow it away. But if
	 * someone is still using it, reconstruct the whole deal without moving
	 * the physical RelationData record (so that the someone's pointer is
	 * still valid).
	 */
	if (!rebuild)
	{
		/* Remove it from the hash table */
		RelationCacheDelete(relation);

		/* And release storage */
		RelationDestroyRelation(relation);
	}
	else
	{
		/*
		 * Our strategy for rebuilding an open relcache entry is to build
		 * a new entry from scratch, swap its contents with the old entry,
		 * and finally delete the new entry (along with any infrastructure
		 * swapped over from the old entry).  This is to avoid trouble in case
		 * an error causes us to lose control partway through.  The old entry
		 * will still be marked !rd_isvalid, so we'll try to rebuild it again
		 * on next access.  Meanwhile it's not any less valid than it was
		 * before, so any code that might expect to continue accessing it
		 * isn't hurt by the rebuild failure.  (Consider for example a
		 * subtransaction that ALTERs a table and then gets cancelled partway
		 * through the cache entry rebuild.  The outer transaction should
		 * still see the not-modified cache entry as valid.)  The worst
		 * consequence of an error is leaking the necessarily-unreferenced
		 * new entry, and this shouldn't happen often enough for that to be
		 * a big problem.
		 *
		 * When rebuilding an open relcache entry, we must preserve ref count
		 * and rd_createSubid/rd_newRelfilenodeSubid state.  Also attempt to
		 * preserve the pg_class entry (rd_rel), tupledesc, and rewrite-rule
		 * substructures in place, because various places assume that these
		 * structures won't move while they are working with an open relcache
		 * entry.  (Note: the refcount mechanism for tupledescs might someday
		 * allow us to remove this hack for the tupledesc.)
		 *
		 * Note that this process does not touch CurrentResourceOwner; which
		 * is good because whatever ref counts the entry may have do not
		 * necessarily belong to that resource owner.
		 */
		Relation	newrel;
		Oid			save_relid = RelationGetRelid(relation);
		bool		keep_tupdesc;
		bool		keep_rules;
		bool		keep_pt_info;

		/* Build temporary entry, but don't link it into hashtable */
		newrel = RelationBuildDesc(save_relid, false);
		if (newrel == NULL)
		{
			/* Should only get here if relation was deleted */
			RelationCacheDelete(relation);
			RelationDestroyRelation(relation);
			elog(ERROR, "relation %u deleted while still in use", save_relid);
		}

		keep_tupdesc = equalTupleDescs(relation->rd_att, newrel->rd_att, true);
		keep_rules = equalRuleLocks(relation->rd_rules, newrel->rd_rules);
		keep_pt_info = (relation->rd_rel->relfilenode ==
						newrel->rd_rel->relfilenode);

		/*
		 * Perform swapping of the relcache entry contents.  Within this
		 * process the old entry is momentarily invalid, so there *must*
		 * be no possibility of CHECK_FOR_INTERRUPTS within this sequence.
		 * Do it in all-in-line code for safety.
		 *
		 * Since the vast majority of fields should be swapped, our method
		 * is to swap the whole structures and then re-swap those few fields
		 * we didn't want swapped.
		 */
#define SWAPFIELD(fldtype, fldname) \
		do { \
			fldtype _tmp = newrel->fldname; \
			newrel->fldname = relation->fldname; \
			relation->fldname = _tmp; \
		} while (0)

		/* swap all Relation struct fields */
		{
			RelationData tmpstruct;

			memcpy(&tmpstruct, newrel, sizeof(RelationData));
			memcpy(newrel, relation, sizeof(RelationData));
			memcpy(relation, &tmpstruct, sizeof(RelationData));
		}

		/* rd_smgr must not be swapped, due to back-links from smgr level */
		SWAPFIELD(SMgrRelation, rd_smgr);
		/* rd_refcnt must be preserved */
		SWAPFIELD(int, rd_refcnt);
		/* isnailed shouldn't change */
		Assert(newrel->rd_isnailed == relation->rd_isnailed);
		/* creation sub-XIDs must be preserved */
		SWAPFIELD(SubTransactionId, rd_createSubid);
		SWAPFIELD(SubTransactionId, rd_newRelfilenodeSubid);
		/* un-swap rd_rel pointers, swap contents instead */
		SWAPFIELD(Form_pg_class, rd_rel);
		/* ... but actually, we don't have to update newrel->rd_rel */
		memcpy(relation->rd_rel, newrel->rd_rel, CLASS_TUPLE_SIZE);
		/* preserve old tupledesc and rules if no logical change */
		if (keep_tupdesc)
			SWAPFIELD(TupleDesc, rd_att);
		if (keep_rules)
		{
			SWAPFIELD(RuleLock *, rd_rules);
			SWAPFIELD(MemoryContext, rd_rulescxt);
		}
		/* pgstat_info must be preserved */
		SWAPFIELD(struct PgStat_TableStatus *, pgstat_info);

		/* preserve persistent table information for the relation  */
		if (keep_pt_info)
			SWAPFIELD(struct RelationNodeInfo, rd_segfile0_relationnodeinfo);

#undef SWAPFIELD

		/* And now we can throw away the temporary entry */
		RelationDestroyRelation(newrel);
	}
}

2598
/*
2599 2600 2601 2602 2603
 * RelationFlushRelation
 *
 *	 Rebuild the relation if it is open (refcount > 0), else blow it away.
 */
static void
2604
RelationFlushRelation(Relation relation)
2605
{
2606 2607
	if (relation->rd_createSubid != InvalidSubTransactionId ||
		relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
2608 2609
	{
		/*
2610 2611
		 * New relcache entries are always rebuilt, not flushed; else we'd
		 * forget the "new" status of the relation, which is a useful
2612
		 * optimization to have.  Ditto for the new-relfilenode status.
2613 2614 2615 2616
		 *
		 * The rel could have zero refcnt here, so temporarily increment
		 * the refcnt to ensure it's safe to rebuild it.  We can assume that
		 * the current transaction has some lock on the rel already.
2617
		 */
2618 2619 2620
		RelationIncrementReferenceCount(relation);
		RelationClearRelation(relation, true);
		RelationDecrementReferenceCount(relation);
2621 2622 2623 2624
	}
	else
	{
		/*
2625
		 * Pre-existing rels can be dropped from the relcache if not open.
2626
		 */
2627
		bool	rebuild = !RelationHasReferenceCountZero(relation);
2628

2629 2630
		RelationClearRelation(relation, rebuild);
	}
2631 2632
}

2633
/*
2634
 * RelationForgetRelation - unconditionally remove a relcache entry
2635
 *
2636 2637
 *		   External interface for destroying a relcache entry when we
 *		   drop the relation.
2638 2639
 */
void
2640
RelationForgetRelation(Oid rid)
2641
{
2642
	Relation	relation;
2643 2644 2645

	RelationIdCacheLookup(rid, relation);

2646 2647 2648 2649
	if (!PointerIsValid(relation))
		return;					/* not in cache, nothing to do */

	if (!RelationHasReferenceCountZero(relation))
2650
		elog(ERROR, "relation %u is still open", rid);
2651 2652 2653

	/* Unconditionally destroy the relcache entry */
	RelationClearRelation(relation, false);
2654 2655
}

2656
/*
2657
 *		RelationCacheInvalidateEntry
2658 2659 2660
 *
 *		This routine is invoked for SI cache flush messages.
 *
2661 2662
 * Any relcache entry matching the relid must be flushed.  (Note: caller has
 * already determined that the relid belongs to our database or is a shared
2663
 * relation.)
2664 2665 2666 2667 2668 2669
 *
 * We used to skip local relations, on the grounds that they could
 * not be targets of cross-backend SI update messages; but it seems
 * safer to process them, so that our *own* SI update messages will
 * have the same effects during CommandCounterIncrement for both
 * local and nonlocal relations.
2670 2671
 */
void
2672
RelationCacheInvalidateEntry(Oid relationId)
2673
{
2674
	Relation	relation;
2675 2676 2677

	RelationIdCacheLookup(relationId, relation);

2678
	if (PointerIsValid(relation))
2679
	{
2680
		relcacheInvalsReceived++;
2681
		RelationFlushRelation(relation);
2682
	}
2683 2684 2685 2686
}

/*
 * RelationCacheInvalidate
2687
 *	 Blow away cached relation descriptors that have zero reference counts,
B
Bruce Momjian 已提交
2688
 *	 and rebuild those with positive reference counts.	Also reset the smgr
2689
 *	 relation cache.
2690
 *
2691
 *	 This is currently used only to recover from SI message buffer overflow,
2692
 *	 so we do not touch new-in-transaction relations; they cannot be targets
2693 2694
 *	 of cross-backend SI updates (and our own updates now go through a
 *	 separate linked list that isn't limited by the SI message buffer size).
2695 2696 2697
 *	 Likewise, we need not discard new-relfilenode-in-transaction hints,
 *	 since any invalidation of those would be a local event.
 *
2698 2699
 *	 We do this in two phases: the first pass deletes deletable items, and
 *	 the second one rebuilds the rebuildable items.  This is essential for
2700
 *	 safety, because hash_seq_search only copes with concurrent deletion of
B
Bruce Momjian 已提交
2701
 *	 the element it is currently visiting.	If a second SI overflow were to
2702 2703 2704 2705
 *	 occur while we are walking the table, resulting in recursive entry to
 *	 this routine, we could crash because the inner invocation blows away
 *	 the entry next to be visited by the outer scan.  But this way is OK,
 *	 because (a) during the first pass we won't process any more SI messages,
2706
 *	 so hash_seq_search will complete safely; (b) during the second pass we
2707
 *	 only hold onto pointers to nondeletable entries.
2708 2709 2710 2711 2712 2713
 *
 *	 The two-phase approach also makes it easy to ensure that we process
 *	 nailed-in-cache indexes before other nondeletable items, and that we
 *	 process pg_class_oid_index first of all.  In scenarios where a nailed
 *	 index has been given a new relfilenode, we have to detect that update
 *	 before the nailed index is used in reloading any other relcache entry.
2714 2715
 */
void
2716
RelationCacheInvalidate(void)
2717
{
2718
	HASH_SEQ_STATUS status;
2719
	RelIdCacheEnt *idhentry;
2720
	Relation	relation;
2721
	List	   *rebuildFirstList = NIL;
B
Bruce Momjian 已提交
2722
	List	   *rebuildList = NIL;
2723
	ListCell   *l;
2724 2725

	/* Phase 1 */
2726
	hash_seq_init(&status, RelationIdCache);
2727

2728
	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2729
	{
2730
		relation = idhentry->reldesc;
2731

2732
		/* Must close all smgr references to avoid leaving dangling ptrs */
2733
		RelationCloseSmgr(relation);
2734

2735
		/* Ignore new relations, since they are never SI targets */
2736
		if (relation->rd_createSubid != InvalidSubTransactionId)
2737
			continue;
2738

2739 2740
		relcacheInvalsReceived++;

2741
		if (RelationHasReferenceCountZero(relation))
2742 2743
		{
			/* Delete this entry immediately */
2744
			Assert(!relation->rd_isnailed);
2745 2746 2747 2748
			RelationClearRelation(relation, false);
		}
		else
		{
2749 2750
			/*
			 * Add this entry to list of stuff to rebuild in second pass.
B
Bruce Momjian 已提交
2751 2752
			 * pg_class_oid_index goes on the front of rebuildFirstList, other
			 * nailed indexes on the back, and everything else into
2753 2754 2755 2756 2757
			 * rebuildList (in no particular order).
			 */
			if (relation->rd_isnailed &&
				relation->rd_rel->relkind == RELKIND_INDEX)
			{
2758
				if (RelationGetRelid(relation) == ClassOidIndexId)
2759 2760 2761 2762 2763 2764
					rebuildFirstList = lcons(relation, rebuildFirstList);
				else
					rebuildFirstList = lappend(rebuildFirstList, relation);
			}
			else
				rebuildList = lcons(relation, rebuildList);
2765
		}
2766
	}
2767

2768
	/*
B
Bruce Momjian 已提交
2769 2770 2771
	 * Now zap any remaining smgr cache entries.  This must happen before we
	 * start to rebuild entries, since that may involve catalog fetches which
	 * will re-open catalog files.
2772 2773 2774
	 */
	smgrcloseall();

2775
	/* Phase 2: rebuild the items found to need rebuild in phase 1 */
2776 2777 2778 2779 2780 2781
	foreach(l, rebuildFirstList)
	{
		relation = (Relation) lfirst(l);
		RelationClearRelation(relation, true);
	}
	list_free(rebuildFirstList);
2782
	foreach(l, rebuildList)
2783
	{
2784 2785
		relation = (Relation) lfirst(l);
		RelationClearRelation(relation, true);
2786
	}
2787
	list_free(rebuildList);
2788
}
2789

2790
/*
2791
 * AtEOXact_RelationCache
2792
 *
2793
 *	Clean up the relcache at main-transaction commit or abort.
2794 2795 2796 2797 2798
 *
 * Note: this must be called *before* processing invalidation messages.
 * In the case of abort, we don't want to try to rebuild any invalidated
 * cache entries (since we can't safely do database accesses).  Therefore
 * we must reset refcnts before handling pending invalidations.
2799 2800 2801 2802 2803 2804
 *
 * As of PostgreSQL 8.1, relcache refcnts should get released by the
 * ResourceOwner mechanism.  This routine just does a debugging
 * cross-check that no pins remain.  However, we also need to do special
 * cleanup when the current transaction created any relations or made use
 * of forced index lists.
2805 2806
 */
void
2807
AtEOXact_RelationCache(bool isCommit)
2808
{
2809
	HASH_SEQ_STATUS status;
2810
	RelIdCacheEnt *idhentry;
2811

2812 2813
	/*
	 * To speed up transaction exit, we want to avoid scanning the relcache
B
Bruce Momjian 已提交
2814 2815 2816 2817
	 * unless there is actually something for this routine to do.  Other than
	 * the debug-only Assert checks, most transactions don't create any work
	 * for us to do here, so we keep a static flag that gets set if there is
	 * anything to do.	(Currently, this means either a relation is created in
2818 2819 2820 2821
	 * the current xact, or one is given a new relfilenode, or an index list
	 * is forced.)  For simplicity, the flag remains set till end of top-level
	 * transaction, even though we could clear it at subtransaction end in
	 * some cases.
2822 2823 2824
	 *
	 * MPP-3333: READERS need to *always* scan, otherwise they will not be able
	 * to maintain a coherent view of the storage layer.
2825 2826
	 */
	if (!need_eoxact_work
2827
		&& DistributedTransactionContext != DTX_CONTEXT_QE_READER
2828 2829 2830 2831 2832 2833
#ifdef USE_ASSERT_CHECKING
		&& !assert_enabled
#endif
		)
		return;

2834
	hash_seq_init(&status, RelationIdCache);
2835

2836
	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2837
	{
2838
		Relation	relation = idhentry->reldesc;
2839 2840 2841 2842 2843

		/*
		 * The relcache entry's ref count should be back to its normal
		 * not-in-a-transaction state: 0 unless it's nailed in cache.
		 *
B
Bruce Momjian 已提交
2844 2845 2846
		 * In bootstrap mode, this is NOT true, so don't check it --- the
		 * bootstrap code expects relations to stay open across start/commit
		 * transaction calls.  (That seems bogus, but it's not worth fixing.)
2847 2848 2849 2850 2851 2852 2853 2854 2855 2856
		 */
#ifdef USE_ASSERT_CHECKING
		if (!IsBootstrapProcessingMode())
		{
			int			expected_refcnt;

			expected_refcnt = relation->rd_isnailed ? 1 : 0;
			Assert(relation->rd_refcnt == expected_refcnt);
		}
#endif
2857

2858 2859 2860 2861 2862 2863 2864 2865 2866
		/*
		 * QE-readers aren't properly enrolled in transactions, they
		 * just get the snapshot which corresponds -- so here, where
		 * we are maintaining their relcache, we want to just clean
		 * up (almost as if we had aborted). (MPP-3338)
		 */
		if (DistributedTransactionContext == DTX_CONTEXT_QE_ENTRY_DB_SINGLETON ||
			DistributedTransactionContext == DTX_CONTEXT_QE_READER)
		{
2867
			RelationClearRelation(relation, relation->rd_isnailed ? true : false);
2868 2869 2870
			continue;
		}

2871 2872 2873
		/*
		 * Is it a relation created in the current transaction?
		 *
B
Bruce Momjian 已提交
2874 2875 2876 2877 2878 2879
		 * During commit, reset the flag to zero, since we are now out of the
		 * creating transaction.  During abort, simply delete the relcache
		 * entry --- it isn't interesting any longer.  (NOTE: if we have
		 * forgotten the new-ness of a new relation due to a forced cache
		 * flush, the entry will get deleted anyway by shared-cache-inval
		 * processing of the aborted pg_class insertion.)
2880
		 */
2881
		if (relation->rd_createSubid != InvalidSubTransactionId)
2882
		{
2883
			if (isCommit)
2884
				relation->rd_createSubid = InvalidSubTransactionId;
2885 2886
			else
			{
2887 2888 2889 2890 2891
				/*
				 * In abort, delete the error log file before forgetting
				 * this relation.
				 */
				ErrorLogDelete(MyDatabaseId, RelationGetRelid(relation));
2892 2893 2894 2895 2896
				RelationClearRelation(relation, false);
				continue;
			}
		}

2897 2898 2899
		/*
		 * Likewise, reset the hint about the relfilenode being new.
		 */
2900
		relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
2901

2902 2903 2904 2905 2906
		/*
		 * Flush any temporary index list.
		 */
		if (relation->rd_indexvalid == 2)
		{
2907
			list_free(relation->rd_indexlist);
2908
			relation->rd_indexlist = NIL;
2909
			relation->rd_oidindex = InvalidOid;
2910 2911
			relation->rd_indexvalid = 0;
		}
2912
	}
2913

2914 2915
	/* Once done with the transaction, we can reset need_eoxact_work */
	need_eoxact_work = false;
2916
}
2917

2918 2919 2920 2921 2922 2923 2924 2925
/*
 * AtEOSubXact_RelationCache
 *
 *	Clean up the relcache at sub-transaction commit or abort.
 *
 * Note: this must be called *before* processing invalidation messages.
 */
void
2926 2927
AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
						  SubTransactionId parentSubid)
2928 2929 2930 2931
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;

2932
	/*
2933 2934
	 * Skip the relcache scan if nothing to do --- see notes for
	 * AtEOXact_RelationCache.
2935
	 */
2936 2937
	if (!need_eoxact_work &&
		DistributedTransactionContext != DTX_CONTEXT_QE_READER)
2938 2939
		return;

2940 2941 2942 2943 2944 2945
	hash_seq_init(&status, RelationIdCache);

	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
	{
		Relation	relation = idhentry->reldesc;

2946 2947 2948 2949 2950 2951 2952 2953
		/*
		 * As opposed to AtEOXact_RelationCache, subtransactions
		 * in readers are only caused by internal commands, and
		 * there shouldn't be interaction with global transactions,
		 * (reader gangs commit their transaction independently)
		 * we must not clear the relcache here.
		 */

2954 2955 2956
		/*
		 * Is it a relation created in the current subtransaction?
		 *
2957 2958
		 * During subcommit, mark it as belonging to the parent, instead.
		 * During subabort, simply delete the relcache entry.
2959
		 */
2960
		if (relation->rd_createSubid == mySubid)
2961 2962
		{
			if (isCommit)
2963
				relation->rd_createSubid = parentSubid;
2964
			else if (RelationHasReferenceCountZero(relation))
2965
			{
2966 2967 2968 2969 2970
				/*
				 * In abort, delete the error log file before forgetting
				 * this relation.
				 */
				ErrorLogDelete(MyDatabaseId, RelationGetRelid(relation));
2971

2972 2973 2974
				RelationClearRelation(relation, false);
				continue;
			}
2975 2976 2977 2978 2979 2980 2981 2982 2983 2984 2985 2986 2987 2988
			else
			{
				/*
				 * Hmm, somewhere there's a (leaked?) reference to the
				 * relation.  We daren't remove the entry for fear of
				 * dereferencing a dangling pointer later.  Bleat, and mark it
				 * as not belonging to the current transaction.  Hopefully
				 * it'll get cleaned up eventually.  This must be just a
				 * WARNING to avoid error-during-error-recovery loops.
				 */
				relation->rd_createSubid = InvalidSubTransactionId;
				elog(WARNING, "cannot remove relcache entry for \"%s\" because it has nonzero refcount",
					 RelationGetRelationName(relation));
			}
2989 2990
		}

2991
		/*
B
Bruce Momjian 已提交
2992 2993
		 * Likewise, update or drop any new-relfilenode-in-subtransaction
		 * hint.
2994
		 */
2995 2996 2997 2998 2999
		if (relation->rd_newRelfilenodeSubid == mySubid)
		{
			if (isCommit)
				relation->rd_newRelfilenodeSubid = parentSubid;
			else
3000
				relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3001
		}
3002 3003 3004 3005 3006 3007 3008 3009

		/*
		 * Flush any temporary index list.
		 */
		if (relation->rd_indexvalid == 2)
		{
			list_free(relation->rd_indexlist);
			relation->rd_indexlist = NIL;
3010
			relation->rd_oidindex = InvalidOid;
3011 3012 3013 3014 3015
			relation->rd_indexvalid = 0;
		}
	}
}

3016 3017 3018 3019
/*
 * RelationCacheMarkNewRelfilenode
 *
 *	Mark the rel as having been given a new relfilenode in the current
 *	(sub) transaction.	This is a hint that can be used to optimize
 *	later operations on the rel in the same transaction.
 */
void
RelationCacheMarkNewRelfilenode(Relation rel)
{
	/* Mark it... */
	rel->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
	/* ... and now we have eoxact cleanup work to do */
	need_eoxact_work = true;
}


/*
3034 3035 3036
 *		RelationBuildLocalRelation
 *			Build a relcache entry for an about-to-be-created relation,
 *			and enter it into the relcache.
3037
 */
3038 3039
Relation
RelationBuildLocalRelation(const char *relname,
3040
						   Oid relnamespace,
3041
						   TupleDesc tupDesc,
3042 3043
						   Oid relid,
						   Oid reltablespace,
3044
			               char relkind,            /*CDB*/
3045
						   bool shared_relation)
3046
{
3047
	Relation	rel;
3048
	MemoryContext oldcxt;
3049 3050
	int			natts = tupDesc->natts;
	int			i;
3051
	bool		has_not_null;
3052
	bool		nailit;
3053

3054
	AssertArg(natts >= 0);
3055

3056 3057 3058
	/*
	 * check for creation of a rel that must be nailed in cache.
	 *
3059 3060
	 * XXX this list had better match the relations specially handled in
	 * RelationCacheInitializePhase2/3.
3061 3062 3063
	 */
	switch (relid)
	{
3064 3065 3066
		case DatabaseRelationId:
		case AuthIdRelationId:
		case AuthMemRelationId:
3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077
		case RelationRelationId:
		case AttributeRelationId:
		case ProcedureRelationId:
		case TypeRelationId:
			nailit = true;
			break;
		default:
			nailit = false;
			break;
	}

3078 3079
	/*
	 * check that hardwired list of shared rels matches what's in the
B
Bruce Momjian 已提交
3080 3081 3082
	 * bootstrap .bki file.  If you get a failure here during initdb, you
	 * probably need to fix IsSharedRelation() to match whatever you've done
	 * to the set of shared relations.
3083 3084 3085 3086 3087
	 */
	if (shared_relation != IsSharedRelation(relid))
		elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
			 relname, relid);

3088 3089 3090 3091 3092
	/*
	 * switch to the cache context to create the relcache entry.
	 */
	if (!CacheMemoryContext)
		CreateCacheMemoryContext();
3093

3094 3095
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

3096
	/*
3097
	 * allocate a new relation descriptor and fill in basic state fields.
3098
	 */
3099
	rel = (Relation) palloc0(sizeof(RelationData));
3100

3101 3102 3103
	rel->rd_targblock = InvalidBlockNumber;

	/* make sure relation is marked as having no open file yet */
3104
	rel->rd_smgr = NULL;
3105

3106 3107 3108
	/* mark it nailed if appropriate */
	rel->rd_isnailed = nailit;

3109
	rel->rd_refcnt = nailit ? 1 : 0;
3110

3111
	/* it's being created in this transaction */
3112
	rel->rd_createSubid = GetCurrentSubTransactionId();
3113
	rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3114

3115
	/* must flag that we have rels created in this transaction */
3116
	need_eoxact_work = true;
3117

3118
	/* is it a temporary relation? */
3119
	rel->rd_istemp = isTempOrToastNamespace(relnamespace);
3120

3121 3122 3123
	/* is it a system catalog? */
	rel->rd_issyscat = (strncmp(relname, "pg_", 3) == 0);

3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134
	/*
	 * CDB: On QEs, temp relations must use shared buffer cache so data
	 * will be visible to all segmates.  On QD, sequence objects must
	 * use shared buffer cache so data will be visible to sequence server.
	 */
	if (rel->rd_istemp &&
		relkind != RELKIND_SEQUENCE &&
		Gp_role != GP_ROLE_EXECUTE)
		rel->rd_isLocalBuf = true;
	else
		rel->rd_isLocalBuf = false;
3135

3136
	/*
3137
	 * create a new tuple descriptor from the one passed in.  We do this
B
Bruce Momjian 已提交
3138 3139 3140 3141
	 * partly to copy it into the cache context, and partly because the new
	 * relation can't have any defaults or constraints yet; they have to be
	 * added in later steps, because they require additions to multiple system
	 * catalogs.  We can copy attnotnull constraints here, however.
3142
	 */
3143
	rel->rd_att = CreateTupleDescCopy(tupDesc);
3144
	rel->rd_att->tdrefcount = 1;	/* mark as refcounted */
3145
	has_not_null = false;
3146
	for (i = 0; i < natts; i++)
3147
	{
3148
		rel->rd_att->attrs[i]->attnotnull = tupDesc->attrs[i]->attnotnull;
3149 3150 3151 3152 3153 3154 3155 3156 3157 3158
		has_not_null |= tupDesc->attrs[i]->attnotnull;
	}

	if (has_not_null)
	{
		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

		constr->has_not_null = true;
		rel->rd_att->constr = constr;
	}
3159 3160 3161 3162

	/*
	 * initialize relation tuple form (caller may add/override data later)
	 */
3163
	rel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
3164

3165 3166
	namestrcpy(&rel->rd_rel->relname, relname);
	rel->rd_rel->relnamespace = relnamespace;
3167 3168

	rel->rd_rel->relkind = RELKIND_UNCATALOGED;
3169
	rel->rd_rel->relstorage = RELSTORAGE_HEAP;
3170
	rel->rd_rel->relhasoids = rel->rd_att->tdhasoid;
3171 3172
	rel->rd_rel->relnatts = natts;
	rel->rd_rel->reltype = InvalidOid;
3173 3174
	/* needed when bootstrapping: */
	rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;
3175

3176 3177 3178 3179 3180 3181 3182
	/*
	 * Create zeroed-out gp_relation_node data.  It will be filled in when the
	 * disk file is created.
	 */
	rel->rd_segfile0_relationnodeinfo.isPresent = false;
	rel->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = false;

3183
	/*
B
Bruce Momjian 已提交
3184
	 * Insert relation physical and logical identifiers (OIDs) into the right
H
Heikki Linnakangas 已提交
3185 3186 3187 3188 3189 3190
	 * places.
	 *
	 * In PostgreSQL, the physical ID (relfilenode) is initially the same
	 * as the logical ID (OID). In GPDB, the table's logical OID is allocated
	 * in the master, and might already be in use as a relfilenode of an
	 * existing relation in a segment.
3191
	 */
3192
	rel->rd_rel->relisshared = shared_relation;
3193 3194 3195 3196 3197 3198

	RelationGetRelid(rel) = relid;

	for (i = 0; i < natts; i++)
		rel->rd_att->attrs[i]->attrelid = relid;

3199 3200 3201
	if (relid < FirstNormalObjectId /* bootstrap only */
		|| (Gp_role != GP_ROLE_EXECUTE && relkind == RELKIND_SEQUENCE))
		rel->rd_rel->relfilenode = relid;
H
Heikki Linnakangas 已提交
3202 3203
	else
	{
3204 3205 3206
		rel->rd_rel->relfilenode = GetNewRelFileNode(reltablespace, shared_relation);
		if (Gp_role == GP_ROLE_EXECUTE)
			AdvanceObjectId(relid);
H
Heikki Linnakangas 已提交
3207 3208
	}

3209
	rel->rd_rel->reltablespace = reltablespace;
3210

3211
	RelationInitLockInfo(rel);	/* see lmgr.c */
3212

3213 3214
	RelationInitPhysicalAddr(rel);

3215 3216 3217 3218
	/*
	 * Okay to insert into the relcache hash tables.
	 */
	RelationCacheInsert(rel);
3219

3220 3221 3222
	/*
	 * done building relcache entry.
	 */
3223
	MemoryContextSwitchTo(oldcxt);
3224

3225 3226 3227
	/* It's fully valid */
	rel->rd_isvalid = true;

3228 3229 3230 3231 3232
	/*
	 * Caller expects us to pin the returned entry.
	 */
	RelationIncrementReferenceCount(rel);

3233
	return rel;
3234 3235
}

3236
/*
3237
 *		RelationCacheInitialize
3238
 *
3239 3240
 *		This initializes the relation descriptor cache.  At the time
 *		that this is invoked, we can't do database access yet (mainly
3241 3242 3243 3244 3245
 *		because the transaction subsystem is not up); all we are doing
 *		is making an empty cache hashtable.  This must be done before
 *		starting the initialization transaction, because otherwise
 *		AtEOXact_RelationCache would crash if that transaction aborts
 *		before we can get the relcache set up.
3246 3247
 */

3248
#define INITRELCACHESIZE		400
3249 3250

void
3251
RelationCacheInitialize(void)
3252
{
3253 3254
	MemoryContext oldcxt;
	HASHCTL		ctl;
3255

3256
	/*
3257
	 * make sure cache memory context exists
3258
	 */
3259 3260
	if (!CacheMemoryContext)
		CreateCacheMemoryContext();
3261

3262 3263 3264
    /*
	 * switch to cache memory context
	 */
3265
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3266

3267
	/*
3268
	 * create hashtable that indexes the relcache
3269
	 */
3270
	MemSet(&ctl, 0, sizeof(ctl));
3271
	ctl.keysize = sizeof(Oid);
3272
	ctl.entrysize = sizeof(RelIdCacheEnt);
3273
	ctl.hash = oid_hash;
3274 3275
	RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
								  &ctl, HASH_ELEM | HASH_FUNCTION);
3276

3277 3278 3279 3280 3281 3282
	MemoryContextSwitchTo(oldcxt);
}

/*
 *		RelationCacheInitializePhase2
 *
3283 3284 3285 3286 3287 3288 3289
 *		This is called to prepare for access to shared catalogs during startup.
 *		We must at least set up nailed reldescs for pg_database, pg_authid,
 *		and pg_auth_members.  Ideally we'd like to have reldescs for their
 *		indexes, too.  We attempt to load this information from the shared
 *		relcache init file.  If that's missing or broken, just make phony
 *		entries for the catalogs themselves.  RelationCacheInitializePhase3
 *		will clean up as needed.
3290 3291 3292
 */
void
RelationCacheInitializePhase2(void)
3293 3294 3295 3296 3297 3298 3299 3300 3301 3302 3303 3304 3305 3306 3307 3308 3309 3310 3311 3312 3313 3314 3315 3316 3317 3318 3319 3320 3321 3322 3323 3324 3325 3326 3327 3328 3329 3330 3331 3332 3333 3334 3335 3336 3337 3338 3339 3340 3341 3342
{
	MemoryContext oldcxt;

	/*
	 * In bootstrap mode, the shared catalogs aren't there yet anyway, so do
	 * nothing.
	 */
	if (IsBootstrapProcessingMode())
		return;

	/*
	 * switch to cache memory context
	 */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
	 * Try to load the shared relcache cache file.	If unsuccessful, bootstrap
	 * the cache with pre-made descriptors for the critical shared catalogs.
	 */
	if (!load_relcache_init_file(true))
	{
		formrdesc("pg_database", PG_DATABASE_RELTYPE_OID, true,
				  true, Natts_pg_database, Desc_pg_database);
		formrdesc("pg_authid", PG_AUTHID_RELTYPE_OID, true,
				  true, Natts_pg_authid, Desc_pg_authid);
		formrdesc("pg_auth_members", PG_AUTH_MEMBERS_RELTYPE_OID, true,
				  false, Natts_pg_auth_members, Desc_pg_auth_members);

#define NUM_CRITICAL_SHARED_RELS	3	/* fix if you change list above */
	}

	MemoryContextSwitchTo(oldcxt);
}

/*
 *		RelationCacheInitializePhase3
 *
 *		This is called as soon as the catcache and transaction system
 *		are functional and we have determined MyDatabaseId.  At this point
 *		we can actually read data from the database's system catalogs.
 *		We first try to read pre-computed relcache entries from the local
 *		relcache init file.  If that's missing or broken, make phony entries
 *		for the minimum set of nailed-in-cache relations.  Then (unless
 *		bootstrapping) make sure we have entries for the critical system
 *		indexes.  Once we've done all this, we have enough infrastructure to
 *		open any system catalog or use any catcache.  The last step is to
 *		rewrite the cache files if needed.
 */
void
RelationCacheInitializePhase3(void)
3343 3344 3345 3346
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;
	MemoryContext oldcxt;
3347
	bool		needNewCacheFile = !criticalSharedRelcachesBuilt;
3348

3349 3350 3351 3352 3353 3354 3355
	/*
	 * Relation cache initialization or any sort of heap access is
	 * dangerous before recovery is finished.
	 */
	if (!IsBootstrapProcessingMode() && RecoveryInProgress())
		elog(ERROR, "relation cache initialization during recovery or non-bootstrap processes.");

3356
	/*
3357 3358 3359 3360 3361
	 * switch to cache memory context
	 */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
3362 3363
	 * Try to load the local relcache cache file.  If unsuccessful, bootstrap
	 * the cache with pre-made descriptors for the critical "nailed-in" system
3364
	 * catalogs.
3365
	 */
3366
	if (IsBootstrapProcessingMode() ||
3367
		!load_relcache_init_file(false))
3368
	{
3369 3370
		needNewCacheFile = true;

3371
		formrdesc("pg_class", PG_CLASS_RELTYPE_OID, false,
3372
				  true, Natts_pg_class, Desc_pg_class);
3373
		formrdesc("pg_attribute", PG_ATTRIBUTE_RELTYPE_OID, false,
3374
				  false, Natts_pg_attribute, Desc_pg_attribute);
3375
		formrdesc("pg_proc", PG_PROC_RELTYPE_OID, false,
3376
				  true, Natts_pg_proc, Desc_pg_proc);
3377
		formrdesc("pg_type", PG_TYPE_RELTYPE_OID, false,
3378
				  true, Natts_pg_type, Desc_pg_type);
3379

3380
#define NUM_CRITICAL_LOCAL_RELS 4		/* fix if you change list above */
3381
	}
3382 3383

	MemoryContextSwitchTo(oldcxt);
3384

3385
	/* In bootstrap mode, the faked-up formrdesc info is all we'll have */
3386 3387 3388
	if (IsBootstrapProcessingMode())
		return;

3389
	/*
B
Bruce Momjian 已提交
3390
	 * If we didn't get the critical system indexes loaded into relcache, do
3391 3392
	 * so now.	These are critical because the catcache and/or opclass cache
	 * depend on them for fetches done during relcache load.  Thus, we have an
B
Bruce Momjian 已提交
3393 3394 3395 3396 3397 3398
	 * infinite-recursion problem.	We can break the recursion by doing
	 * heapscans instead of indexscans at certain key spots. To avoid hobbling
	 * performance, we only want to do that until we have the critical indexes
	 * loaded into relcache.  Thus, the flag criticalRelcachesBuilt is used to
	 * decide whether to do heapscan or indexscan at the key spots, and we set
	 * it true after we've loaded the critical indexes.
3399
	 *
B
Bruce Momjian 已提交
3400 3401 3402 3403 3404 3405
	 * The critical indexes are marked as "nailed in cache", partly to make it
	 * easy for load_relcache_init_file to count them, but mainly because we
	 * cannot flush and rebuild them once we've set criticalRelcachesBuilt to
	 * true.  (NOTE: perhaps it would be possible to reload them by
	 * temporarily setting criticalRelcachesBuilt to false again.  For now,
	 * though, we just nail 'em in.)
3406 3407 3408 3409
	 *
	 * RewriteRelRulenameIndexId and TriggerRelidNameIndexId are not critical
	 * in the same way as the others, because the critical catalogs don't
	 * (currently) have any rules or triggers, and so these indexes can be
B
Bruce Momjian 已提交
3410
	 * rebuilt without inducing recursion.	However they are used during
3411 3412
	 * relcache load when a rel does have rules or triggers, so we choose to
	 * nail them for performance reasons.
3413
	 */
B
Bruce Momjian 已提交
3414
	if (!criticalRelcachesBuilt)
3415
	{
3416 3417 3418 3419 3420 3421 3422 3423 3424 3425 3426 3427
		load_critical_index(ClassOidIndexId,
							RelationRelationId);
		load_critical_index(AttributeRelidNumIndexId,
							AttributeRelationId);
		load_critical_index(IndexRelidIndexId,
							IndexRelationId);
		load_critical_index(AccessMethodStrategyIndexId,
							AccessMethodOperatorRelationId);
		load_critical_index(OpclassOidIndexId,
							OperatorClassRelationId);
		load_critical_index(AccessMethodProcedureIndexId,
							AccessMethodProcedureRelationId);
3428 3429
		load_critical_index(OperatorOidIndexId,
							OperatorRelationId);
3430 3431 3432 3433 3434
		load_critical_index(RewriteRelRulenameIndexId,
							RewriteRelationId);
		load_critical_index(TriggerRelidNameIndexId,
							TriggerRelationId);

3435
#define NUM_CRITICAL_LOCAL_INDEXES	9	/* fix if you change list above */
3436 3437 3438 3439

		criticalRelcachesBuilt = true;
	}

3440 3441 3442 3443 3444 3445 3446 3447 3448 3449 3450 3451 3452 3453 3454 3455 3456 3457 3458 3459 3460 3461 3462 3463 3464 3465 3466 3467
	/*
	 * Process critical shared indexes too.
	 *
	 * DatabaseNameIndexId isn't critical for relcache loading, but rather for
	 * initial lookup of MyDatabaseId, without which we'll never find any
	 * non-shared catalogs at all.	Autovacuum calls InitPostgres with a
	 * database OID, so it instead depends on DatabaseOidIndexId.  We also
	 * need to nail up some indexes on pg_authid and pg_auth_members for use
	 * during client authentication.
	 */
	if (!criticalSharedRelcachesBuilt)
	{
		load_critical_index(DatabaseNameIndexId,
							DatabaseRelationId);
		load_critical_index(DatabaseOidIndexId,
							DatabaseRelationId);
		load_critical_index(AuthIdRolnameIndexId,
							AuthIdRelationId);
		load_critical_index(AuthIdOidIndexId,
							AuthIdRelationId);
		load_critical_index(AuthMemMemRoleIndexId,
							AuthMemRelationId);

#define NUM_CRITICAL_SHARED_INDEXES 5	/* fix if you change list above */

		criticalSharedRelcachesBuilt = true;
	}

3468
	/*
B
Bruce Momjian 已提交
3469 3470 3471 3472 3473 3474
	 * Now, scan all the relcache entries and update anything that might be
	 * wrong in the results from formrdesc or the relcache cache file. If we
	 * faked up relcache entries using formrdesc, then read the real pg_class
	 * rows and replace the fake entries with them. Also, if any of the
	 * relcache entries have rules or triggers, load that info the hard way
	 * since it isn't recorded in the cache file.
3475
	 *
3476 3477
	 * Whenever we access the catalogs to read data, there is a possibility
	 * of a shared-inval cache flush causing relcache entries to be removed.
3478 3479 3480 3481 3482
	 * Since hash_seq_search only guarantees to still work after the *current*
	 * entry is removed, it's unsafe to continue the hashtable scan afterward.
	 * We handle this by restarting the scan from scratch after each access.
	 * This is theoretically O(N^2), but the number of entries that actually
	 * need to be fixed is small enough that it doesn't matter.
3483
	 */
3484
	hash_seq_init(&status, RelationIdCache);
3485

3486
	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3487
	{
3488
		Relation	relation = idhentry->reldesc;
3489 3490 3491 3492 3493 3494
		bool		restart = false;

		/*
		 * Make sure *this* entry doesn't get flushed while we work with it.
		 */
		RelationIncrementReferenceCount(relation);
3495

3496
		/*
3497
		 * If it's a faked-up entry, read the real pg_class tuple.
3498
		 */
3499
		if (relation->rd_rel->relowner == InvalidOid)
3500 3501 3502
		{
			HeapTuple	htup;
			Form_pg_class relp;
B
Bruce Momjian 已提交
3503

3504
			htup = SearchSysCache(RELOID,
3505
							   ObjectIdGetDatum(RelationGetRelid(relation)),
3506 3507
								  0, 0, 0);
			if (!HeapTupleIsValid(htup))
3508 3509
				elog(FATAL, "cache lookup failed for relation %u",
					 RelationGetRelid(relation));
3510
			relp = (Form_pg_class) GETSTRUCT(htup);
B
Bruce Momjian 已提交
3511

3512 3513 3514 3515 3516
			/*
			 * Copy tuple to relation->rd_rel. (See notes in
			 * AllocateRelationDesc())
			 */
			memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
3517

3518 3519 3520 3521 3522
			/* Update rd_options while we have the tuple */
			if (relation->rd_options)
				pfree(relation->rd_options);
			RelationParseRelOptions(relation, htup);

3523
			/*
3524
			 * Check the values in rd_att were set up correctly.  (We cannot
3525 3526 3527
			 * just copy them over now: formrdesc must have set up the
			 * rd_att data correctly to start with, because it may already
			 * have been copied into one or more catcache entries.)
3528
			 */
3529 3530 3531
			Assert(relation->rd_att->tdtypeid == relp->reltype);
			Assert(relation->rd_att->tdtypmod == -1);
			Assert(relation->rd_att->tdhasoid == relp->relhasoids);
3532

3533
			ReleaseSysCache(htup);
3534 3535 3536 3537 3538 3539 3540

			/* relowner had better be OK now, else we'll loop forever */
			if (relation->rd_rel->relowner == InvalidOid)
				elog(ERROR, "invalid relowner in pg_class entry for \"%s\"",
					 RelationGetRelationName(relation));

			restart = true;
3541 3542 3543 3544
		}

		/*
		 * Fix data that isn't saved in relcache cache file.
3545 3546 3547 3548 3549
		 *
		 * relhasrules or relhastriggers could possibly be wrong or out of
		 * date.  If we don't actually find any rules or triggers, clear the
		 * local copy of the flag so that we don't get into an infinite loop
		 * here.  We don't make any attempt to fix the pg_class entry, though.
3550 3551
		 */
		if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
3552
		{
3553
			RelationBuildRuleLock(relation);
3554 3555 3556 3557
			if (relation->rd_rules == NULL)
				relation->rd_rel->relhasrules = false;
			restart = true;
		}
3558
		if (relation->rd_rel->reltriggers > 0 && relation->trigdesc == NULL)
3559
		{
3560
			RelationBuildTriggers(relation);
3561 3562 3563 3564 3565 3566 3567 3568 3569 3570 3571 3572 3573 3574
			if (relation->trigdesc == NULL)
				relation->rd_rel->reltriggers = 0;
			restart = true;
		}

		/* Release hold on the relation */
		RelationDecrementReferenceCount(relation);

		/* Now, restart the hashtable scan if needed */
		if (restart)
		{
			hash_seq_term(&status);
			hash_seq_init(&status, RelationIdCache);
		}
3575
	}
3576

3577
	/*
3578 3579
	 * Lastly, write out new relcache cache files if needed.  We don't bother
	 * to distinguish cases where only one of the two needs an update.
3580
	 */
3581 3582 3583
	if (needNewCacheFile)
	{
		/*
B
Bruce Momjian 已提交
3584 3585 3586
		 * Force all the catcaches to finish initializing and thereby open the
		 * catalogs and indexes they use.  This will preload the relcache with
		 * entries for all the most important system catalogs and indexes, so
3587
		 * that the init files will be most useful for future backends.
3588 3589 3590
		 */
		InitCatalogCachePhase2();

3591 3592 3593 3594 3595 3596
		/* reset initFileRelationIds list; we'll fill it during write */
		initFileRelationIds = NIL;

		/* now write the files */
		write_relcache_init_file(true);
		write_relcache_init_file(false);
3597 3598 3599
	}
}

3600 3601 3602 3603 3604 3605 3606 3607 3608 3609 3610 3611 3612 3613 3614 3615 3616 3617 3618 3619 3620 3621 3622 3623 3624 3625 3626 3627
/*
 * Load one critical system index into the relcache.
 *
 * indexoid identifies the index to load; heapoid is the OID of the system
 * catalog that the index belongs to.
 */
static void
load_critical_index(Oid indexoid, Oid heapoid)
{
	Relation	index_rel;

	/*
	 * Lock the underlying catalog before the index, never the other way
	 * around: RelationBuildDesc may need to read the catalog, and anyone
	 * exclusive-locking this catalog-and-index pair will take the locks in
	 * catalog-first order, so the opposite order could deadlock.
	 */
	LockRelationOid(heapoid, AccessShareLock);
	LockRelationOid(indexoid, AccessShareLock);

	index_rel = RelationBuildDesc(indexoid, true);
	if (index_rel == NULL)
		elog(PANIC, "could not open critical system index %u", indexoid);

	/* Nail the entry: critical indexes must never be flushed from cache */
	index_rel->rd_isnailed = true;
	index_rel->rd_refcnt = 1;

	UnlockRelationOid(indexoid, AccessShareLock);
	UnlockRelationOid(heapoid, AccessShareLock);
}

3628
/*
3629
 * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
3630 3631 3632
 * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
 *
 * We need this kluge because we have to be able to access non-fixed-width
3633 3634 3635 3636 3637 3638
 * fields of pg_class and pg_index before we have the standard catalog caches
 * available.  We use predefined data that's set up in just the same way as
 * the bootstrapped reldescs used by formrdesc().  The resulting tupdesc is
 * not 100% kosher: it does not have the correct rowtype OID in tdtypeid, nor
 * does it have a TupleConstr field.  But it's good enough for the purpose of
 * extracting fields.
3639 3640
 */
static TupleDesc
3641 3642
BuildHardcodedDescriptor(int natts, const FormData_pg_attribute *attrs,
						 bool hasoids)
3643
{
3644
	TupleDesc	result;
3645 3646 3647 3648 3649
	MemoryContext oldcxt;
	int			i;

	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

3650
	result = CreateTemplateTupleDesc(natts, hasoids);
B
Bruce Momjian 已提交
3651
	result->tdtypeid = RECORDOID;		/* not right, but we don't care */
3652
	result->tdtypmod = -1;
3653

3654
	for (i = 0; i < natts; i++)
3655
	{
3656
		memcpy(result->attrs[i], &attrs[i], ATTRIBUTE_FIXED_PART_SIZE);
3657
		/* make sure attcacheoff is valid */
3658
		result->attrs[i]->attcacheoff = -1;
3659 3660 3661
	}

	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
3662
	result->attrs[0]->attcacheoff = 0;
3663 3664 3665 3666 3667

	/* Note: we don't bother to set up a TupleConstr entry */

	MemoryContextSwitchTo(oldcxt);

3668 3669 3670 3671 3672 3673 3674 3675 3676 3677 3678 3679 3680 3681 3682 3683 3684 3685 3686 3687 3688 3689 3690 3691 3692 3693 3694 3695
	return result;
}

static TupleDesc
GetPgClassDescriptor(void)
{
	static TupleDesc pgclassdesc = NULL;

	/* Construct the hardcoded descriptor once; it lives for the whole backend */
	if (pgclassdesc != NULL)
		return pgclassdesc;

	pgclassdesc = BuildHardcodedDescriptor(Natts_pg_class,
										   Desc_pg_class,
										   true);
	return pgclassdesc;
}

static TupleDesc
GetPgIndexDescriptor(void)
{
	static TupleDesc pgindexdesc = NULL;

	/* Construct the hardcoded descriptor once; it lives for the whole backend */
	if (pgindexdesc != NULL)
		return pgindexdesc;

	pgindexdesc = BuildHardcodedDescriptor(Natts_pg_index,
										   Desc_pg_index,
										   false);
	return pgindexdesc;
}

3699 3700 3701
/*
 * Load any default attribute value definitions for the relation.
 */
3702
static void
3703
AttrDefaultFetch(Relation relation)
3704
{
3705 3706 3707
	AttrDefault *attrdef = relation->rd_att->constr->defval;
	int			ndef = relation->rd_att->constr->num_defval;
	Relation	adrel;
3708 3709
	SysScanDesc adscan;
	ScanKeyData skey;
H
Hiroshi Inoue 已提交
3710
	HeapTuple	htup;
3711
	Datum		val;
3712 3713 3714
	bool		isnull;
	int			found;
	int			i;
3715

3716 3717 3718 3719
	ScanKeyInit(&skey,
				Anum_pg_attrdef_adrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));
3720

3721 3722 3723
	adrel = heap_open(AttrDefaultRelationId, AccessShareLock);
	adscan = systable_beginscan(adrel, AttrDefaultIndexId, true,
								SnapshotNow, 1, &skey);
3724
	found = 0;
3725

3726
	while (HeapTupleIsValid(htup = systable_getnext(adscan)))
3727
	{
3728
		Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
3729

3730 3731 3732 3733
		for (i = 0; i < ndef; i++)
		{
			if (adform->adnum != attrdef[i].adnum)
				continue;
3734
			if (attrdef[i].adbin != NULL)
3735
				elog(WARNING, "multiple attrdef records found for attr %s of rel %s",
B
Bruce Momjian 已提交
3736
				NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
3737
					 RelationGetRelationName(relation));
3738 3739
			else
				found++;
3740

3741 3742 3743
			val = fastgetattr(htup,
							  Anum_pg_attrdef_adbin,
							  adrel->rd_att, &isnull);
3744
			if (isnull)
3745
				elog(WARNING, "null adbin for attr %s of rel %s",
B
Bruce Momjian 已提交
3746
				NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
3747
					 RelationGetRelationName(relation));
3748 3749
			else
				attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext,
3750
												   TextDatumGetCString(val));
3751 3752
			break;
		}
3753

3754
		if (i >= ndef)
3755 3756
			elog(WARNING, "unexpected attrdef record found for attr %d of rel %s",
				 adform->adnum, RelationGetRelationName(relation));
3757 3758
	}

3759
	systable_endscan(adscan);
3760
	heap_close(adrel, AccessShareLock);
3761 3762

	if (found != ndef)
3763
		elog(WARNING, "%d attrdef record(s) missing for rel %s",
3764
			 ndef - found, RelationGetRelationName(relation));
3765 3766
}

3767 3768 3769
/*
 * Load any check constraints for the relation.
 */
3770
static void
3771
CheckConstraintFetch(Relation relation)
3772
{
3773 3774
	ConstrCheck *check = relation->rd_att->constr->check;
	int			ncheck = relation->rd_att->constr->num_check;
3775
	Relation	conrel;
3776 3777
	SysScanDesc conscan;
	ScanKeyData skey[1];
H
Hiroshi Inoue 已提交
3778
	HeapTuple	htup;
3779
	Datum		val;
3780
	bool		isnull;
3781
	int			found = 0;
3782

3783 3784 3785 3786
	ScanKeyInit(&skey[0],
				Anum_pg_constraint_conrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));
3787

3788 3789 3790
	conrel = heap_open(ConstraintRelationId, AccessShareLock);
	conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
								 SnapshotNow, 1, skey);
3791

3792
	while (HeapTupleIsValid(htup = systable_getnext(conscan)))
3793
	{
3794 3795 3796 3797 3798 3799
		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);

		/* We want check constraints only */
		if (conform->contype != CONSTRAINT_CHECK)
			continue;

3800
		if (found >= ncheck)
3801 3802 3803
			elog(ERROR,
			     "pg_class reports %d constraint record(s) for rel %s, but found extra in pg_constraint",
			     ncheck, RelationGetRelationName(relation));
3804

3805
		check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
B
Bruce Momjian 已提交
3806
												  NameStr(conform->conname));
3807 3808

		/* Grab and test conbin is actually set */
3809
		val = fastgetattr(htup,
3810 3811
						  Anum_pg_constraint_conbin,
						  conrel->rd_att, &isnull);
3812
		if (isnull)
3813
			elog(ERROR, "null conbin for rel %s",
3814
				 RelationGetRelationName(relation));
3815

3816
		check[found].ccbin = MemoryContextStrdup(CacheMemoryContext,
3817
												 TextDatumGetCString(val));
3818 3819 3820
		found++;
	}

3821
	systable_endscan(conscan);
3822
	heap_close(conrel, AccessShareLock);
3823 3824

	if (found != ncheck)
3825 3826 3827
		elog(ERROR,
		     "found %d in pg_constraint, but pg_class reports %d constraint record(s) for rel %s",
		     found, ncheck, RelationGetRelationName(relation));
3828 3829
}

3830 3831 3832 3833 3834 3835 3836 3837 3838 3839 3840 3841 3842 3843 3844

/*
 * RelationGetPartitioningKey -- get GpPolicy struct for distributed relation
 *
 * Returns a copy of the relation's GpPolicy object, palloc'd in
 * the caller's context.  Caller should pfree() it.  If NULL is
 * returned, relation should be accessed locally.
 */
GpPolicy*
RelationGetPartitioningKey(Relation relation)
{
    GpPolicy   *policy;

    /* Copy, not the cached object itself: caller owns (and frees) the result */
    policy = GpPolicyCopy(CurrentMemoryContext, relation->rd_cdbpolicy);
    return policy;
}                                       /* RelationGetPartitioningKey */


3845 3846 3847 3848 3849 3850
/*
 * RelationGetIndexList -- get a list of OIDs of indexes on this relation
 *
 * The index list is created only if someone requests it.  We scan pg_index
 * to find relevant indexes, and add the list to the relcache entry so that
 * we won't have to compute it again.  Note that shared cache inval of a
 * relcache entry will delete the old list and set rd_indexvalid to 0,
 * so that we must recompute the index list on next request.  This handles
 * creation or deletion of an index.
 *
 * The returned list is guaranteed to be sorted in order by OID.  This is
 * needed by the executor, since for index types that we obtain exclusive
 * locks on when updating the index, all backends must lock the indexes in
 * the same order or we will get deadlocks (see ExecOpenIndices()).  Any
 * consistent ordering would do, but ordering by OID is easy.
 *
 * Since shared cache inval causes the relcache's copy of the list to go away,
 * we return a copy of the list palloc'd in the caller's context.  The caller
 * may list_free() the returned list after scanning it.  This is necessary
 * since the caller will typically be doing syscache lookups on the relevant
 * indexes, and syscache lookup could cause SI messages to be processed!
 *
 * We also update rd_oidindex, which this module treats as effectively part
 * of the index list.  rd_oidindex is valid when rd_indexvalid isn't zero;
 * it is the pg_class OID of a unique index on OID when the relation has one,
 * and InvalidOid if there is no such index.
 */
List *
RelationGetIndexList(Relation relation)
{
	Relation	indrel;
	SysScanDesc scan;
	ScanKeyData key;
	HeapTuple	tuple;
	List	   *result;
	Oid			oidIndex;
	MemoryContext oldcxt;

	/* Quick exit if we already computed the list. */
	if (relation->rd_indexvalid != 0)
		return list_copy(relation->rd_indexlist);

	/*
	 * Build the list to be returned in the caller's context while scanning.
	 * Only after the scan completes successfully do we copy it into the
	 * relcache entry; this avoids cache-context memory leakage if we hit
	 * an error partway through.
	 */
	result = NIL;
	oidIndex = InvalidOid;

	/* Prepare to scan pg_index for entries having indrelid = this rel. */
	ScanKeyInit(&key,
				Anum_pg_index_indrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));

	indrel = heap_open(IndexRelationId, AccessShareLock);
	scan = systable_beginscan(indrel, IndexIndrelidIndexId, true,
							  SnapshotNow, 1, &key);

	while (HeapTupleIsValid(tuple = systable_getnext(scan)))
	{
		Form_pg_index index = (Form_pg_index) GETSTRUCT(tuple);

		/* Add index's OID to result list in the proper order */
		result = insert_ordered_oid(result, index->indexrelid);

		/* Check to see if it is a unique, non-partial btree index on OID */
		if (IndexIsValid(index) &&
			index->indnatts == 1 &&
			index->indisunique &&
			index->indkey.values[0] == ObjectIdAttributeNumber &&
			index->indclass.values[0] == OID_BTREE_OPS_OID &&
			heap_attisnull(tuple, Anum_pg_index_indpred))
			oidIndex = index->indexrelid;
	}

	systable_endscan(scan);
	heap_close(indrel, AccessShareLock);

	/* Now save a copy of the completed list in the relcache entry. */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indexlist = list_copy(result);
	relation->rd_oidindex = oidIndex;
	relation->rd_indexvalid = 1;
	MemoryContextSwitchTo(oldcxt);

	return result;
}

3936 3937 3938 3939 3940 3941 3942 3943 3944 3945 3946 3947
/*
 * insert_ordered_oid
 *		Insert a new Oid into a sorted list of Oids, preserving ordering
 *
 * Building the ordered list this way is O(N^2), but with a pretty small
 * constant, so for the number of entries we expect it will probably be
 * faster than trying to apply qsort().  Most tables don't have very many
 * indexes...
 */
static List *
insert_ordered_oid(List *list, Oid datum)
{
	ListCell   *prev;
	ListCell   *curr;

	/* Does the datum belong at the front? */
	if (list == NIL || datum < linitial_oid(list))
		return lcons_oid(datum, list);

	/* No, so walk forward until we find the cell it belongs after */
	prev = list_head(list);
	curr = lnext(prev);
	while (curr != NULL && datum >= lfirst_oid(curr))
	{
		prev = curr;
		curr = lnext(prev);
	}

	/* Insert datum into list after 'prev' */
	lappend_cell_oid(list, prev, datum);
	return list;
}

3969 3970 3971 3972
/*
 * RelationSetIndexList -- externally force the index list contents
 *
 * This is used to temporarily override what we think the set of valid
3973 3974
 * indexes is (including the presence or absence of an OID index).
 * The forcing will be valid only until transaction commit or abort.
3975 3976 3977 3978 3979 3980
 *
 * This should only be applied to nailed relations, because in a non-nailed
 * relation the hacked index list could be lost at any time due to SI
 * messages.  In practice it is only used on pg_class (see REINDEX).
 *
 * It is up to the caller to make sure the given list is correctly ordered.
3981 3982 3983 3984 3985 3986 3987
 *
 * We deliberately do not change rd_indexattr here: even when operating
 * with a temporary partial index list, HOT-update decisions must be made
 * correctly with respect to the full index set.  It is up to the caller
 * to ensure that a correct rd_indexattr set has been cached before first
 * calling RelationSetIndexList; else a subsequent inquiry might cause a
 * wrong rd_indexattr set to get computed and cached.
3988 3989
 */
void
3990
RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex)
3991 3992 3993
{
	MemoryContext oldcxt;

3994
	Assert(relation->rd_isnailed);
3995 3996
	/* Copy the list into the cache context (could fail for lack of mem) */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3997
	indexIds = list_copy(indexIds);
3998 3999
	MemoryContextSwitchTo(oldcxt);
	/* Okay to replace old list */
4000
	list_free(relation->rd_indexlist);
4001
	relation->rd_indexlist = indexIds;
4002
	relation->rd_oidindex = oidIndex;
B
Bruce Momjian 已提交
4003
	relation->rd_indexvalid = 2;	/* mark list as forced */
4004
	/* must flag that we have a forced index list */
4005
	need_eoxact_work = true;
4006 4007
}

4008 4009 4010 4011 4012 4013 4014 4015 4016 4017 4018
/*
 * RelationGetOidIndex -- get the pg_class OID of the relation's OID index
 *
 * Returns InvalidOid if there is no such index.
 */
Oid
RelationGetOidIndex(Relation relation)
{
	/*
	 * If relation doesn't have OIDs at all, caller is probably confused. (We
	 * could just silently return InvalidOid, but it seems better to throw an
	 * assertion.)
	 */
	Assert(relation->rd_rel->relhasoids);

	if (relation->rd_indexvalid == 0)
	{
		/* RelationGetIndexList does the heavy lifting. */
		List	   *ilist = RelationGetIndexList(relation);

		list_free(ilist);
		Assert(relation->rd_indexvalid != 0);
	}

	return relation->rd_oidindex;
}

4036 4037 4038 4039 4040 4041 4042 4043 4044 4045 4046 4047 4048 4049 4050 4051 4052 4053 4054 4055 4056 4057 4058 4059 4060 4061 4062 4063
/*
 * RelationGetIndexExpressions -- get the index expressions for an index
 *
 * We cache the result of transforming pg_index.indexprs into a node tree.
 * If the rel is not an index or has no expressional columns, we return NIL.
 * Otherwise, the returned tree is copied into the caller's memory context.
 * (We don't want to return a pointer to the relcache copy, since it could
 * disappear due to relcache invalidation.)
 */
List *
RelationGetIndexExpressions(Relation relation)
{
	List	   *result;
	Datum		datum;
	bool		isnull;
	char	   *exprStr;
	MemoryContext oldcxt;

	/* Quick exit if we already computed the result. */
	if (relation->rd_indexprs)
		return (List *) copyObject(relation->rd_indexprs);

	/* Quick exit if there is nothing to do. */
	if (relation->rd_indextuple == NULL ||
		heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs))
		return NIL;

	/*
	 * Build the tree in the caller's context first; only after the work
	 * completes successfully do we copy it into the relcache entry.  That
	 * way a failure partway through cannot leave a broken tree cached.
	 */
	datum = heap_getattr(relation->rd_indextuple,
						 Anum_pg_index_indexprs,
						 GetPgIndexDescriptor(),
						 &isnull);
	Assert(!isnull);
	exprStr = TextDatumGetCString(datum);
	result = (List *) stringToNode(exprStr);
	pfree(exprStr);

	/*
	 * Run the expressions through eval_const_expressions. This is not just an
	 * optimization, but is necessary, because the planner will be comparing
	 * them to similarly-processed qual clauses, and may fail to detect valid
	 * matches without this.  We don't bother with canonicalize_qual, however.
	 */
	result = (List *) eval_const_expressions(NULL, (Node *) result);

	/*
	 * Also mark any coercion format fields as "don't care", so that the
	 * planner can match to both explicit and implicit coercions.
	 */
	set_coercionform_dontcare((Node *) result);

	/* May as well fix opfuncids too */
	fix_opfuncids((Node *) result);

	/* Now save a copy of the completed tree in the relcache entry. */
	oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
	relation->rd_indexprs = (List *) copyObject(result);
	MemoryContextSwitchTo(oldcxt);

	return result;
}

/*
 * RelationGetIndexPredicate -- get the index predicate for an index
 *
 * We cache the result of transforming pg_index.indpred into an implicit-AND
 * node tree (suitable for ExecQual).
 * If the rel is not an index or has no predicate, we return NIL.
 * Otherwise, the returned tree is copied into the caller's memory context.
 * (We don't want to return a pointer to the relcache copy, since it could
 * disappear due to relcache invalidation.)
 */
List *
RelationGetIndexPredicate(Relation relation)
{
	List	   *result;
	Datum		datum;
	bool		isnull;
	char	   *predStr;
	MemoryContext oldcxt;

	/* Quick exit if we already computed the result. */
	if (relation->rd_indpred)
		return (List *) copyObject(relation->rd_indpred);

	/* Quick exit if there is nothing to do. */
	if (relation->rd_indextuple == NULL ||
		heap_attisnull(relation->rd_indextuple, Anum_pg_index_indpred))
		return NIL;

	/*
	 * Build the tree in the caller's context first; only after the work
	 * completes successfully do we copy it into the relcache entry.  That
	 * way a failure partway through cannot leave a broken tree cached.
	 */
	datum = heap_getattr(relation->rd_indextuple,
						 Anum_pg_index_indpred,
						 GetPgIndexDescriptor(),
						 &isnull);
	Assert(!isnull);
	predStr = TextDatumGetCString(datum);
	result = (List *) stringToNode(predStr);
	pfree(predStr);

	/*
	 * Run the expression through const-simplification and canonicalization.
	 * This is not just an optimization, but is necessary, because the planner
	 * will be comparing it to similarly-processed qual clauses, and may fail
	 * to detect valid matches without this.  This must match the processing
	 * done to qual clauses in preprocess_expression()!  (We can skip the
	 * stuff involving subqueries, however, since we don't allow any in index
	 * predicates.)
	 */
	result = (List *) eval_const_expressions(NULL, (Node *) result);

	result = (List *) canonicalize_qual((Expr *) result);

	/*
	 * Also mark any coercion format fields as "don't care", so that the
	 * planner can match to both explicit and implicit coercions.
	 */
	set_coercionform_dontcare((Node *) result);

	/* Also convert to implicit-AND format */
	result = make_ands_implicit((Expr *) result);

	/* May as well fix opfuncids too */
	fix_opfuncids((Node *) result);

	/* Now save a copy of the completed tree in the relcache entry. */
	oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
	relation->rd_indpred = (List *) copyObject(result);
	MemoryContextSwitchTo(oldcxt);

	return result;
}

4177 4178 4179 4180 4181 4182 4183 4184 4185 4186 4187
/*
 * RelationGetIndexAttrBitmap -- get a bitmap of index attribute numbers
 *
 * The result has a bit set for each attribute used anywhere in the index
 * definitions of all the indexes on this relation.  (This includes not only
 * simple index keys, but attributes used in expressions and partial-index
 * predicates.)
 *
 * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
 * we can include system attributes (e.g., OID) in the bitmap representation.
 *
 * Caller had better hold at least RowExclusiveLock on the target relation
 * to ensure that it has a stable set of indexes.  This also makes it safe
 * (deadlock-free) for us to take locks on the relation's indexes.
 *
 * The returned result is palloc'd in the caller's memory context and should
 * be bms_free'd when not needed anymore.
 */
Bitmapset *
RelationGetIndexAttrBitmap(Relation relation)
{
	Bitmapset  *attrset = NULL;
	List	   *indexoids;
	ListCell   *lc;
	MemoryContext oldcxt;

	/* Quick exit if we already computed the result. */
	if (relation->rd_indexattr != NULL)
		return bms_copy(relation->rd_indexattr);

	/* Fast path if definitely no indexes */
	if (!RelationGetForm(relation)->relhasindex)
		return NULL;

	/* Get cached list of index OIDs */
	indexoids = RelationGetIndexList(relation);

	/* Fall out if no indexes (but relhasindex was set) */
	if (indexoids == NIL)
		return NULL;

	/*
	 * For each index, add referenced attributes to attrset.
	 *
	 * Note: we consider all indexes returned by RelationGetIndexList, even if
	 * they are not indisready or indisvalid.  This is important because an
	 * index for which CREATE INDEX CONCURRENTLY has just started must be
	 * included in HOT-safety decisions (see README.HOT).
	 */
	foreach(lc, indexoids)
	{
		Oid			indexOid = lfirst_oid(lc);
		Relation	indexDesc;
		IndexInfo  *indexInfo;
		int			keyno;

		indexDesc = index_open(indexOid, AccessShareLock);

		/* Extract index key information from the index's pg_index row */
		indexInfo = BuildIndexInfo(indexDesc);

		/* Collect simple attribute references */
		for (keyno = 0; keyno < indexInfo->ii_NumIndexAttrs; keyno++)
		{
			int			attrnum = indexInfo->ii_KeyAttrNumbers[keyno];

			if (attrnum != 0)
				attrset = bms_add_member(attrset,
							   attrnum - FirstLowInvalidHeapAttributeNumber);
		}

		/* Collect all attributes used in expressions, too */
		pull_varattnos((Node *) indexInfo->ii_Expressions, &attrset);

		/* Collect all attributes in the index predicate, too */
		pull_varattnos((Node *) indexInfo->ii_Predicate, &attrset);

		index_close(indexDesc, AccessShareLock);
	}

	list_free(indexoids);

	/* Now save a copy of the bitmap in the relcache entry. */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indexattr = bms_copy(attrset);
	MemoryContextSwitchTo(oldcxt);

	/* We return our original working copy for caller to play with */
	return attrset;
}


/*
 *	load_relcache_init_file, write_relcache_init_file
 *
 *		In late 1992, we started regularly having databases with more than
 *		a thousand classes in them.  With this number of classes, it became
 *		critical to do indexed lookups on the system catalogs.
 *
 *		Bootstrapping these lookups is very hard.  We want to be able to
 *		use an index on pg_attribute, for example, but in order to do so,
 *		we must have read pg_attribute for the attributes in the index,
 *		which implies that we need to use the index.
 *
 *		In order to get around the problem, we do the following:
 *
 *		   +  When the database system is initialized (at initdb time), we
 *			  don't use indexes.  We do sequential scans.
 *
 *		   +  When the backend is started up in normal mode, we load an image
 *			  of the appropriate relation descriptors, in internal format,
 *			  from an initialization file in the data/base/... directory.
 *
 *		   +  If the initialization file isn't there, then we create the
 *			  relation descriptors using sequential scans and write 'em to
 *			  the initialization file for use by subsequent backends.
 *
 *		We could dispense with the initialization files and just build the
 *		critical reldescs the hard way on every backend startup, but that
 *		slows down backend startup noticeably.
 *
 *		We can in fact go further, and save more relcache entries than
 *		just the ones that are absolutely critical; this allows us to speed
 *		up backend startup by not having to build such entries the hard way.
 *		Presently, all the catalog and index entries that are referred to
 *		by catcaches are stored in the initialization files.
 *
 *		The same mechanism that detects when catcache and relcache entries
 *		need to be invalidated (due to catalog updates) also arranges to
 *		unlink the initialization files when the contents may be out of date.
 *		The files will then be rebuilt during the next backend startup.
 */

4313 4314 4315 4316
/*
 * load_relcache_init_file -- attempt to load cache from the init file
 *
 * If successful, return TRUE and set criticalRelcachesBuilt (or, when
 * loading the shared init file, criticalSharedRelcachesBuilt) to true.
 * If not successful, return FALSE.
 *
 * The file layout written by write_relcache_init_file is read back here;
 * every item is a length word followed by that many bytes of data, so any
 * length mismatch is treated as a corrupt file and we bail out.
 *
 * NOTE: we assume we are already switched into CacheMemoryContext.
 */
static bool
load_relcache_init_file(bool shared)
{
	FILE	   *fp;
	char		initfilename[MAXPGPATH];
	Relation   *rels;
	int			relno,
				num_rels,
				max_rels,
				nailed_rels,
				nailed_indexes,
				magic;
	int			i;

	/* shared catalogs live under global/, per-database ones under DatabasePath */
	if (shared)
		snprintf(initfilename, sizeof(initfilename), "global/%s",
				 RELCACHE_INIT_FILENAME);
	else
		snprintf(initfilename, sizeof(initfilename), "%s/%s",
				 DatabasePath, RELCACHE_INIT_FILENAME);

	fp = AllocateFile(initfilename, PG_BINARY_R);
	if (fp == NULL)
		return false;

	/*
	 * Read the index relcache entries from the file.  Note we will not enter
	 * any of them into the cache if the read fails partway through; this
	 * helps to guard against broken init files.
	 */
	max_rels = 100;
	rels = (Relation *) palloc(max_rels * sizeof(Relation));
	num_rels = 0;
	nailed_rels = nailed_indexes = 0;
	initFileRelationIds = NIL;

	/* check for correct magic number (compatible version) */
	if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
		goto read_failed;
	if (magic != RELCACHE_INIT_FILEMAGIC)
		goto read_failed;

	for (relno = 0;; relno++)
	{
		Size		len;
		size_t		nread;
		Relation	rel;
		Form_pg_class relform;
		bool		has_not_null;

		/* first read the relation descriptor length */
		if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
		{
			if (nread == 0)
				break;			/* end of file */
			goto read_failed;
		}

		/* safety check for incompatible relcache layout */
		if (len != sizeof(RelationData))
			goto read_failed;

		/* allocate another relcache header */
		if (num_rels >= max_rels)
		{
			max_rels *= 2;
			rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
		}

		rel = rels[num_rels++] = (Relation) palloc(len);

		/* then, read the Relation structure */
		if ((nread = fread(rel, 1, len, fp)) != len)
			goto read_failed;

		/* next read the relation tuple form */
		if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
			goto read_failed;

		relform = (Form_pg_class) palloc(len);
		if ((nread = fread(relform, 1, len, fp)) != len)
			goto read_failed;

		rel->rd_rel = relform;

		/* initialize attribute tuple forms */
		rel->rd_att = CreateTemplateTupleDesc(relform->relnatts,
											  relform->relhasoids);
		rel->rd_att->tdrefcount = 1;	/* mark as refcounted */

		rel->rd_att->tdtypeid = relform->reltype;
		rel->rd_att->tdtypmod = -1;		/* unnecessary, but... */

		/* next read all the attribute tuple form data entries */
		has_not_null = false;
		for (i = 0; i < relform->relnatts; i++)
		{
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;
			if (len != ATTRIBUTE_FIXED_PART_SIZE)
				goto read_failed;
			if ((nread = fread(rel->rd_att->attrs[i], 1, len, fp)) != len)
				goto read_failed;

			has_not_null |= rel->rd_att->attrs[i]->attnotnull;
		}

		/* next read the access method specific field */
		if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
			goto read_failed;
		if (len > 0)
		{
			rel->rd_options = palloc(len);
			if ((nread = fread(rel->rd_options, 1, len, fp)) != len)
				goto read_failed;
			/* stored length word must agree with the varlena header */
			if (len != VARSIZE(rel->rd_options))
				goto read_failed;		/* sanity check */
		}
		else
		{
			rel->rd_options = NULL;
		}

		/* mark not-null status */
		if (has_not_null)
		{
			TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

			constr->has_not_null = true;
			rel->rd_att->constr = constr;
		}

		/* If it's an index, there's more to do */
		if (rel->rd_rel->relkind == RELKIND_INDEX)
		{
			Form_pg_am	am;
			MemoryContext indexcxt;
			Oid		   *opfamily;
			Oid		   *opcintype;
			Oid		   *operator;
			RegProcedure *support;
			int			nsupport;
			int16	   *indoption;

			/* Count nailed indexes to ensure we have 'em all */
			if (rel->rd_isnailed)
				nailed_indexes++;

			/* next, read the pg_index tuple */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			rel->rd_indextuple = (HeapTuple) palloc(len);
			if ((nread = fread(rel->rd_indextuple, 1, len, fp)) != len)
				goto read_failed;

			/* Fix up internal pointers in the tuple -- see heap_copytuple */
			rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
			rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);

			/* next, read the access method tuple form */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			am = (Form_pg_am) palloc(len);
			if ((nread = fread(am, 1, len, fp)) != len)
				goto read_failed;
			rel->rd_am = am;

			/*
			 * prepare index info context --- parameters should match
			 * RelationInitIndexAccessInfo
			 */
			indexcxt = AllocSetContextCreate(CacheMemoryContext,
											 RelationGetRelationName(rel),
											 ALLOCSET_SMALL_MINSIZE,
											 ALLOCSET_SMALL_INITSIZE,
											 ALLOCSET_SMALL_MAXSIZE);
			rel->rd_indexcxt = indexcxt;

			/* next, read the vector of opfamily OIDs */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			opfamily = (Oid *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(opfamily, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_opfamily = opfamily;

			/* next, read the vector of opcintype OIDs */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			opcintype = (Oid *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(opcintype, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_opcintype = opcintype;

			/* next, read the vector of operator OIDs */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			operator = (Oid *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(operator, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_operator = operator;

			/* next, read the vector of support procedures */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;
			support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(support, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_support = support;

			/* finally, read the vector of indoption values */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			indoption = (int16 *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(indoption, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_indoption = indoption;

			/* set up zeroed fmgr-info vectors */
			rel->rd_aminfo = (RelationAmInfo *)
				MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));
			nsupport = relform->relnatts * am->amsupport;
			rel->rd_supportinfo = (FmgrInfo *)
				MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
		}
		else
		{
			/* Count nailed rels to ensure we have 'em all */
			if (rel->rd_isnailed)
				nailed_rels++;

			/* non-index entries must not carry any index-only state */
			Assert(rel->rd_index == NULL);
			Assert(rel->rd_indextuple == NULL);
			Assert(rel->rd_am == NULL);
			Assert(rel->rd_indexcxt == NULL);
			Assert(rel->rd_aminfo == NULL);
			Assert(rel->rd_opfamily == NULL);
			Assert(rel->rd_opcintype == NULL);
			Assert(rel->rd_operator == NULL);
			Assert(rel->rd_support == NULL);
			Assert(rel->rd_supportinfo == NULL);
			Assert(rel->rd_indoption == NULL);
		}

		/*
		 * Rules and triggers are not saved (mainly because the internal
		 * format is complex and subject to change).  They must be rebuilt if
		 * needed by RelationCacheInitializePhase3.  This is not expected to
		 * be a big performance hit since few system catalogs have such. Ditto
		 * for index expressions and predicates.
		 */
		rel->rd_rules = NULL;
		rel->rd_rulescxt = NULL;
		rel->trigdesc = NULL;
		rel->rd_indexprs = NIL;
		rel->rd_indpred = NIL;

		/*
		 * Reset transient-state fields in the relcache entry
		 */
		rel->rd_smgr = NULL;
		rel->rd_targblock = InvalidBlockNumber;
		if (rel->rd_isnailed)
			rel->rd_refcnt = 1;
		else
			rel->rd_refcnt = 0;
		rel->rd_indexvalid = 0;
		rel->rd_indexlist = NIL;
		rel->rd_indexattr = NULL;
		rel->rd_oidindex = InvalidOid;
		rel->rd_createSubid = InvalidSubTransactionId;
		rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
		rel->rd_amcache = NULL;
		MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
		/* Greenplum-specific transient state is likewise reset */
        rel->rd_cdbpolicy = NULL;
        rel->rd_cdbDefaultStatsWarningIssued = false;

		/*
		 * Recompute lock and physical addressing info.  This is needed in
		 * case the pg_internal.init file was copied from some other database
		 * by CREATE DATABASE.
		 */
		RelationInitLockInfo(rel);
		RelationInitPhysicalAddr(rel);
	}

	/*
	 * We reached the end of the init file without apparent problem. Did we
	 * get the right number of nailed items?  (This is a useful crosscheck in
	 * case the set of critical rels or indexes changes.)
	 */
	if (shared)
	{
		if (nailed_rels != NUM_CRITICAL_SHARED_RELS ||
			nailed_indexes != NUM_CRITICAL_SHARED_INDEXES)
			goto read_failed;
	}
	else
	{
		if (nailed_rels != NUM_CRITICAL_LOCAL_RELS ||
			nailed_indexes != NUM_CRITICAL_LOCAL_INDEXES)
			goto read_failed;
	}

	/*
	 * OK, all appears well.
	 *
	 * Now insert all the new relcache entries into the cache.
	 */
	for (relno = 0; relno < num_rels; relno++)
	{
		RelationCacheInsert(rels[relno]);
		/* also make a list of their OIDs, for RelationIdIsInInitFile */
		if (!shared)
			initFileRelationIds = lcons_oid(RelationGetRelid(rels[relno]),
											initFileRelationIds);
	}

	pfree(rels);
	FreeFile(fp);

	if (shared)
		criticalSharedRelcachesBuilt = true;
	else
		criticalRelcachesBuilt = true;
	return true;

	/*
	 * init file is broken, so do it the hard way.  We don't bother trying to
	 * free the clutter we just allocated; it's not in the relcache so it
	 * won't hurt.
	 */
read_failed:
	pfree(rels);
	FreeFile(fp);

	return false;
}

4672 4673 4674 4675
/*
 * Write out a new initialization file with the current contents
 * of the relcache.
 *
 * With shared = true this writes the shared-catalog file under global/;
 * otherwise the per-database file under DatabasePath.  The data is written
 * to a temp file first and atomically renamed into place, and the rename is
 * skipped if any relcache invalidations arrived since backend start (the
 * data might then be stale).
 */
static void
write_relcache_init_file(bool shared)
{
	FILE	   *fp;
	char		tempfilename[MAXPGPATH];
	char		finalfilename[MAXPGPATH];
	int			magic;
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;
	MemoryContext oldcxt;
	int			i;

	/*
	 * We must write a temporary file and rename it into place. Otherwise,
	 * another backend starting at about the same time might crash trying to
	 * read the partially-complete file.
	 */
	if (shared)
	{
		snprintf(tempfilename, sizeof(tempfilename), "global/%s.%d",
				 RELCACHE_INIT_FILENAME, MyProcPid);
		snprintf(finalfilename, sizeof(finalfilename), "global/%s",
				 RELCACHE_INIT_FILENAME);
	}
	else
	{
		snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
				 DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
		snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
				 DatabasePath, RELCACHE_INIT_FILENAME);
	}

	unlink(tempfilename);		/* in case it exists w/wrong permissions */

	fp = AllocateFile(tempfilename, PG_BINARY_W);
	if (fp == NULL)
	{
		/*
		 * We used to consider this a fatal error, but we might as well
		 * continue with backend startup ...
		 */
		ereport(WARNING,
				(errcode_for_file_access(),
				 errmsg("could not create relation-cache initialization file \"%s\": %m",
						tempfilename),
			  errdetail("Continuing anyway, but there's something wrong.")));
		return;
	}

	/*
	 * Write a magic number to serve as a file version identifier.  We can
	 * change the magic number whenever the relcache layout changes.
	 */
	magic = RELCACHE_INIT_FILEMAGIC;
	if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic))
		elog(FATAL, "could not write init file");

	/*
	 * Write all the reldescs (in no particular order).  The item order per
	 * entry must match what load_relcache_init_file expects to read back.
	 */
	hash_seq_init(&status, RelationIdCache);

	initFileRelationIds = NIL;

	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
	{
		Relation	rel = idhentry->reldesc;
		Form_pg_class relform = rel->rd_rel;

		/* ignore if not correct group */
		if (relform->relisshared != shared)
			continue;

		/* first write the relcache entry proper */
		write_item(rel, sizeof(RelationData), fp);

		/* next write the relation tuple form */
		write_item(relform, CLASS_TUPLE_SIZE, fp);

		/* next, do all the attribute tuple form data entries */
		for (i = 0; i < relform->relnatts; i++)
		{
			write_item(rel->rd_att->attrs[i], ATTRIBUTE_FIXED_PART_SIZE, fp);
		}

		/* next, do the access method specific field */
		write_item(rel->rd_options,
				   (rel->rd_options ? VARSIZE(rel->rd_options) : 0),
				   fp);

		/* If it's an index, there's more to do */
		if (rel->rd_rel->relkind == RELKIND_INDEX)
		{
			Form_pg_am	am = rel->rd_am;

			/* write the pg_index tuple */
			/* we assume this was created by heap_copytuple! */
			write_item(rel->rd_indextuple,
					   HEAPTUPLESIZE + rel->rd_indextuple->t_len,
					   fp);

			/* next, write the access method tuple form */
			write_item(am, sizeof(FormData_pg_am), fp);

			/* next, write the vector of opfamily OIDs */
			write_item(rel->rd_opfamily,
					   relform->relnatts * sizeof(Oid),
					   fp);

			/* next, write the vector of opcintype OIDs */
			write_item(rel->rd_opcintype,
					   relform->relnatts * sizeof(Oid),
					   fp);

			/* next, write the vector of operator OIDs */
			write_item(rel->rd_operator,
					   relform->relnatts * (am->amstrategies * sizeof(Oid)),
					   fp);

			/* next, write the vector of support procedures */
			write_item(rel->rd_support,
				  relform->relnatts * (am->amsupport * sizeof(RegProcedure)),
					   fp);

			/* finally, write the vector of indoption values */
			write_item(rel->rd_indoption,
					   relform->relnatts * sizeof(int16),
					   fp);
		}

		/* also make a list of their OIDs, for RelationIdIsInInitFile */
		if (!shared)
		{
			/* the list must survive in CacheMemoryContext */
			oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
			initFileRelationIds = lcons_oid(RelationGetRelid(rel),
											initFileRelationIds);
			MemoryContextSwitchTo(oldcxt);
		}
	}

	if (FreeFile(fp))
		elog(FATAL, "could not write init file");

	/*
	 * Now we have to check whether the data we've so painstakingly
	 * accumulated is already obsolete due to someone else's just-committed
	 * catalog changes.  If so, we just delete the temp file and leave it to
	 * the next backend to try again.  (Our own relcache entries will be
	 * updated by SI message processing, but we can't be sure whether what we
	 * wrote out was up-to-date.)
	 *
	 * This mustn't run concurrently with the code that unlinks an init file
	 * and sends SI messages, so grab a serialization lock for the duration.
	 */
	LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);

	/* Make sure we have seen all incoming SI messages */
	AcceptInvalidationMessages();

	/*
	 * If we have received any SI relcache invals since backend start, assume
	 * we may have written out-of-date data.
	 */
	if (relcacheInvalsReceived == 0L)
	{
		/*
		 * OK, rename the temp file to its final name, deleting any
		 * previously-existing init file.
		 *
		 * Note: a failure here is possible under Cygwin, if some other
		 * backend is holding open an unlinked-but-not-yet-gone init file. So
		 * treat this as a noncritical failure; just remove the useless temp
		 * file on failure.
		 */
		if (rename(tempfilename, finalfilename) < 0)
			unlink(tempfilename);
	}
	else
	{
		/* Delete the already-obsolete temp file */
		unlink(tempfilename);
	}

	LWLockRelease(RelCacheInitLock);
}

4862 4863 4864 4865 4866 4867 4868 4869 4870 4871
/*
 * write_item -- append one length-prefixed item to the init file.
 *
 * The length word is written first, then the payload; a short write of
 * either part is treated as fatal, matching the reader's expectations.
 */
static void
write_item(const void *data, Size len, FILE *fp)
{
	size_t		nwritten;

	nwritten = fwrite(&len, 1, sizeof(len), fp);
	if (nwritten != sizeof(len))
		elog(FATAL, "could not write init file");

	nwritten = fwrite(data, 1, len, fp);
	if (nwritten != len)
		elog(FATAL, "could not write init file");
}

4872 4873 4874 4875 4876 4877 4878 4879 4880 4881 4882 4883
/*
 * Detect whether a given relation (identified by OID) is one of the ones
 * we store in the init file.
 *
 * initFileRelationIds is maintained by load_relcache_init_file and
 * write_relcache_init_file; only the local (non-shared) entries are listed.
 *
 * Note that we effectively assume that all backends running in a database
 * would choose to store the same set of relations in the init file;
 * otherwise there are cases where we'd fail to detect the need for an init
 * file invalidation.  This does not seem likely to be a problem in practice.
 */
bool
RelationIdIsInInitFile(Oid relationId)
{
	return list_member_oid(initFileRelationIds, relationId);
}

/*
 * Invalidate (remove) the init file during commit of a transaction that
 * changed one or more of the relation cache entries that are kept in the
 * local init file.
 *
 * To be safe against concurrent inspection or rewriting of the init file,
 * we must take RelCacheInitLock, then remove the old init file, then send
 * the SI messages that include relcache inval for such relations, and then
 * release RelCacheInitLock.  This serializes the whole affair against
 * write_relcache_init_file, so that we can be sure that any other process
 * that's concurrently trying to create a new init file won't move an
 * already-stale version into place after we unlink.  Also, because we unlink
 * before sending the SI messages, a backend that's currently starting cannot
 * read the now-obsolete init file and then miss the SI messages that will
 * force it to update its relcache entries.  (This works because the backend
 * startup sequence gets into the sinval array before trying to load the init
 * file.)
 *
 * We take the lock and do the unlink in RelationCacheInitFilePreInvalidate,
 * then release the lock in RelationCacheInitFilePostInvalidate.  Caller must
 * send any pending SI messages between those calls.
 *
 * Notice this deals only with the local init file, not the shared init file.
 * The reason is that there can never be a "significant" change to the
 * relcache entry of a shared relation; the most that could happen is
 * updates of noncritical fields such as relpages/reltuples.  So, while
 * it's worth updating the shared init file from time to time, it can never
 * be invalid enough to make it necessary to remove it.
 */
void
RelationCacheInitFilePreInvalidate(void)
{
	char		initfilename[MAXPGPATH];

	snprintf(initfilename, sizeof(initfilename), "%s/%s",
			 DatabasePath, RELCACHE_INIT_FILENAME);

	/* held until RelationCacheInitFilePostInvalidate releases it */
	LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);

	if (unlink(initfilename) < 0)
	{
		/*
		 * The file might not be there if no backend has been started since
		 * the last removal.  But complain about failures other than ENOENT.
		 * Fortunately, it's not too late to abort the transaction if we
		 * can't get rid of the would-be-obsolete init file.
		 */
		if (errno != ENOENT)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not remove cache file \"%s\": %m",
							initfilename)));
	}
}
4941

4942 4943 4944 4945 4946 4947
/*
 * Release the lock taken by RelationCacheInitFilePreInvalidate, after the
 * caller has sent its pending SI invalidation messages.
 */
void
RelationCacheInitFilePostInvalidate(void)
{
	LWLockRelease(RelCacheInitLock);
}

4948
/*
 * Remove the init files during postmaster startup.
 *
 * We used to keep the init files across restarts, but that is unsafe even in simple
 * crash-recovery cases as there are windows for the init files to become out-of-sync
 * with the database. So now we just remove them during startup and expect the
 * first backend launch to rebuild them. Of course, this has to happen in each
 * database of the cluster.
 *
 * NOTE(review): only the default tablespace ("base") is scanned here;
 * databases placed in non-default tablespaces are presumably covered
 * elsewhere — confirm against the callers of this function.
 */
void
RelationCacheInitFileRemove(void)
{
	char		path[MAXPGPATH];

	/*
	 * We zap the shared cache file too.  In theory it can't get out of sync
	 * enough to be a problem, but in data-corruption cases, who knows ...
	 */
	snprintf(path, sizeof(path), "global/%s",
			 RELCACHE_INIT_FILENAME);
	unlink_initfile(path);

	/* Scan everything in the default tablespace */
	RelationCacheInitFileRemoveInDir("base");
}

/* Process one per-tablespace directory for RelationCacheInitFileRemove */
static void
RelationCacheInitFileRemoveInDir(const char *tblspcpath)
{
	DIR		   *dir;
	struct dirent *de;
	char		initfilename[MAXPGPATH];

	/* Scan the tablespace directory to find per-database directories */
	dir = AllocateDir(tblspcpath);
	if (dir == NULL)
	{
		/* nonfatal: log and skip this tablespace */
		elog(LOG, "could not open tablespace directory \"%s\": %m",
			 tblspcpath);
		return;
	}

	while ((de = ReadDir(dir, tblspcpath)) != NULL)
	{
		/* all-digit directory names are taken to be database subdirectories */
		if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
		{
			/* Try to remove the init file in each database */
			snprintf(initfilename, sizeof(initfilename), "%s/%s/%s",
					 tblspcpath, de->d_name, RELCACHE_INIT_FILENAME);
			unlink_initfile(initfilename);
		}
	}

	FreeDir(dir);
}

/*
 * unlink_initfile -- best-effort removal of one init file.
 *
 * A missing file (ENOENT) is expected and silently ignored; any other
 * unlink failure is merely logged, never raised as an error.
 */
static void
unlink_initfile(const char *initfilename)
{
	if (unlink(initfilename) < 0 && errno != ENOENT)
		elog(LOG, "could not remove cache file \"%s\": %m", initfilename);
}
5015 5016 5017 5018 5019 5020 5021 5022 5023 5024 5025 5026 5027 5028 5029 5030 5031

/*
 * RelationGetPTInfo -- fetch the persistent-table TID and serial number
 * cached in the relation's rd_segfile0_relationnodeinfo (Greenplum-specific).
 *
 * Errors out if the info has not been filled in for a relfilenode that
 * requires it (i.e. one not exempted by GpPersistent_SkipXLogInfo).
 * On success, *persistentTid and *persistentSerialNum are set; both output
 * pointers must be non-NULL.
 */
void
RelationGetPTInfo(Relation rel,
	ItemPointer persistentTid,
	int64 *persistentSerialNum)
{
	if (! GpPersistent_SkipXLogInfo(rel->rd_node.relNode) &&
		! rel->rd_segfile0_relationnodeinfo.isPresent)
	{
		elog(ERROR,
			 "required Persistent Table information missing for relation %u/%u/%u",
			 rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode);
	}

	*persistentTid = rel->rd_segfile0_relationnodeinfo.persistentTid;
	*persistentSerialNum = rel->rd_segfile0_relationnodeinfo.persistentSerialNum;
}