/*-------------------------------------------------------------------------
 *
 * relcache.c
 *	  POSTGRES relation descriptor cache code
 *
 * Portions Copyright (c) 2005-2009, Greenplum inc.
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/utils/cache/relcache.c,v 1.266.2.10 2010/09/02 03:17:06 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
/*
 * INTERFACE ROUTINES
 *		RelationCacheInitialize			- initialize relcache (to empty)
 *		RelationCacheInitializePhase2	- initialize shared-catalog entries
 *		RelationCacheInitializePhase3	- finish initializing relcache
 *		RelationIdGetRelation			- get a reldesc by relation id
 *		RelationClose					- close an open relation
 *
 * NOTES
 *		The following code contains many undocumented hacks.  Please be
 *		careful....
 */
28 29
#include "postgres.h"

30
#include <sys/file.h>
31
#include <fcntl.h>
32
#include <unistd.h>
33

34 35
#include "access/genam.h"
#include "access/heapam.h"
36
#include "access/reloptions.h"
37
#include "access/sysattr.h"
38
#include "access/xact.h"
39
#include "catalog/catalog.h"
40
#include "catalog/index.h"
B
Bruce Momjian 已提交
41
#include "catalog/indexing.h"
42
#include "catalog/namespace.h"
43 44
#include "catalog/pg_amop.h"
#include "catalog/pg_amproc.h"
B
Bruce Momjian 已提交
45
#include "catalog/pg_attrdef.h"
46
#include "catalog/pg_authid.h"
47
#include "catalog/pg_auth_members.h"
48
#include "catalog/pg_constraint.h"
49
#include "catalog/pg_database.h"
50
#include "catalog/pg_namespace.h"
51
#include "catalog/pg_opclass.h"
52
#include "catalog/pg_operator.h"
B
Bruce Momjian 已提交
53
#include "catalog/pg_proc.h"
54
#include "catalog/pg_rewrite.h"
55 56
#include "catalog/pg_tablespace.h"
#include "catalog/pg_trigger.h"
57
#include "catalog/pg_type.h"
58
#include "commands/trigger.h"
B
Bruce Momjian 已提交
59
#include "miscadmin.h"
60 61
#include "optimizer/clauses.h"
#include "optimizer/planmain.h"
62
#include "optimizer/prep.h"
63
#include "optimizer/var.h"
64
#include "rewrite/rewriteDefine.h"
65
#include "storage/fd.h"
B
Bruce Momjian 已提交
66
#include "storage/smgr.h"
67
#include "utils/builtins.h"
68
#include "utils/fmgroids.h"
69
#include "utils/inval.h"
70
#include "utils/memutils.h"
B
Bruce Momjian 已提交
71
#include "utils/relcache.h"
72
#include "utils/relationnode.h"
73
#include "utils/resowner.h"
74
#include "utils/syscache.h"
B
Bruce Momjian 已提交
75

76
#include "catalog/gp_policy.h"         /* GpPolicy */
77 78 79 80 81 82
#include "cdb/cdbtm.h"
#include "cdb/cdbvars.h"        /* Gp_role */
#include "cdb/cdbmirroredflatfile.h"
#include "cdb/cdbpersistentfilesysobj.h"
#include "cdb/cdbsreh.h"

83

84 85 86 87 88
/*
 * Name of the relcache init file, used to speed up backend startup.
 */
#define RELCACHE_INIT_FILENAME	"pg_internal.init"

#define RELCACHE_INIT_FILEMAGIC		0x773264	/* version ID value */

/*
 *		hardcoded tuple descriptors.  see include/catalog/pg_attribute.h
 *
 * These let us build relcache entries for the core catalogs before we can
 * read pg_attribute itself.
 */
static const FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
static const FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
static const FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
static const FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
static const FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};

/* Shared catalogs needed before a database is selected (login path). */
static const FormData_pg_attribute Desc_pg_database[Natts_pg_database] = {Schema_pg_database};
static const FormData_pg_attribute Desc_pg_authid[Natts_pg_authid] = {Schema_pg_authid};
static const FormData_pg_attribute Desc_pg_auth_members[Natts_pg_auth_members] = {Schema_pg_auth_members};
103

104
/*
 *		Hash tables that index the relation cache
 *
 *		We used to index the cache by both name and OID, but now there
 *		is only an index by OID.
 */
typedef struct relidcacheent
{
	Oid			reloid;			/* hash key: relation OID */
	Relation	reldesc;		/* cached relation descriptor */
} RelIdCacheEnt;

static HTAB *RelationIdCache;

/*
 * This flag is false until we have prepared the critical relcache entries
 * that are needed to do indexscans on the tables read by relcache building.
 */
bool		criticalRelcachesBuilt = false;

/*
 * This flag is false until we have prepared the critical relcache entries
 * for shared catalogs (which are the tables needed for login).
 */
bool		criticalSharedRelcachesBuilt = false;

/*
 * This counter counts relcache inval events received since backend startup
 * (but only for rels that are actually in cache).	Presently, we use it only
 * to detect whether data about to be written by write_relcache_init_file()
 * might already be obsolete.
 */
static long relcacheInvalsReceived = 0L;

/*
 * This list remembers the OIDs of the non-shared relations cached in the
 * database's local relcache init file.  Note that there is no corresponding
 * list for the shared relcache init file, for reasons explained in the
 * comments for RelationCacheInitFileRemove.
 */
static List *initFileRelationIds = NIL;

/*
 * This flag lets us optimize away work in AtEO(Sub)Xact_RelationCache().
 */
static bool need_eoxact_work = false;
150

151

152
/*
 *		macros to manipulate the lookup hashtables
 */

/* Insert RELATION into RelationIdCache, keyed by its rd_id. */
#define RelationCacheInsert(RELATION)	\
do { \
	RelIdCacheEnt *idhentry; bool found; \
	idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
										   (void *) &(RELATION->rd_id), \
										   HASH_ENTER, &found); \
	/* used to give notice if found -- now just keep quiet */ \
	idhentry->reldesc = RELATION; \
} while(0)

/* Look up ID in RelationIdCache; set RELATION to the reldesc or NULL. */
#define RelationIdCacheLookup(ID, RELATION) \
do { \
	RelIdCacheEnt *hentry; \
	hentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
										 (void *) &(ID), \
										 HASH_FIND, NULL); \
	if (hentry) \
		RELATION = hentry->reldesc; \
	else \
		RELATION = NULL; \
} while(0)

/* Remove RELATION from RelationIdCache; warn (only) if it wasn't there. */
#define RelationCacheDelete(RELATION) \
do { \
	RelIdCacheEnt *idhentry; \
	idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
										   (void *) &(RELATION->rd_id), \
										   HASH_REMOVE, NULL); \
	if (idhentry == NULL) \
		elog(WARNING, "trying to delete a rd_id reldesc that does not exist"); \
} while(0)
186

187 188 189

/*
 * Special cache for opclass-related information
 *
 * Note: only default operators and support procs get cached, ie, those with
 * lefttype = righttype = opcintype.
 */
typedef struct opclasscacheent
{
	Oid			opclassoid;		/* lookup key: OID of opclass */
	bool		valid;			/* set TRUE after successful fill-in */
	StrategyNumber numStrats;	/* max # of strategies (from pg_am) */
	StrategyNumber numSupport;	/* max # of support procs (from pg_am) */
	Oid			opcfamily;		/* OID of opclass's family */
	Oid			opcintype;		/* OID of opclass's declared input type */
	Oid		   *operatorOids;	/* strategy operators' OIDs */
	RegProcedure *supportProcs; /* support procs */
} OpClassCacheEnt;

/* Hash table of OpClassCacheEnt, created on demand by LookupOpclassInfo. */
static HTAB *OpClassCache = NULL;


209
/* non-export function prototypes */
210

211
static void RelationDestroyRelation(Relation relation);
212
static void RelationClearRelation(Relation relation, bool rebuild);
B
Bruce Momjian 已提交
213

214
static void RelationReloadIndexInfo(Relation relation);
215
static void RelationFlushRelation(Relation relation);
216 217
static bool load_relcache_init_file(bool shared);
static void write_relcache_init_file(bool shared);
B
Bruce Momjian 已提交
218
static void write_item(const void *data, Size len, FILE *fp);
219

220
static void formrdesc(const char *relationName, Oid relationReltype,
221 222
		  bool isshared, bool hasoids,
		  int natts, const FormData_pg_attribute *att);
223

224 225
static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK, Relation *pg_class_relation);
static Relation AllocateRelationDesc(Form_pg_class relp);
226
static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
227
static void RelationBuildTupleDesc(Relation relation);
228
static Relation RelationBuildDesc(Oid targetRelId, bool insertIt);
229
static void RelationInitPhysicalAddr(Relation relation);
230
static void RelationInitAppendOnlyInfo(Relation relation);
231
static void load_critical_index(Oid indexoid, Oid heapoid);
232
static TupleDesc GetPgClassDescriptor(void);
233
static TupleDesc GetPgIndexDescriptor(void);
234
static void AttrDefaultFetch(Relation relation);
235
static void CheckConstraintFetch(Relation relation);
236
static List *insert_ordered_oid(List *list, Oid datum);
237
static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
B
Bruce Momjian 已提交
238 239
				  StrategyNumber numStrats,
				  StrategyNumber numSupport);
240 241
static void RelationCacheInitFileRemoveInDir(const char *tblspcpath);
static void unlink_initfile(const char *initfilename);
242

243

244
/*
245
 *		ScanPgRelation
246
 *
247 248 249 250 251
 *		This is used by RelationBuildDesc to find a pg_class
 *		tuple matching targetRelId.  The caller must hold at least
 *		AccessShareLock on the target relid to prevent concurrent-update
 *		scenarios --- else our SnapshotNow scan might fail to find any
 *		version that it thinks is live.
252 253 254
 *
 *		NB: the returned tuple has been copied into palloc'd storage
 *		and must eventually be freed with heap_freetuple.
255
 */
256
static HeapTuple
257
ScanPgRelation(Oid targetRelId, bool indexOK, Relation *pg_class_relation)
258
{
259 260
	HeapTuple	pg_class_tuple;
	Relation	pg_class_desc;
261 262
	SysScanDesc pg_class_scan;
	ScanKeyData key[1];
263 264 265 266 267 268 269 270 271 272 273 274 275

	/*
	 * If something goes wrong during backend startup, we might find ourselves
	 * trying to read pg_class before we've selected a database.  That ain't
	 * gonna work, so bail out with a useful error message.  If this happens,
	 * it probably means a relcache entry that needs to be nailed isn't.
	 */
	if (!OidIsValid(MyDatabaseId))
		elog(FATAL, "cannot read pg_class without having selected a database");

	/*
	 * form a scan key
	 */
276 277 278 279
	ScanKeyInit(&key[0],
				ObjectIdAttributeNumber,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(targetRelId));
280 281 282 283 284 285 286 287

	/*
	 * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
	 * built the critical relcache entries (this includes initdb and startup
	 * without a pg_internal.init file).  The caller can also force a heap
	 * scan by setting indexOK == false.
	 */
	pg_class_desc = heap_open(RelationRelationId, AccessShareLock);
288 289 290 291
	pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId,
									   indexOK && criticalRelcachesBuilt,
									   SnapshotNow,
									   1, key);
292

293
	pg_class_tuple = systable_getnext(pg_class_scan);
294 295

	/*
296
	 * Must copy tuple before releasing buffer.
297
	 */
298 299
	if (HeapTupleIsValid(pg_class_tuple))
		pg_class_tuple = heap_copytuple(pg_class_tuple);
300 301

	/* all done */
302
	systable_endscan(pg_class_scan);
303 304 305 306 307 308 309 310 311 312 313 314 315
	if (pg_class_relation == NULL)
		heap_close(pg_class_desc, AccessShareLock);
	else
		*pg_class_relation = pg_class_desc;

	return pg_class_tuple;
}

/*
 * GpRelationNodeBeginScan
 *
 * Begin an index scan over gp_relation_node for all segment-file entries
 * of the given (tablespaceOid, relfilenode) pair.  Scan state lives in
 * *gpRelationNodeScan; pair this with GpRelationNodeGetNext and
 * GpRelationNodeEndScan.
 */
void
GpRelationNodeBeginScan(
	Snapshot	snapshot,
	Relation 	gp_relation_node,
	Oid		relationId,
	Oid 		tablespaceOid,
	Oid 		relfilenode,
	GpRelationNodeScan 	*gpRelationNodeScan)
{
	Assert (relfilenode != 0);

	MemSet(gpRelationNodeScan, 0, sizeof(GpRelationNodeScan));

	/* Scan keys: both the tablespace OID and the relfilenode must match. */
	ScanKeyInit(&gpRelationNodeScan->scankey[0],
				Anum_gp_relation_node_tablespace_oid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(tablespaceOid));
	ScanKeyInit(&gpRelationNodeScan->scankey[1],
				Anum_gp_relation_node_relfilenode_oid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(relfilenode));

	/* Always use the gp_relation_node OID index for this scan. */
	gpRelationNodeScan->scan =
		systable_beginscan(gp_relation_node, GpRelationNodeOidIndexId,
						   /* indexOK */ true,
						   snapshot,
						   /* nKeys */ 2,
						   gpRelationNodeScan->scankey);

	/* Remember identifying info for use by GpRelationNodeGetNext. */
	gpRelationNodeScan->gp_relation_node = gp_relation_node;
	gpRelationNodeScan->relationId = relationId;
	gpRelationNodeScan->tablespaceOid = tablespaceOid;
	gpRelationNodeScan->relfilenode = relfilenode;
}

/*
 * GpRelationNodeGetNext
 *
 * Return the next gp_relation_node tuple from a scan started with
 * GpRelationNodeBeginScan, decoding the segment file number, persistent
 * TID, and persistent serial number into the output arguments.
 *
 * Returns NULL (with *persistentTid and *persistentSerialNum zeroed) when
 * the scan is exhausted.  The returned tuple is owned by the scan; do not
 * free it.
 *
 * Bug fixed: the two concatenated literals in the FATAL message lacked a
 * separating space, producing "broken.Mismatch ..." in the server log.
 */
HeapTuple
GpRelationNodeGetNext(
	GpRelationNodeScan 	*gpRelationNodeScan,
	int32				*segmentFileNum,
	ItemPointer			persistentTid,
	int64				*persistentSerialNum)
{
	HeapTuple	tuple;
	bool		nulls[Natts_gp_relation_node];
	Datum		values[Natts_gp_relation_node];
	Oid			actualRelationNode;
	int64		createMirrorDataLossTrackingSessionNum;

	tuple = systable_getnext((SysScanDesc) gpRelationNodeScan->scan);

	/* if no such tuple exists, return NULL */
	if (!HeapTupleIsValid(tuple))
	{
		MemSet(persistentTid, 0, sizeof(ItemPointerData));
		*persistentSerialNum = 0;
		return tuple;
	}

	heap_deform_tuple(tuple, RelationGetDescr(gpRelationNodeScan->gp_relation_node), values, nulls);

	GpRelationNode_GetValues(
						values,
						&actualRelationNode,
						segmentFileNum,
						&createMirrorDataLossTrackingSessionNum,
						persistentTid,
						persistentSerialNum);

	/*
	 * Cross-check: the index scan is keyed on relfilenode, so a tuple with
	 * a different relfilenode means the gp_relation_node index is corrupt.
	 */
	if (actualRelationNode != gpRelationNodeScan->relfilenode)
		elog(FATAL, "Index on gp_relation_node broken. "
			 "Mismatch in node tuple for gp_relation_node for relation %u, tablespace %u, relfilenode %u, relation node %u",
			 gpRelationNodeScan->relationId,
			 gpRelationNodeScan->tablespaceOid,
			 gpRelationNodeScan->relfilenode,
			 actualRelationNode);

	return tuple;
}


/*
 * GpRelationNodeEndScan
 *
 * Release the systable scan started by GpRelationNodeBeginScan.
 */
void
GpRelationNodeEndScan(
	GpRelationNodeScan 	*gpRelationNodeScan)
{
	systable_endscan((SysScanDesc) gpRelationNodeScan->scan);
}

413
/*
 * ScanGpRelationNodeTuple
 *
 * Look up the gp_relation_node tuple for one specific segment file of a
 * relation, identified by (tablespaceOid, relfilenode, segmentFileNum).
 *
 * The returned tuple has been copied into palloc'd storage and must
 * eventually be freed with heap_freetuple; NULL is returned when no
 * matching tuple exists.
 */
static HeapTuple
ScanGpRelationNodeTuple(
	Relation 	gp_relation_node,
	Oid 		tablespaceOid,
	Oid 		relfilenode,
	int32		segmentFileNum)
{
	HeapTuple	tuple;
	SysScanDesc scan;
	ScanKeyData key[3];

	/*
	 * NOTE(review): per the matching Assert/comment in
	 * FetchGpRelationNodeTuple, gp_relation_node apparently stores the
	 * tablespace in pg_class fashion, so the caller must not pass
	 * MyDatabaseTableSpace here -- confirm.
	 */
	Assert (tablespaceOid != MyDatabaseTableSpace);
	Assert (relfilenode != 0);

	/*
	 * form a scan key
	 */
	ScanKeyInit(&key[0],
				Anum_gp_relation_node_tablespace_oid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(tablespaceOid));
	ScanKeyInit(&key[1],
				Anum_gp_relation_node_relfilenode_oid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(relfilenode));
	ScanKeyInit(&key[2],
				Anum_gp_relation_node_segment_file_num,
				BTEqualStrategyNumber, F_INT4EQ,
				Int32GetDatum(segmentFileNum));

	/*
	 * Fetch the tuple via the gp_relation_node OID index.  (Unlike
	 * ScanPgRelation, the index scan is always used here: indexOK is
	 * hardwired true and there is no caller override.)
	 */
	scan = systable_beginscan(gp_relation_node, GpRelationNodeOidIndexId,
									   /* indexOK */ true,
									   SnapshotNow,
									   3, key);

	tuple = systable_getnext(scan);

	/*
	 * Must copy tuple before releasing buffer.
	 */
	if (HeapTupleIsValid(tuple))
		tuple = heap_copytuple(tuple);

	/* all done */
	systable_endscan(scan);

	return tuple;
}

HeapTuple
FetchGpRelationNodeTuple(
	Relation 		gp_relation_node,
471
	Oid 			tablespaceOid,
472 473 474 475 476 477 478 479 480 481 482 483 484 485 486
	Oid 			relfilenode,
	int32			segmentFileNum,
	ItemPointer		persistentTid,
	int64			*persistentSerialNum)
{
	HeapTuple tuple;
	
	bool			nulls[Natts_gp_relation_node];
	Datum			values[Natts_gp_relation_node];
	
	Oid actualRelationNode;
	int32 actualSegmentFileNum;

	int64 createMirrorDataLossTrackingSessionNum;

487 488 489 490 491
	/*
	 * gp_relation_node stores tablespaceOId in pg_class fashion, hence need
	 * to fetch the similar way.
	 */
	Assert (tablespaceOid != MyDatabaseTableSpace);
492 493 494 495
	Assert (relfilenode != 0);
	
	tuple = ScanGpRelationNodeTuple(
					gp_relation_node,
496
					tablespaceOid,
497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519
					relfilenode,
					segmentFileNum);
	
	/*
	 * if no such tuple exists, return NULL
	 */
	if (!HeapTupleIsValid(tuple))
	{
		MemSet(persistentTid, 0, sizeof(ItemPointerData));
		*persistentSerialNum = 0;
		return tuple;
	}
	
	heap_deform_tuple(tuple, RelationGetDescr(gp_relation_node), values, nulls);
		
	GpRelationNode_GetValues(
						values,
						&actualRelationNode,
						&actualSegmentFileNum,
						&createMirrorDataLossTrackingSessionNum,
						persistentTid,
						persistentSerialNum);
	
520 521 522 523 524 525 526 527
	if (actualRelationNode != relfilenode)
	{
		elog(ERROR, "Index on gp_relation_node broken."
			   "Mismatch in node tuple for gp_relation_node intended relfilenode %u, fetched relfilenode %u",
			 relfilenode,
			 actualRelationNode);
	}

528 529 530 531 532 533 534 535 536 537 538 539 540 541
	return tuple;
}

/*
 * DeleteGpRelationNodeTuple
 *
 * Delete the gp_relation_node entry for the given segment file of the
 * relation.  Errors out if the entry does not exist.
 */
void
DeleteGpRelationNodeTuple(
	Relation 	relation,
	int32		segmentFileNum)
{
	Relation	nodeRel;
	HeapTuple	nodeTuple;
	ItemPointerData persistentTid;
	int64		persistentSerialNum;

	nodeRel = heap_open(GpRelationNodeRelationId, RowExclusiveLock);

	nodeTuple = FetchGpRelationNodeTuple(nodeRel,
										 relation->rd_rel->reltablespace,
										 relation->rd_rel->relfilenode,
										 segmentFileNum,
										 &persistentTid,
										 &persistentSerialNum);
	if (!HeapTupleIsValid(nodeTuple))
		elog(ERROR, "could not find node tuple for relation %u, tablespace %u, relation file node %u, segment file #%d",
			 RelationGetRelid(relation),
			 relation->rd_rel->reltablespace,
			 relation->rd_rel->relfilenode,
			 segmentFileNum);

	/* delete the relation tuple from gp_relation_node, and finish up */
	simple_heap_delete(nodeRel, &nodeTuple->t_self);
	heap_freetuple(nodeTuple);

	heap_close(nodeRel, RowExclusiveLock);
}

bool
ReadGpRelationNode(
570
	Oid 			tablespaceOid,
571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586
	Oid 			relfilenode,
	int32			segmentFileNum,
	ItemPointer		persistentTid,
	int64			*persistentSerialNum)
{
	Relation gp_relation_node;
	HeapTuple tuple;
	bool found;

	MemSet(persistentTid, 0, sizeof(ItemPointerData));
	*persistentSerialNum = 0;

	gp_relation_node = heap_open(GpRelationNodeRelationId, AccessShareLock);

	tuple = FetchGpRelationNodeTuple(
						gp_relation_node,
587
						tablespaceOid,
588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612
						relfilenode,
						segmentFileNum,
						persistentTid,
						persistentSerialNum);

	/*
	 * if no such tuple exists, return NULL
	 */
	if (!HeapTupleIsValid(tuple))
	{
		found = false;
	}
	else
	{
		if (Debug_persistent_print)
		{
			TupleVisibilitySummary tupleVisibilitySummary;
			char *tupleVisibilitySummaryString;
			
			GetTupleVisibilitySummary(
									tuple,
									&tupleVisibilitySummary);
			tupleVisibilitySummaryString = GetTupleVisibilitySummaryString(&tupleVisibilitySummary);
			
			elog(Persistent_DebugPrintLevel(), 
613 614
				 "ReadGpRelationNode: For tablespace %u relfilenode %u, segment file #%d found persistent serial number " INT64_FORMAT ", TID %s (gp_relation_node tuple visibility: %s)",
				 tablespaceOid,
615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649
				 relfilenode,
				 segmentFileNum,
				 *persistentSerialNum,
				 ItemPointerToString(persistentTid),
				 tupleVisibilitySummaryString);
			pfree(tupleVisibilitySummaryString);
		}

		found = true;
		heap_freetuple(tuple);
	}

	heap_close(gp_relation_node, AccessShareLock);

	return found;
}

/*
 * RelationFetchSegFile0GpRelationNode
 *
 * Ensure relation->rd_segfile0_relationnodeinfo (persistent TID and serial
 * number for segment file 0) is populated.  This is filled lazily rather
 * than at relcache-build time; see the comment in AllocateRelationDesc --
 * fetching it eagerly would require opening gp_relation_node, which itself
 * requires a relcache entry, causing infinite recursion.
 */
void
RelationFetchSegFile0GpRelationNode(
	Relation relation)
{
	if (!relation->rd_segfile0_relationnodeinfo.isPresent)
	{
		/*
		 * During bootstrap (before persistence work) or crash recovery
		 * there is nothing to read yet: record zeroes and mark the zero
		 * TID as explicitly allowed.
		 */
		if (Persistent_BeforePersistenceWork() || InRecovery)
		{
			MemSet(&relation->rd_segfile0_relationnodeinfo.persistentTid, 0, sizeof(ItemPointerData));
			relation->rd_segfile0_relationnodeinfo.persistentSerialNum = 0;
		
			relation->rd_segfile0_relationnodeinfo.isPresent = true;
			relation->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = true;
			
			return; // The initdb process will load the persistent table once we out of bootstrap mode.
		}

		if (!ReadGpRelationNode(
				relation->rd_rel->reltablespace,
				relation->rd_rel->relfilenode,
				/* segmentFileNum */ 0,
				&relation->rd_segfile0_relationnodeinfo.persistentTid,
				&relation->rd_segfile0_relationnodeinfo.persistentSerialNum))
		{
			elog(ERROR, "Did not find gp_relation_node entry for relation name %s, relation id %u, tablespaceOid %u, relfilenode %u",
				 relation->rd_rel->relname.data,
				 relation->rd_id,
				 relation->rd_rel->reltablespace,
				 relation->rd_rel->relfilenode);
		}

		Assert(!Persistent_BeforePersistenceWork());
		/* Outside bootstrap/recovery a zero TID indicates corruption. */
		if (PersistentStore_IsZeroTid(&relation->rd_segfile0_relationnodeinfo.persistentTid))
		{	
			elog(ERROR, 
				 "RelationFetchSegFile0GpRelationNode has invalid TID (0,0) into relation %u/%u/%u '%s', serial number " INT64_FORMAT,
				 relation->rd_node.spcNode,
				 relation->rd_node.dbNode,
				 relation->rd_node.relNode,
				 NameStr(relation->rd_rel->relname),
				 relation->rd_segfile0_relationnodeinfo.persistentSerialNum);
		}

		relation->rd_segfile0_relationnodeinfo.isPresent = true;
		
	}
	else if (gp_validate_pt_info_relcache &&
		     !(relation->rd_index &&
			   relation->rd_index->indrelid == GpRelationNodeRelationId))
	{
		/*
		 * Debug cross-check (enabled by the gp_validate_pt_info_relcache
		 * GUC): re-read the entry and verify the cached values still match.
		 *
		 * bypass the check for gp_relation_node_index because
		 * ReadGpRelationNode() uses the same index to probe relfile node.
		 */

		ItemPointerData persistentTid;
		int64			persistentSerialNum;

		if (!ReadGpRelationNode(
				relation->rd_rel->reltablespace,
				relation->rd_rel->relfilenode,
				/* segmentFileNum */ 0,
				&persistentTid,
				&persistentSerialNum))
		{
			elog(ERROR,
				 "did not find gp_relation_node entry for relation name %s, "
				 "relation id %u, tablespace %u, relfilenode %u",
				 relation->rd_rel->relname.data,
				 relation->rd_id,
				 relation->rd_rel->reltablespace,
				 relation->rd_rel->relfilenode);
		}

		/* Mismatch between catalog and relcache copy is a hard error. */
		if (ItemPointerCompare(&persistentTid,
							   &relation->rd_segfile0_relationnodeinfo.persistentTid) ||
			(persistentSerialNum != relation->rd_segfile0_relationnodeinfo.persistentSerialNum))
		{
			ereport(ERROR,
					(errmsg("invalid persistent TID and/or serial number in "
							"relcache entry"),
					 errdetail("relation name %s, relation id %u, relfilenode %u "
							   "contains invalid persistent TID %s and/or serial "
							   "number " INT64_FORMAT ".  Expected TID is %s and "
							   "serial number " INT64_FORMAT,
							   relation->rd_rel->relname.data, relation->rd_id,
							   relation->rd_node.relNode,
							   ItemPointerToString(
								   &relation->rd_segfile0_relationnodeinfo.persistentTid),
							   relation->rd_segfile0_relationnodeinfo.persistentSerialNum,
							   ItemPointerToString2(&persistentTid),
							   persistentSerialNum)));
		}
	}

}

// UNDONE: Temporary
/*
 * RelationFetchGpRelationNodeForXLog_Index
 *
 * Wrapper around RelationFetchSegFile0GpRelationNode used on the XLOG path
 * for indexes.  The static 'deep' counter detects re-entrant invocation
 * (deep >= 2), the symptom being investigated under MPP-16395; when that
 * happens we optionally park the backend for a debugger attach and then
 * error out.  Relies on per-backend static state (single-threaded backend).
 */
void
RelationFetchGpRelationNodeForXLog_Index(
	Relation relation)
{
	static int countInThisBackend = 0;	/* total calls in this backend */
	static int deep = 0;				/* current recursion depth */
	
	deep++;

	countInThisBackend++;

	if (deep >= 2)
	{
		int saveDeep;

		if (Debug_gp_relation_node_fetch_wait_for_debugging)
		{
			/* Code for investigating MPP-16395, will be removed as part of the fix */
			elog(WARNING, "RelationFetchGpRelationNodeForXLog_Index [%d] for non-heap %u/%u/%u (deep %d) -- waiting for debug attach...",
				 countInThisBackend,
				 relation->rd_node.spcNode,
				 relation->rd_node.dbNode,
				 relation->rd_node.relNode,
				 deep);

			/* Sleep up to ~24 hours, in 60-second slices, for an attach. */
			for (int i=0; i < 24 * 60; i++)
			{
				pg_usleep(60000000L); /* 60 sec */
			}
		}

		/*
		 * Reset counter in case the user continues to use the session.
		 */
		saveDeep = deep;
		deep = 0;

		elog(ERROR, "RelationFetchGpRelationNodeForXLog_Index [%d] for non-heap %u/%u/%u (deep %d)",
			 countInThisBackend,
			 relation->rd_node.spcNode,
			 relation->rd_node.dbNode,
			 relation->rd_node.relNode,
			 saveDeep);
	}

	RelationFetchSegFile0GpRelationNode(relation);

	deep--;
}

780
/*
781
 *		AllocateRelationDesc
782
 *
783
 *		This is used to allocate memory for a new relation descriptor
784
 *		and initialize the rd_rel field from the given pg_class tuple.
785
 */
786
static Relation
787
AllocateRelationDesc(Form_pg_class relp)
788
{
789
	Relation	relation;
790
	MemoryContext oldcxt;
791
	Form_pg_class relationForm;
792

793 794
	/* Relcache entries must live in CacheMemoryContext */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
795

796
	/*
797
	 * allocate and zero space for new relation descriptor
798
	 */
799
	relation = (Relation) palloc0(sizeof(RelationData));
800

801
	/*
802
	 * clear fields of reldesc that should initialize to something non-zero
803
	 */
804
	relation->rd_targblock = InvalidBlockNumber;
805

806
	/* make sure relation is marked as having no open file yet */
807
	relation->rd_smgr = NULL;
808

809
	/*
B
Bruce Momjian 已提交
810
	 * Copy the relation tuple form
811
	 *
B
Bruce Momjian 已提交
812 813
	 * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE. The
	 * variable-length fields (relacl, reloptions) are NOT stored in the
814 815
	 * relcache --- there'd be little point in it, since we don't copy the
	 * tuple's nulls bitmap and hence wouldn't know if the values are valid.
B
Bruce Momjian 已提交
816 817 818 819
	 * Bottom line is that relacl *cannot* be retrieved from the relcache. Get
	 * it from the syscache if you need it.  The same goes for the original
	 * form of reloptions (however, we do store the parsed form of reloptions
	 * in rd_options).
820 821
	 */
	relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
822

823
	memcpy(relationForm, relp, CLASS_TUPLE_SIZE);
824 825

	/* initialize relation tuple form */
826
	relation->rd_rel = relationForm;
827

828 829 830 831 832 833 834
	/*
	 * This part MUST be remain as a fetch on demand, otherwise you end up
	 * needing it to open pg_class and then relation_open does infinite recursion...
	 */
	relation->rd_segfile0_relationnodeinfo.isPresent = false;
	relation->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = false;

835
	/* and allocate attribute tuple form storage */
836 837
	relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts,
											   relationForm->relhasoids);
838 839
	/* which we mark as a reference-counted tupdesc */
	relation->rd_att->tdrefcount = 1;
840 841 842

	MemoryContextSwitchTo(oldcxt);

843
	return relation;
844 845
}

B
Bruce Momjian 已提交
846
/*
847 848 849 850 851 852
 * RelationParseRelOptions
 *		Convert pg_class.reloptions into pre-parsed rd_options
 *
 * tuple is the real pg_class tuple (not rd_rel!) for relation
 *
 * Note: rd_rel and (if an index) rd_am must be valid already
B
Bruce Momjian 已提交
853 854
 */
static void
855
RelationParseRelOptions(Relation relation, HeapTuple tuple)
B
Bruce Momjian 已提交
856
{
857 858 859
	Datum		datum;
	bool		isnull;
	bytea	   *options;
B
Bruce Momjian 已提交
860

861
	relation->rd_options = NULL;
B
Bruce Momjian 已提交
862

863
	/* Fall out if relkind should not have options */
B
Bruce Momjian 已提交
864 865
	switch (relation->rd_rel->relkind)
	{
866 867
		case RELKIND_RELATION:
		case RELKIND_TOASTVALUE:
868 869 870
		case RELKIND_AOSEGMENTS:
		case RELKIND_AOBLOCKDIR:
		case RELKIND_AOVISIMAP:
871 872 873 874
		case RELKIND_INDEX:
			break;
		default:
			return;
B
Bruce Momjian 已提交
875 876
	}

877
	/*
B
Bruce Momjian 已提交
878 879 880
	 * Fetch reloptions from tuple; have to use a hardwired descriptor because
	 * we might not have any other for pg_class yet (consider executing this
	 * code for pg_class itself)
881 882 883 884 885 886 887
	 */
	datum = fastgetattr(tuple,
						Anum_pg_class_reloptions,
						GetPgClassDescriptor(),
						&isnull);
	if (isnull)
		return;
B
Bruce Momjian 已提交
888

889
	/* Parse into appropriate format; don't error out here */
B
Bruce Momjian 已提交
890 891
	switch (relation->rd_rel->relkind)
	{
892 893
		case RELKIND_RELATION:
		case RELKIND_TOASTVALUE:
894 895 896
		case RELKIND_AOSEGMENTS:
		case RELKIND_AOBLOCKDIR:
		case RELKIND_AOVISIMAP:
897 898 899 900 901 902 903 904 905 906 907 908 909 910
		case RELKIND_UNCATALOGED:
			options = heap_reloptions(relation->rd_rel->relkind, datum,
									  false);
			break;
		case RELKIND_INDEX:
			options = index_reloptions(relation->rd_am->amoptions, datum,
									   false);
			break;
		default:
			Assert(false);		/* can't get here */
			options = NULL;		/* keep compiler quiet */
			break;
	}

911 912 913 914 915 916
	/*
	 * Copy parsed data into CacheMemoryContext.  To guard against the
	 * possibility of leaks in the reloptions code, we want to do the actual
	 * parsing in the caller's memory context and copy the results into
	 * CacheMemoryContext after the fact.
	 */
917 918 919 920 921
	if (options)
	{
		relation->rd_options = MemoryContextAlloc(CacheMemoryContext,
												  VARSIZE(options));
		memcpy(relation->rd_options, options, VARSIZE(options));
922
		pfree(options);
B
Bruce Momjian 已提交
923 924 925
	}
}

926
/*
 *		RelationBuildTupleDesc
 *
 *		Form the relation's tuple descriptor from information in
 *		the pg_attribute, pg_attrdef & pg_constraint system catalogs.
 */
static void
RelationBuildTupleDesc(Relation relation)
{
	HeapTuple	pg_attribute_tuple;
	Relation	pg_attribute_desc;
	SysScanDesc pg_attribute_scan;
	ScanKeyData skey[2];
	int			need;			/* count of attributes still to be found */
	TupleConstr *constr;
	AttrDefault *attrdef = NULL;
	int			ndef = 0;

	/* copy some fields from pg_class row to rd_att */
	relation->rd_att->tdtypeid = relation->rd_rel->reltype;
	relation->rd_att->tdtypmod = -1;	/* unnecessary, but... */
	relation->rd_att->tdhasoid = relation->rd_rel->relhasoids;

	/*
	 * Allocate constraint info in CacheMemoryContext, since it must live as
	 * long as the relcache entry; it is freed below if it turns out unneeded.
	 */
	constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
												sizeof(TupleConstr));
	constr->has_not_null = false;

	/*
	 * Form a scan key that selects only user attributes (attnum > 0).
	 * (Eliminating system attribute rows at the index level is lots faster
	 * than fetching them.)
	 */
	ScanKeyInit(&skey[0],
				Anum_pg_attribute_attrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));
	ScanKeyInit(&skey[1],
				Anum_pg_attribute_attnum,
				BTGreaterStrategyNumber, F_INT2GT,
				Int16GetDatum(0));

	/*
	 * Open pg_attribute and begin a scan.	Force heap scan if we haven't yet
	 * built the critical relcache entries (this includes initdb and startup
	 * without a pg_internal.init file).
	 */
	pg_attribute_desc = heap_open(AttributeRelationId, AccessShareLock);
	pg_attribute_scan = systable_beginscan(pg_attribute_desc,
										   AttributeRelidNumIndexId,
										   criticalRelcachesBuilt,
										   SnapshotNow,
										   2, skey);

	/*
	 * add attribute data to relation->rd_att
	 */
	need = relation->rd_rel->relnatts;

	while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
	{
		Form_pg_attribute attp;

		attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);

		/* sanity-check attnum against what pg_class claims */
		if (attp->attnum <= 0 ||
			attp->attnum > relation->rd_rel->relnatts)
			elog(ERROR, "invalid attribute number %d for %s",
				 attp->attnum, RelationGetRelationName(relation));

		/* copy only the fixed-size prefix of the pg_attribute row */
		memcpy(relation->rd_att->attrs[attp->attnum - 1],
			   attp,
			   ATTRIBUTE_FIXED_PART_SIZE);

		/* Update constraint/default info */
		if (attp->attnotnull)
			constr->has_not_null = true;

		if (attp->atthasdef)
		{
			/* lazily allocate the defaults array on first default seen */
			if (attrdef == NULL)
				attrdef = (AttrDefault *)
					MemoryContextAllocZero(CacheMemoryContext,
										   relation->rd_rel->relnatts *
										   sizeof(AttrDefault));
			attrdef[ndef].adnum = attp->attnum;
			attrdef[ndef].adbin = NULL;		/* filled by AttrDefaultFetch */
			ndef++;
		}
		/* stop early once we've seen every expected attribute */
		need--;
		if (need == 0)
			break;
	}

	/*
	 * end the scan and close the attribute relation
	 */
	systable_endscan(pg_attribute_scan);
	heap_close(pg_attribute_desc, AccessShareLock);

	if (need != 0)
		elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
			 need, RelationGetRelid(relation));

	/*
	 * The attcacheoff values we read from pg_attribute should all be -1
	 * ("unknown").  Verify this if assert checking is on.	They will be
	 * computed when and if needed during tuple access.
	 */
#ifdef USE_ASSERT_CHECKING
	{
		int			i;

		for (i = 0; i < relation->rd_rel->relnatts; i++)
			Assert(relation->rd_att->attrs[i]->attcacheoff == -1);
	}
#endif

	/*
	 * However, we can easily set the attcacheoff value for the first
	 * attribute: it must be zero.	This eliminates the need for special cases
	 * for attnum=1 that used to exist in fastgetattr() and index_getattr().
	 */
	if (relation->rd_rel->relnatts > 0)
		relation->rd_att->attrs[0]->attcacheoff = 0;

	/*
	 * Set up constraint/default info
	 */
	if (constr->has_not_null || ndef > 0 || relation->rd_rel->relchecks)
	{
		relation->rd_att->constr = constr;

		if (ndef > 0)			/* DEFAULTs */
		{
			/* shrink the array if we over-allocated above */
			if (ndef < relation->rd_rel->relnatts)
				constr->defval = (AttrDefault *)
					repalloc(attrdef, ndef * sizeof(AttrDefault));
			else
				constr->defval = attrdef;
			constr->num_defval = ndef;
			AttrDefaultFetch(relation);
		}
		else
			constr->num_defval = 0;

		if (relation->rd_rel->relchecks > 0)	/* CHECKs */
		{
			constr->num_check = relation->rd_rel->relchecks;
			constr->check = (ConstrCheck *)
				MemoryContextAllocZero(CacheMemoryContext,
									constr->num_check * sizeof(ConstrCheck));
			CheckConstraintFetch(relation);
		}
		else
			constr->num_check = 0;
	}
	else
	{
		/* no constraints at all: release the speculative allocation */
		pfree(constr);
		relation->rd_att->constr = NULL;
	}
}

1089
/*
 *		RelationBuildRuleLock
 *
 *		Form the relation's rewrite rules from information in
 *		the pg_rewrite system catalog.
 *
 * Note: The rule parsetrees are potentially very complex node structures.
 * To allow these trees to be freed when the relcache entry is flushed,
 * we make a private memory context to hold the RuleLock information for
 * each relcache entry that has associated rules.  The context is used
 * just for rule info, not for any other subsidiary data of the relcache
 * entry, because that keeps the update logic in RelationClearRelation()
 * manageable.	The other subsidiary data structures are simple enough
 * to be easy to free explicitly, anyway.
 */
static void
RelationBuildRuleLock(Relation relation)
{
	MemoryContext rulescxt;
	MemoryContext oldcxt;
	HeapTuple	rewrite_tuple;
	Relation	rewrite_desc;
	TupleDesc	rewrite_tupdesc;
	SysScanDesc rewrite_scan;
	ScanKeyData key;
	RuleLock   *rulelock;
	int			numlocks;
	RewriteRule **rules;
	int			maxlocks;

	/*
	 * Make the private context.  Parameters are set on the assumption that
	 * it'll probably not contain much data.
	 */
	rulescxt = AllocSetContextCreate(CacheMemoryContext,
									 RelationGetRelationName(relation),
									 ALLOCSET_SMALL_MINSIZE,
									 ALLOCSET_SMALL_INITSIZE,
									 ALLOCSET_SMALL_MAXSIZE);
	relation->rd_rulescxt = rulescxt;

	/*
	 * allocate an array to hold the rewrite rules (the array is extended if
	 * necessary)
	 */
	maxlocks = 4;
	rules = (RewriteRule **)
		MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
	numlocks = 0;

	/*
	 * form a scan key: select pg_rewrite rows for this relation only
	 */
	ScanKeyInit(&key,
				Anum_pg_rewrite_ev_class,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));

	/*
	 * open pg_rewrite and begin a scan
	 *
	 * Note: since we scan the rules using RewriteRelRulenameIndexId, we will
	 * be reading the rules in name order, except possibly during
	 * emergency-recovery operations (ie, IgnoreSystemIndexes). This in turn
	 * ensures that rules will be fired in name order.
	 */
	rewrite_desc = heap_open(RewriteRelationId, AccessShareLock);
	rewrite_tupdesc = RelationGetDescr(rewrite_desc);

	rewrite_scan = systable_beginscan(rewrite_desc,
									  RewriteRelRulenameIndexId,
									  true, SnapshotNow,
									  1, &key);

	while (HeapTupleIsValid(rewrite_tuple = systable_getnext(rewrite_scan)))
	{
		Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
		bool		isnull;
		Datum		rule_datum;
		char	   *rule_str;
		RewriteRule *rule;

		/* the RewriteRule itself lives in the private rules context */
		rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
												  sizeof(RewriteRule));

		rule->ruleId = HeapTupleGetOid(rewrite_tuple);

		/* ev_type is stored as a character digit; convert to int */
		rule->event = rewrite_form->ev_type - '0';
		rule->attrno = rewrite_form->ev_attr;
		rule->enabled = rewrite_form->ev_enabled;
		rule->isInstead = rewrite_form->is_instead;

		/*
		 * Must use heap_getattr to fetch ev_action and ev_qual.  Also, the
		 * rule strings are often large enough to be toasted.  To avoid
		 * leaking memory in the caller's context, do the detoasting here so
		 * we can free the detoasted version.
		 */
		rule_datum = heap_getattr(rewrite_tuple,
								  Anum_pg_rewrite_ev_action,
								  rewrite_tupdesc,
								  &isnull);
		Assert(!isnull);
		rule_str = TextDatumGetCString(rule_datum);
		/* deserialize into rulescxt so the tree survives with the entry */
		oldcxt = MemoryContextSwitchTo(rulescxt);
		rule->actions = (List *) stringToNode(rule_str);
		MemoryContextSwitchTo(oldcxt);
		pfree(rule_str);

		rule_datum = heap_getattr(rewrite_tuple,
								  Anum_pg_rewrite_ev_qual,
								  rewrite_tupdesc,
								  &isnull);
		Assert(!isnull);
		rule_str = TextDatumGetCString(rule_datum);
		oldcxt = MemoryContextSwitchTo(rulescxt);
		rule->qual = (Node *) stringToNode(rule_str);
		MemoryContextSwitchTo(oldcxt);
		pfree(rule_str);

		/*
		 * We want the rule's table references to be checked as though by the
		 * table owner, not the user referencing the rule.	Therefore, scan
		 * through the rule's actions and set the checkAsUser field on all
		 * rtable entries.	We have to look at the qual as well, in case it
		 * contains sublinks.
		 *
		 * The reason for doing this when the rule is loaded, rather than when
		 * it is stored, is that otherwise ALTER TABLE OWNER would have to
		 * grovel through stored rules to update checkAsUser fields. Scanning
		 * the rule tree during load is relatively cheap (compared to
		 * constructing it in the first place), so we do it here.
		 */
		setRuleCheckAsUser((Node *) rule->actions, relation->rd_rel->relowner);
		setRuleCheckAsUser(rule->qual, relation->rd_rel->relowner);

		/* grow the rules array by doubling when it fills up */
		if (numlocks >= maxlocks)
		{
			maxlocks *= 2;
			rules = (RewriteRule **)
				repalloc(rules, sizeof(RewriteRule *) * maxlocks);
		}
		rules[numlocks++] = rule;
	}

	/*
	 * end the scan and close the attribute relation
	 */
	systable_endscan(rewrite_scan);
	heap_close(rewrite_desc, AccessShareLock);

	/*
	 * form a RuleLock and insert into relation
	 */
	rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
	rulelock->numLocks = numlocks;
	rulelock->rules = rules;

	relation->rd_rules = rulelock;
}

1250
/*
 *		equalRuleLocks
 *
 *		Determine whether two RuleLocks are equivalent
 *
 *		Probably this should be in the rules code someplace...
 */
static bool
equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
{
	int			slot;

	/*
	 * As of 7.3 we assume the rule ordering is repeatable, because
	 * RelationBuildRuleLock should read 'em in a consistent order.  So just
	 * compare corresponding slots.
	 */

	/* If either side is NULL, they match only when both are NULL. */
	if (rlock1 == NULL || rlock2 == NULL)
		return rlock1 == rlock2;

	if (rlock1->numLocks != rlock2->numLocks)
		return false;

	for (slot = 0; slot < rlock1->numLocks; slot++)
	{
		RewriteRule *r1 = rlock1->rules[slot];
		RewriteRule *r2 = rlock2->rules[slot];

		/* cheap scalar-field comparisons first */
		if (r1->ruleId != r2->ruleId ||
			r1->event != r2->event ||
			r1->attrno != r2->attrno ||
			r1->enabled != r2->enabled ||
			r1->isInstead != r2->isInstead)
			return false;

		/* then the potentially-expensive parse-tree comparisons */
		if (!equal(r1->qual, r2->qual))
			return false;
		if (!equal(r1->actions, r2->actions))
			return false;
	}

	return true;
}


1300
/*
 *		RelationBuildDesc
 *
 *		Build a relation descriptor.  The caller must hold at least
 *		AccessShareLock on the target relid.
 *
 *		The new descriptor is inserted into the hash table if insertIt is true.
 *
 *		Returns NULL if no pg_class row could be found for the given relid
 *		(suggesting we are trying to access a just-deleted relation).
 *		Any other error is reported via elog.
 */
static Relation
RelationBuildDesc(Oid targetRelId, bool insertIt)
{
	Relation	relation;
	Oid			relid;
	Relation    pg_class_relation;
	HeapTuple	pg_class_tuple;
	Form_pg_class relp;

	/*
	 * find the tuple in pg_class corresponding to the given relation id
	 */
	pg_class_tuple = ScanPgRelation(targetRelId, true, &pg_class_relation);

	/*
	 * if no such tuple exists, return NULL
	 */
	if (!HeapTupleIsValid(pg_class_tuple))
		return NULL;

	/*
	 * get information from the pg_class_tuple
	 */
	relid = HeapTupleGetOid(pg_class_tuple);
	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
	heap_close(pg_class_relation, AccessShareLock);

	/*
	 * allocate storage for the relation descriptor, and copy pg_class_tuple
	 * to relation->rd_rel and new fields into relation->rd_newfields.
	 */
	relation = AllocateRelationDesc(relp);

	/*
	 * initialize the relation's relation id (relation->rd_id)
	 */
	RelationGetRelid(relation) = relid;

	/*
	 * normal relations are not nailed into the cache; nor can a pre-existing
	 * relation be new.  It could be temp though.  (Actually, it could be new
	 * too, but it's okay to forget that fact if forced to flush the entry.)
	 */
	relation->rd_refcnt = 0;
	relation->rd_isnailed = false;
	relation->rd_createSubid = InvalidSubTransactionId;
	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
	relation->rd_istemp = isTempOrToastNamespace(relation->rd_rel->relnamespace);
	/* system catalogs are identified by the "pg_" relname prefix */
	relation->rd_issyscat = (strncmp(relation->rd_rel->relname.data, "pg_", 3) == 0);

	/*
	 * CDB: On QEs, temp relations must use shared buffer cache so data
	 * will be visible to all segmates.  On QD, sequence objects must
	 * use shared buffer cache so data will be visible to sequence server.
	 */
	if (relation->rd_istemp &&
		relation->rd_rel->relkind != RELKIND_SEQUENCE &&
		Gp_role != GP_ROLE_EXECUTE)
		relation->rd_isLocalBuf = true;
	else
		relation->rd_isLocalBuf = false;

	/*
	 * initialize the tuple descriptor (relation->rd_att).
	 */
	RelationBuildTupleDesc(relation);

	/*
	 * Fetch rules and triggers that affect this relation
	 */
	if (relation->rd_rel->relhasrules)
		RelationBuildRuleLock(relation);
	else
	{
		relation->rd_rules = NULL;
		relation->rd_rulescxt = NULL;
	}

	if (relation->rd_rel->reltriggers > 0)
		RelationBuildTriggers(relation);
	else
		relation->trigdesc = NULL;

	/*
	 * if it's an index, initialize index-related information
	 */
	if (OidIsValid(relation->rd_rel->relam))
		RelationInitIndexAccessInfo(relation);

	/*
	 * if it's an append-only table, get information from pg_appendonly
	 */
	if (relation->rd_rel->relstorage == RELSTORAGE_AOROWS ||
		relation->rd_rel->relstorage == RELSTORAGE_AOCOLS)
	{
		RelationInitAppendOnlyInfo(relation);
	}

	/* extract reloptions if any */
	RelationParseRelOptions(relation, pg_class_tuple);

	/*
	 * initialize the relation lock manager information
	 */
	RelationInitLockInfo(relation);		/* see lmgr.c */

	/*
	 * initialize physical addressing information for the relation
	 */
	RelationInitPhysicalAddr(relation);

	/* make sure relation is marked as having no open file yet */
	relation->rd_smgr = NULL;

    /*
     * initialize Greenplum Database partitioning info
     */
    if (relation->rd_rel->relkind == RELKIND_RELATION &&
        !IsSystemRelation(relation))
        relation->rd_cdbpolicy = GpPolicyFetch(CacheMemoryContext, targetRelId);

    relation->rd_cdbDefaultStatsWarningIssued = false;

	/*
	 * now we can free the memory allocated for pg_class_tuple
	 */
	heap_freetuple(pg_class_tuple);

	/*
	 * Insert newly created relation into relcache hash table, if requested.
	 */
	if (insertIt)
		RelationCacheInsert(relation);

	/* It's fully valid */
	relation->rd_isvalid = true;

	return relation;
}

1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468
/*
 * Initialize the physical addressing info (RelFileNode) for a relcache entry
 */
static void
RelationInitPhysicalAddr(Relation relation)
{
	Form_pg_class relform = relation->rd_rel;

	/* zero reltablespace means "use the database's default tablespace" */
	relation->rd_node.spcNode = relform->reltablespace ?
		relform->reltablespace : MyDatabaseTableSpace;
	/* shared relations are not tied to any one database */
	relation->rd_node.dbNode = relform->relisshared ?
		InvalidOid : MyDatabaseId;
	relation->rd_node.relNode = relform->relfilenode;
}

1469 1470 1471 1472 1473
/*
 * Initialize index-access-method support data for an index relation
 */
void
RelationInitIndexAccessInfo(Relation relation)
{
	HeapTuple	tuple;
	Form_pg_am	aform;
	Datum		indclassDatum;
	Datum		indoptionDatum;
	bool		isnull;
	oidvector  *indclass;
	int2vector *indoption;
	MemoryContext indexcxt;
	MemoryContext oldcontext;
	int			natts;
	uint16		amstrategies;
	uint16		amsupport;

	/*
	 * Make a copy of the pg_index entry for the index.  Since pg_index
	 * contains variable-length and possibly-null fields, we have to do this
	 * honestly rather than just treating it as a Form_pg_index struct.
	 */
	tuple = SearchSysCache(INDEXRELID,
						   ObjectIdGetDatum(RelationGetRelid(relation)),
						   0, 0, 0);
	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "cache lookup failed for index %u",
			 RelationGetRelid(relation));
	/* copy into CacheMemoryContext so it outlives the syscache entry */
	oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indextuple = heap_copytuple(tuple);
	relation->rd_index = (Form_pg_index) GETSTRUCT(relation->rd_indextuple);
	MemoryContextSwitchTo(oldcontext);
	ReleaseSysCache(tuple);

	/*
	 * Make a copy of the pg_am entry for the index's access method
	 */
	tuple = SearchSysCache(AMOID,
						   ObjectIdGetDatum(relation->rd_rel->relam),
						   0, 0, 0);
	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "cache lookup failed for access method %u",
			 relation->rd_rel->relam);
	aform = (Form_pg_am) MemoryContextAlloc(CacheMemoryContext, sizeof *aform);
	memcpy(aform, GETSTRUCT(tuple), sizeof *aform);
	ReleaseSysCache(tuple);
	relation->rd_am = aform;

	/* cross-check pg_class vs pg_index attribute counts */
	natts = relation->rd_rel->relnatts;
	if (natts != relation->rd_index->indnatts)
		elog(ERROR, "relnatts disagrees with indnatts for index %u",
			 RelationGetRelid(relation));
	amstrategies = aform->amstrategies;
	amsupport = aform->amsupport;

	/*
	 * Make the private context to hold index access info.	The reason we need
	 * a context, and not just a couple of pallocs, is so that we won't leak
	 * any subsidiary info attached to fmgr lookup records.
	 *
	 * Context parameters are set on the assumption that it'll probably not
	 * contain much data.
	 */
	indexcxt = AllocSetContextCreate(CacheMemoryContext,
									 RelationGetRelationName(relation),
									 ALLOCSET_SMALL_MINSIZE,
									 ALLOCSET_SMALL_INITSIZE,
									 ALLOCSET_SMALL_MAXSIZE);
	relation->rd_indexcxt = indexcxt;

	/*
	 * Allocate arrays to hold data
	 */
	relation->rd_aminfo = (RelationAmInfo *)
		MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));

	relation->rd_opfamily = (Oid *)
		MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
	relation->rd_opcintype = (Oid *)
		MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));

	/* per-column operator OIDs, amstrategies entries per column */
	if (amstrategies > 0)
		relation->rd_operator = (Oid *)
			MemoryContextAllocZero(indexcxt,
								   natts * amstrategies * sizeof(Oid));
	else
		relation->rd_operator = NULL;

	/* per-column support procs, amsupport entries per column */
	if (amsupport > 0)
	{
		int			nsupport = natts * amsupport;

		relation->rd_support = (RegProcedure *)
			MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
		relation->rd_supportinfo = (FmgrInfo *)
			MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
	}
	else
	{
		relation->rd_support = NULL;
		relation->rd_supportinfo = NULL;
	}

	relation->rd_indoption = (int16 *)
		MemoryContextAllocZero(indexcxt, natts * sizeof(int16));

	/*
	 * indclass cannot be referenced directly through the C struct, because it
	 * comes after the variable-width indkey field.  Must extract the datum
	 * the hard way...
	 */
	indclassDatum = fastgetattr(relation->rd_indextuple,
								Anum_pg_index_indclass,
								GetPgIndexDescriptor(),
								&isnull);
	Assert(!isnull);
	indclass = (oidvector *) DatumGetPointer(indclassDatum);

	/*
	 * Fill the operator and support procedure OID arrays, as well as the info
	 * about opfamilies and opclass input types.  (aminfo and supportinfo are
	 * left as zeroes, and are filled on-the-fly when used)
	 */
	IndexSupportInitialize(indclass,
						   relation->rd_operator, relation->rd_support,
						   relation->rd_opfamily, relation->rd_opcintype,
						   amstrategies, amsupport, natts);

	/*
	 * Similarly extract indoption and copy it to the cache entry
	 */
	indoptionDatum = fastgetattr(relation->rd_indextuple,
								 Anum_pg_index_indoption,
								 GetPgIndexDescriptor(),
								 &isnull);
	Assert(!isnull);
	indoption = (int2vector *) DatumGetPointer(indoptionDatum);
	memcpy(relation->rd_indoption, indoption->values, natts * sizeof(int16));

	/*
	 * expressions and predicate cache will be filled later
	 */
	relation->rd_indexprs = NIL;
	relation->rd_indpred = NIL;
	relation->rd_amcache = NULL;
}

1618
/*
1619
 * IndexSupportInitialize
1620
 *		Initializes an index's cached opclass information,
1621
 *		given the index's pg_index.indclass entry.
1622
 *
1623 1624
 * Data is returned into *indexOperator, *indexSupport, *opFamily, and
 * *opcInType, which are arrays allocated by the caller.
1625 1626 1627 1628 1629 1630 1631
 *
 * The caller also passes maxStrategyNumber, maxSupportNumber, and
 * maxAttributeNumber, since these indicate the size of the arrays
 * it has allocated --- but in practice these numbers must always match
 * those obtainable from the system catalog entries for the index and
 * access method.
 */
1632
void
1633
IndexSupportInitialize(oidvector *indclass,
1634 1635
					   Oid *indexOperator,
					   RegProcedure *indexSupport,
1636 1637
					   Oid *opFamily,
					   Oid *opcInType,
1638 1639 1640 1641 1642 1643 1644 1645 1646 1647
					   StrategyNumber maxStrategyNumber,
					   StrategyNumber maxSupportNumber,
					   AttrNumber maxAttributeNumber)
{
	int			attIndex;

	for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
	{
		OpClassCacheEnt *opcentry;

1648
		if (!OidIsValid(indclass->values[attIndex]))
1649
			elog(ERROR, "bogus pg_index tuple");
1650 1651

		/* look up the info for this opclass, using a cache */
1652
		opcentry = LookupOpclassInfo(indclass->values[attIndex],
1653 1654 1655
									 maxStrategyNumber,
									 maxSupportNumber);

1656
		/* copy cached data into relcache entry */
1657 1658
		opFamily[attIndex] = opcentry->opcfamily;
		opcInType[attIndex] = opcentry->opcintype;
1659
		if (maxStrategyNumber > 0)
1660 1661 1662
			memcpy(&indexOperator[attIndex * maxStrategyNumber],
				   opcentry->operatorOids,
				   maxStrategyNumber * sizeof(Oid));
1663
		if (maxSupportNumber > 0)
1664 1665 1666
			memcpy(&indexSupport[attIndex * maxSupportNumber],
				   opcentry->supportProcs,
				   maxSupportNumber * sizeof(RegProcedure));
1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681
	}
}

/*
 * LookupOpclassInfo
 *
 * This routine maintains a per-opclass cache of the information needed
 * by IndexSupportInitialize().  This is more efficient than relying on
 * the catalog cache, because we can load all the info about a particular
 * opclass in a single indexscan of pg_amproc or pg_amop.
 *
 * The information from pg_am about expected range of strategy and support
 * numbers is passed in, rather than being looked up, mainly because the
 * caller will have it already.
 *
1682 1683 1684
 * Note there is no provision for flushing the cache.  This is OK at the
 * moment because there is no way to ALTER any interesting properties of an
 * existing opclass --- all you can do is drop it, which will result in
1685
 * a useless but harmless dead entry in the cache.  To support altering
1686 1687 1688
 * opclass membership (not the same as opfamily membership!), we'd need to
 * be able to flush this cache as well as the contents of relcache entries
 * for indexes.
1689 1690 1691 1692 1693 1694 1695 1696
 */
static OpClassCacheEnt *
LookupOpclassInfo(Oid operatorClassOid,
				  StrategyNumber numStrats,
				  StrategyNumber numSupport)
{
	OpClassCacheEnt *opcentry;
	bool		found;
1697 1698
	Relation	rel;
	SysScanDesc scan;
1699
	ScanKeyData skey[3];
1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713
	HeapTuple	htup;
	bool		indexOK;

	if (OpClassCache == NULL)
	{
		/* First time through: initialize the opclass cache */
		HASHCTL		ctl;

		if (!CacheMemoryContext)
			CreateCacheMemoryContext();

		MemSet(&ctl, 0, sizeof(ctl));
		ctl.keysize = sizeof(Oid);
		ctl.entrysize = sizeof(OpClassCacheEnt);
1714
		ctl.hash = oid_hash;
1715 1716 1717 1718 1719 1720 1721 1722
		OpClassCache = hash_create("Operator class cache", 64,
								   &ctl, HASH_ELEM | HASH_FUNCTION);
	}

	opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
											   (void *) &operatorClassOid,
											   HASH_ENTER, &found);

1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744
	if (!found)
	{
		/* Need to allocate memory for new entry */
		opcentry->valid = false;	/* until known OK */
		opcentry->numStrats = numStrats;
		opcentry->numSupport = numSupport;

		if (numStrats > 0)
			opcentry->operatorOids = (Oid *)
				MemoryContextAllocZero(CacheMemoryContext,
									   numStrats * sizeof(Oid));
		else
			opcentry->operatorOids = NULL;

		if (numSupport > 0)
			opcentry->supportProcs = (RegProcedure *)
				MemoryContextAllocZero(CacheMemoryContext,
									   numSupport * sizeof(RegProcedure));
		else
			opcentry->supportProcs = NULL;
	}
	else
1745 1746 1747 1748 1749
	{
		Assert(numStrats == opcentry->numStrats);
		Assert(numSupport == opcentry->numSupport);
	}

1750 1751 1752 1753 1754 1755 1756 1757 1758 1759
	/*
	 * When testing for cache-flush hazards, we intentionally disable the
	 * operator class cache and force reloading of the info on each call.
	 * This is helpful because we want to test the case where a cache flush
	 * occurs while we are loading the info, and it's very hard to provoke
	 * that if this happens only once per opclass per backend.
	 */
#if defined(CLOBBER_CACHE_ALWAYS)
	opcentry->valid = false;
#endif
1760

1761 1762
	if (opcentry->valid)
		return opcentry;
1763 1764

	/*
1765 1766
	 * Need to fill in new entry.
	 *
B
Bruce Momjian 已提交
1767 1768 1769
	 * To avoid infinite recursion during startup, force heap scans if we're
	 * looking up info for the opclasses used by the indexes we would like to
	 * reference here.
1770 1771 1772 1773 1774
	 */
	indexOK = criticalRelcachesBuilt ||
		(operatorClassOid != OID_BTREE_OPS_OID &&
		 operatorClassOid != INT2_BTREE_OPS_OID);

1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802
	/*
	 * We have to fetch the pg_opclass row to determine its opfamily and
	 * opcintype, which are needed to look up the operators and functions.
	 * It'd be convenient to use the syscache here, but that probably doesn't
	 * work while bootstrapping.
	 */
	ScanKeyInit(&skey[0],
				ObjectIdAttributeNumber,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(operatorClassOid));
	rel = heap_open(OperatorClassRelationId, AccessShareLock);
	scan = systable_beginscan(rel, OpclassOidIndexId, indexOK,
							  SnapshotNow, 1, skey);

	if (HeapTupleIsValid(htup = systable_getnext(scan)))
	{
		Form_pg_opclass opclassform = (Form_pg_opclass) GETSTRUCT(htup);

		opcentry->opcfamily = opclassform->opcfamily;
		opcentry->opcintype = opclassform->opcintype;
	}
	else
		elog(ERROR, "could not find tuple for opclass %u", operatorClassOid);

	systable_endscan(scan);
	heap_close(rel, AccessShareLock);


1803
	/*
B
Bruce Momjian 已提交
1804
	 * Scan pg_amop to obtain operators for the opclass.  We only fetch the
1805
	 * default ones (those with lefttype = righttype = opcintype).
1806 1807 1808
	 */
	if (numStrats > 0)
	{
1809
		ScanKeyInit(&skey[0],
1810
					Anum_pg_amop_amopfamily,
1811
					BTEqualStrategyNumber, F_OIDEQ,
1812
					ObjectIdGetDatum(opcentry->opcfamily));
1813
		ScanKeyInit(&skey[1],
1814 1815 1816 1817 1818
					Anum_pg_amop_amoplefttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
		ScanKeyInit(&skey[2],
					Anum_pg_amop_amoprighttype,
1819
					BTEqualStrategyNumber, F_OIDEQ,
1820
					ObjectIdGetDatum(opcentry->opcintype));
1821 1822
		rel = heap_open(AccessMethodOperatorRelationId, AccessShareLock);
		scan = systable_beginscan(rel, AccessMethodStrategyIndexId, indexOK,
1823
								  SnapshotNow, 3, skey);
1824 1825

		while (HeapTupleIsValid(htup = systable_getnext(scan)))
1826 1827 1828 1829 1830
		{
			Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(htup);

			if (amopform->amopstrategy <= 0 ||
				(StrategyNumber) amopform->amopstrategy > numStrats)
1831
				elog(ERROR, "invalid amopstrategy number %d for opclass %u",
1832 1833 1834 1835 1836
					 amopform->amopstrategy, operatorClassOid);
			opcentry->operatorOids[amopform->amopstrategy - 1] =
				amopform->amopopr;
		}

1837 1838
		systable_endscan(scan);
		heap_close(rel, AccessShareLock);
1839 1840 1841
	}

	/*
B
Bruce Momjian 已提交
1842
	 * Scan pg_amproc to obtain support procs for the opclass.	We only fetch
1843
	 * the default ones (those with lefttype = righttype = opcintype).
1844 1845 1846
	 */
	if (numSupport > 0)
	{
1847
		ScanKeyInit(&skey[0],
1848
					Anum_pg_amproc_amprocfamily,
1849
					BTEqualStrategyNumber, F_OIDEQ,
1850
					ObjectIdGetDatum(opcentry->opcfamily));
1851
		ScanKeyInit(&skey[1],
1852
					Anum_pg_amproc_amproclefttype,
1853
					BTEqualStrategyNumber, F_OIDEQ,
1854 1855 1856 1857 1858
					ObjectIdGetDatum(opcentry->opcintype));
		ScanKeyInit(&skey[2],
					Anum_pg_amproc_amprocrighttype,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(opcentry->opcintype));
1859 1860
		rel = heap_open(AccessMethodProcedureRelationId, AccessShareLock);
		scan = systable_beginscan(rel, AccessMethodProcedureIndexId, indexOK,
1861
								  SnapshotNow, 3, skey);
1862 1863

		while (HeapTupleIsValid(htup = systable_getnext(scan)))
1864 1865 1866 1867 1868
		{
			Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);

			if (amprocform->amprocnum <= 0 ||
				(StrategyNumber) amprocform->amprocnum > numSupport)
1869
				elog(ERROR, "invalid amproc number %d for opclass %u",
1870 1871 1872 1873 1874 1875
					 amprocform->amprocnum, operatorClassOid);

			opcentry->supportProcs[amprocform->amprocnum - 1] =
				amprocform->amproc;
		}

1876 1877
		systable_endscan(scan);
		heap_close(rel, AccessShareLock);
1878 1879 1880 1881 1882 1883 1884 1885 1886 1887
	}

	opcentry->valid = true;
	return opcentry;
}


/*
 *		formrdesc
 *
1888 1889
 *		This is a special cut-down version of RelationBuildDesc(),
 *		used while initializing the relcache.
1890
 *		The relation descriptor is built just from the supplied parameters,
1891 1892
 *		without actually looking at any system table entries.  We cheat
 *		quite a lot since we only need to work for a few basic system
1893 1894
 *		catalogs.
 *
1895 1896 1897
 * formrdesc is currently used for: pg_database, pg_authid, pg_auth_members,
 * pg_class, pg_attribute, pg_proc, and pg_type
 * (see RelationCacheInitializePhase2/3).
1898
 *
1899 1900
 * Note that these catalogs can't have constraints (except attnotnull),
 * default values, rules, or triggers, since we don't cope with any of that.
1901 1902 1903
 * (Well, actually, this only matters for properties that need to be valid
 * during bootstrap or before RelationCacheInitializePhase3 runs, and none of
 * these properties matter then...)
1904
 *
1905
 * NOTE: we assume we are already switched into CacheMemoryContext.
1906 1907
 */
static void
1908
formrdesc(const char *relationName, Oid relationReltype,
1909 1910
		  bool isshared, bool hasoids,
		  int natts, const FormData_pg_attribute *attrs)
1911
{
1912
	Relation	relation;
1913
	int			i;
1914
	bool		has_not_null;
1915

1916
	/*
1917
	 * allocate new relation desc, clear all fields of reldesc
1918
	 */
1919
	relation = (Relation) palloc0(sizeof(RelationData));
1920 1921 1922
	relation->rd_targblock = InvalidBlockNumber;

	/* make sure relation is marked as having no open file yet */
1923
	relation->rd_smgr = NULL;
1924

1925
	/*
1926
	 * initialize reference count: 1 because it is nailed in cache
1927
	 */
1928
	relation->rd_refcnt = 1;
1929

1930
	/*
B
Bruce Momjian 已提交
1931 1932
	 * all entries built with this routine are nailed-in-cache; none are for
	 * new or temp relations.
1933
	 */
1934
	relation->rd_isnailed = true;
1935
	relation->rd_createSubid = InvalidSubTransactionId;
1936
	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
1937
	relation->rd_istemp = false;
1938 1939
	relation->rd_issyscat = (strncmp(relationName, "pg_", 3) == 0);	/* GP */
    relation->rd_isLocalBuf = false;    /*CDB*/
1940

1941
	/*
B
Bruce Momjian 已提交
1942
	 * initialize relation tuple form
1943
	 *
1944 1945
	 * The data we insert here is pretty incomplete/bogus, but it'll serve to
	 * get us launched.  RelationCacheInitializePhase2() will read the real
1946 1947 1948
	 * data from pg_class and replace what we've done here.  Note in particular
	 * that relowner is left as zero; this cues RelationCacheInitializePhase2
	 * that the real data isn't there yet.
1949
	 */
1950
	relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
1951

1952 1953
	namestrcpy(&relation->rd_rel->relname, relationName);
	relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
1954
	relation->rd_rel->reltype = relationReltype;
1955 1956

	/*
B
Bruce Momjian 已提交
1957
	 * It's important to distinguish between shared and non-shared relations,
1958
	 * even at bootstrap time, to make sure we know where they are stored.
1959
	 */
1960 1961 1962
	relation->rd_rel->relisshared = isshared;
	if (isshared)
		relation->rd_rel->reltablespace = GLOBALTABLESPACE_OID;
1963

1964 1965
	relation->rd_rel->relpages = 0;
	relation->rd_rel->reltuples = 0;
1966
	relation->rd_rel->relkind = RELKIND_RELATION;
1967
	relation->rd_rel->relstorage = RELSTORAGE_HEAP;
1968
	relation->rd_rel->relhasoids = hasoids;
1969
	relation->rd_rel->relnatts = (int16) natts;
1970

1971 1972 1973 1974 1975 1976
	/*
	 * Physical file-system information.
	 */
	relation->rd_segfile0_relationnodeinfo.isPresent = false;
	relation->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = false;
	
1977
	/*
B
Bruce Momjian 已提交
1978
	 * initialize attribute tuple form
1979
	 *
B
Bruce Momjian 已提交
1980
	 * Unlike the case with the relation tuple, this data had better be right
B
Bruce Momjian 已提交
1981 1982
	 * because it will never be replaced.  The input values must be correctly
	 * defined by macros in src/include/catalog/ headers.
1983
	 */
1984
	relation->rd_att = CreateTemplateTupleDesc(natts, hasoids);
1985 1986
	relation->rd_att->tdrefcount = 1;	/* mark as refcounted */

1987 1988
	relation->rd_att->tdtypeid = relationReltype;
	relation->rd_att->tdtypmod = -1;	/* unnecessary, but... */
1989

1990
	/*
B
Bruce Momjian 已提交
1991
	 * initialize tuple desc info
1992
	 */
1993
	has_not_null = false;
1994 1995
	for (i = 0; i < natts; i++)
	{
1996
		memcpy(relation->rd_att->attrs[i],
1997 1998 1999
			   &attrs[i],
			   ATTRIBUTE_FIXED_PART_SIZE);
		has_not_null |= attrs[i].attnotnull;
2000 2001
		/* make sure attcacheoff is valid */
		relation->rd_att->attrs[i]->attcacheoff = -1;
2002 2003
	}

2004 2005 2006
	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
	relation->rd_att->attrs[0]->attcacheoff = 0;

2007 2008 2009 2010 2011 2012 2013 2014 2015
	/* mark not-null status */
	if (has_not_null)
	{
		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

		constr->has_not_null = true;
		relation->rd_att->constr = constr;
	}

2016
	/*
2017
	 * initialize relation id from info in att array (my, this is ugly)
2018
	 */
2019
	RelationGetRelid(relation) = relation->rd_att->attrs[0]->attrelid;
2020
	relation->rd_rel->relfilenode = RelationGetRelid(relation);
2021

2022
	/*
2023
	 * initialize the relation lock manager information
2024 2025 2026
	 */
	RelationInitLockInfo(relation);		/* see lmgr.c */

2027 2028 2029 2030
	/*
	 * initialize physical addressing information for the relation
	 */
	RelationInitPhysicalAddr(relation);
2031

2032
	/*
B
Bruce Momjian 已提交
2033
	 * initialize the rel-has-index flag, using hardwired knowledge
2034
	 */
2035 2036 2037 2038 2039 2040
	if (IsBootstrapProcessingMode())
	{
		/* In bootstrap mode, we have no indexes */
		relation->rd_rel->relhasindex = false;
	}
	else
2041
	{
2042 2043
		/* Otherwise, all the rels formrdesc is used for have indexes */
		relation->rd_rel->relhasindex = true;
2044 2045
	}

2046
	/*
B
Bruce Momjian 已提交
2047
	 * add new reldesc to relcache
2048
	 */
2049
	RelationCacheInsert(relation);
2050 2051 2052

	/* It's fully valid */
	relation->rd_isvalid = true;
2053 2054 2055
}


2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098
/*
 * RelationInitAppendOnlyInfo
 *		Load the pg_appendonly entry for an append-only relation into
 *		its relcache entry.
 *
 * Scans pg_appendonly for the row whose relid matches the given relation,
 * and caches a copy of the tuple (rd_aotuple) plus a Form pointer into it
 * (rd_appendonly) in CacheMemoryContext, so the data survives as long as
 * the relcache entry does.  Errors out if no pg_appendonly row exists.
 */
static void
RelationInitAppendOnlyInfo(Relation relation)
{
	Relation	pg_appendonly_rel;
	HeapTuple	tuple;
	MemoryContext oldcontext;
	SysScanDesc scan;
	ScanKeyData skey;

	/*
	 * Check the pg_appendonly relation to be certain the ao table
	 * is there.
	 */
	pg_appendonly_rel = heap_open(AppendOnlyRelationId, AccessShareLock);

	ScanKeyInit(&skey,
				Anum_pg_appendonly_relid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));
	/* FIXME: isn't there a mode in relcache code to *not* use an index? Should
	 * we do something here to obey it?
	 */
	scan = systable_beginscan(pg_appendonly_rel, AppendOnlyRelidIndexId, true,
							  SnapshotNow, 1, &skey);

	tuple = systable_getnext(scan);
	/* use HeapTupleIsValid for consistency with the rest of this file */
	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "could not find pg_appendonly tuple for relation \"%s\"",
			 RelationGetRelationName(relation));

	/*
	 * Make a copy of the pg_appendonly entry for the table, in
	 * CacheMemoryContext so it outlives the current transaction context.
	 */
	oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_aotuple = heap_copytuple(tuple);
	relation->rd_appendonly = (Form_pg_appendonly) GETSTRUCT(relation->rd_aotuple);
	MemoryContextSwitchTo(oldcontext);
	systable_endscan(scan);
	heap_close(pg_appendonly_rel, AccessShareLock);
}


2099
/* ----------------------------------------------------------------
 *				 Relation Descriptor Lookup Interface
 * ----------------------------------------------------------------
 */

2104
/*
2105
 *		RelationIdGetRelation
2106
 *
2107
 *		Lookup a reldesc by OID; make one if not already in cache.
2108
 *
2109 2110 2111
 *		Returns NULL if no pg_class row could be found for the given relid
 *		(suggesting we are trying to access a just-deleted relation).
 *		Any other error is reported via elog.
2112
 *
2113 2114 2115 2116
 *		NB: caller should already have at least AccessShareLock on the
 *		relation ID, else there are nasty race conditions.
 *
 *		NB: relation ref count is incremented, or set to 1 if new entry.
2117 2118
 *		Caller should eventually decrement count.  (Usually,
 *		that happens by calling RelationClose().)
2119 2120
 */
Relation
2121
RelationIdGetRelation(Oid relationId)
2122
{
2123
	Relation	rd;
2124

2125 2126 2127
	/*
	 * first try to find reldesc in the cache
	 */
2128 2129 2130
	RelationIdCacheLookup(relationId, rd);

	if (RelationIsValid(rd))
2131
	{
2132
		RelationIncrementReferenceCount(rd);
2133
		/* revalidate cache entry if necessary */
2134
		if (!rd->rd_isvalid)
2135 2136 2137 2138 2139 2140 2141
		{
			/*
			 * Indexes only have a limited number of possible schema changes,
			 * and we don't want to use the full-blown procedure because it's
			 * a headache for indexes that reload itself depends on.
			 */
			if (rd->rd_rel->relkind == RELKIND_INDEX)
2142
				RelationReloadIndexInfo(rd);
2143 2144
			else
				RelationClearRelation(rd, true);
2145
		}
2146
		return rd;
2147
	}
2148

2149
	/*
B
Bruce Momjian 已提交
2150 2151
	 * no reldesc in the cache, so have RelationBuildDesc() build one and add
	 * it.
2152
	 */
2153
	rd = RelationBuildDesc(relationId, true);
2154 2155
	if (RelationIsValid(rd))
		RelationIncrementReferenceCount(rd);
2156

2157 2158 2159 2160
	return rd;
}

/* ----------------------------------------------------------------
 *				cache invalidation support routines
 * ----------------------------------------------------------------
 */

2165 2166 2167 2168 2169 2170 2171 2172 2173 2174 2175 2176 2177 2178 2179 2180 2181 2182 2183 2184 2185 2186 2187 2188
/*
 * RelationIncrementReferenceCount
 *		Bump the reference count on a relcache entry.
 *
 * Note: bootstrap mode has its own weird ideas about relation refcount
 * behavior; we ought to fix it someday, but for now, just disable
 * reference count ownership tracking in bootstrap mode.
 */
void
RelationIncrementReferenceCount(Relation rel)
{
	/* Reserve the resource-owner slot first, so remembering cannot fail */
	ResourceOwnerEnlargeRelationRefs(CurrentResourceOwner);
	rel->rd_refcnt++;
	if (!IsBootstrapProcessingMode())
		ResourceOwnerRememberRelationRef(CurrentResourceOwner, rel);
}

/*
 * RelationDecrementReferenceCount
 *		Decrements relation reference count.
 *
 * Errors out (rather than going negative) if the entry's refcount is
 * already zero or less, reporting the relation's physical identity.
 */
void
RelationDecrementReferenceCount(Relation rel)
{
	if (rel->rd_refcnt <= 0)
	{
		elog(ERROR,
			 "Relation decrement reference count found relation %u/%u/%u with bad count (reference count %d)",
			 rel->rd_node.spcNode,
			 rel->rd_node.dbNode,
			 rel->rd_node.relNode,
			 rel->rd_refcnt);
	}

	rel->rd_refcnt -= 1;
	if (!IsBootstrapProcessingMode())
		ResourceOwnerForgetRelationRef(CurrentResourceOwner, rel);
}

2204
/*
 * RelationClose - close an open relation
 *
 *	Actually, we just decrement the refcount.
 *
 *	NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
 *	will be freed as soon as their refcount goes to zero.  In combination
 *	with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
 *	to catch references to already-released relcache entries.  It slows
 *	things down quite a bit, however.
 */
void
RelationClose(Relation relation)
{
	/* Note: no locking manipulations needed */
	RelationDecrementReferenceCount(relation);

#ifdef RELCACHE_FORCE_RELEASE
	if (RelationHasReferenceCountZero(relation) &&
		relation->rd_createSubid == InvalidSubTransactionId &&
		relation->rd_newRelfilenodeSubid == InvalidSubTransactionId)
		RelationClearRelation(relation, false);
#endif
}

2229
/*
2230
 * RelationReloadIndexInfo - reload minimal information for an open index
2231
 *
2232 2233 2234 2235 2236 2237 2238
 *	This function is used only for indexes.  A relcache inval on an index
 *	can mean that its pg_class or pg_index row changed.  There are only
 *	very limited changes that are allowed to an existing index's schema,
 *	so we can update the relcache entry without a complete rebuild; which
 *	is fortunate because we can't rebuild an index entry that is "nailed"
 *	and/or in active use.  We support full replacement of the pg_class row,
 *	as well as updates of a few simple fields of the pg_index row.
2239
 *
2240
 *	We can't necessarily reread the catalog rows right away; we might be
2241 2242
 *	in a failed transaction when we receive the SI notification.  If so,
 *	RelationClearRelation just marks the entry as invalid by setting
2243
 *	rd_isvalid to false.  This routine is called to fix the entry when it
2244
 *	is next needed.
2245 2246 2247 2248
 *
 *	We assume that at the time we are called, we have at least AccessShareLock
 *	on the target index.  (Note: in the calls from RelationClearRelation,
 *	this is legitimate because we know the rel has positive refcount.)
2249 2250 2251 2252 2253 2254
 *
 *	If the target index is an index on pg_class or pg_index, we'd better have
 *	previously gotten at least AccessShareLock on its underlying catalog,
 *	else we are at risk of deadlock against someone trying to exclusive-lock
 *	the heap and index in that order.  This is ensured in current usage by
 *	only applying this to indexes being opened or having positive refcount.
H
Hiroshi Inoue 已提交
2255 2256
 */
static void
2257
RelationReloadIndexInfo(Relation relation)
H
Hiroshi Inoue 已提交
2258
{
2259
	bool		indexOK;
H
Hiroshi Inoue 已提交
2260
	HeapTuple	pg_class_tuple;
B
Bruce Momjian 已提交
2261
	Form_pg_class relp;
H
Hiroshi Inoue 已提交
2262

2263 2264 2265 2266 2267
	/* Should be called only for invalidated indexes */
	Assert(relation->rd_rel->relkind == RELKIND_INDEX &&
		   !relation->rd_isvalid);
	/* Should be closed at smgr level */
	Assert(relation->rd_smgr == NULL);
B
Bruce Momjian 已提交
2268

2269 2270 2271 2272 2273 2274
	/* Make sure targblock is reset in case rel was truncated */
	relation->rd_targblock = InvalidBlockNumber;
	/* Must free any AM cached data, too */
	if (relation->rd_amcache)
		pfree(relation->rd_amcache);
	relation->rd_amcache = NULL;
2275

2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288
	/*
	 * If it's a shared index, we might be called before backend startup has
	 * finished selecting a database, in which case we have no way to read
	 * pg_class yet.  However, a shared index can never have any significant
	 * schema updates, so it's okay to ignore the invalidation signal.  Just
	 * mark it valid and return without doing anything more.
	 */
	if (relation->rd_rel->relisshared && !criticalRelcachesBuilt)
	{
		relation->rd_isvalid = true;
		return;
	}

2289
	/*
2290 2291
	 * Read the pg_class row
	 *
2292 2293
	 * Don't try to use an indexscan of pg_class_oid_index to reload the info
	 * for pg_class_oid_index ...
2294
	 */
2295
	indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
2296
	pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK, NULL);
H
Hiroshi Inoue 已提交
2297
	if (!HeapTupleIsValid(pg_class_tuple))
2298
		elog(ERROR, "could not find pg_class tuple for index %u",
2299
			 RelationGetRelid(relation));
H
Hiroshi Inoue 已提交
2300
	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
2301
	memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
2302
	/* Reload reloptions in case they changed */
B
Bruce Momjian 已提交
2303 2304
	if (relation->rd_options)
		pfree(relation->rd_options);
2305 2306
	RelationParseRelOptions(relation, pg_class_tuple);
	/* done with pg_class tuple */
H
Hiroshi Inoue 已提交
2307
	heap_freetuple(pg_class_tuple);
2308 2309
	/* We must recalculate physical address in case it changed */
	RelationInitPhysicalAddr(relation);
2310 2311 2312 2313

	/* Forget gp_relation_node information -- it may have changed. */
	MemSet(&relation->rd_segfile0_relationnodeinfo, 0, sizeof(RelationNodeInfo));

2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330
	/*
	 * For a non-system index, there are fields of the pg_index row that are
	 * allowed to change, so re-read that row and update the relcache entry.
	 * Most of the info derived from pg_index (such as support function lookup
	 * info) cannot change, and indeed the whole point of this routine is to
	 * update the relcache entry without clobbering that data; so wholesale
	 * replacement is not appropriate.
	 */
	if (!IsSystemRelation(relation))
	{
		HeapTuple	tuple;
		Form_pg_index index;

		tuple = SearchSysCache(INDEXRELID,
							   ObjectIdGetDatum(RelationGetRelid(relation)),
							   0, 0, 0);
		if (!HeapTupleIsValid(tuple))
B
Bruce Momjian 已提交
2331 2332
			elog(ERROR, "cache lookup failed for index %u",
				 RelationGetRelid(relation));
2333 2334
		index = (Form_pg_index) GETSTRUCT(tuple);

2335 2336 2337 2338 2339 2340 2341 2342 2343
		/*
		 * Basically, let's just copy all the bool fields.  There are one or
		 * two of these that can't actually change in the current code, but
		 * it's not worth it to track exactly which ones they are.  None of
		 * the array fields are allowed to change, though.
		 */
		relation->rd_index->indisunique = index->indisunique;
		relation->rd_index->indisprimary = index->indisprimary;
		relation->rd_index->indisclustered = index->indisclustered;
2344
		relation->rd_index->indisvalid = index->indisvalid;
2345 2346
		relation->rd_index->indcheckxmin = index->indcheckxmin;
		relation->rd_index->indisready = index->indisready;
2347 2348

		/* Copy xmin too, as that is needed to make sense of indcheckxmin */
2349 2350
		HeapTupleHeaderSetXmin(relation->rd_indextuple->t_data,
							   HeapTupleHeaderGetXmin(tuple->t_data));
2351 2352 2353

		ReleaseSysCache(tuple);
	}
2354

2355
	/* Okay, now it's valid again */
2356
	relation->rd_isvalid = true;
H
Hiroshi Inoue 已提交
2357
}
2358

2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386 2387
/*
 * RelationDestroyRelation
 *
 *	Physically delete a relation cache entry and all subsidiary data.
 *	Caller must already have unhooked the entry from the hash table.
 */
static void
RelationDestroyRelation(Relation relation)
{
	Assert(RelationHasReferenceCountZero(relation));

	/*
	 * Make sure smgr and lower levels close the relation's files, if they
	 * weren't closed already.  (This was probably done by caller, but let's
	 * just be real sure.)
	 */
	RelationCloseSmgr(relation);

	/*
	 * Free all the subsidiary data structures of the relcache entry, then the
	 * entry itself.
	 */
	if (relation->rd_rel)
		pfree(relation->rd_rel);
	/* can't use DecrTupleDescRefCount here */
	Assert(relation->rd_att->tdrefcount > 0);
	if (--relation->rd_att->tdrefcount == 0)
		FreeTupleDesc(relation->rd_att);
	list_free(relation->rd_indexlist);
	bms_free(relation->rd_indexattr);
	FreeTriggerDesc(relation->trigdesc);
	if (relation->rd_options)
		pfree(relation->rd_options);
	if (relation->rd_indextuple)
		pfree(relation->rd_indextuple);
	if (relation->rd_aotuple)
		pfree(relation->rd_aotuple);
	if (relation->rd_am)
		pfree(relation->rd_am);
	if (relation->rd_indexcxt)
		MemoryContextDelete(relation->rd_indexcxt);
	if (relation->rd_rulescxt)
		MemoryContextDelete(relation->rd_rulescxt);
	if (relation->rd_cdbpolicy)
		pfree(relation->rd_cdbpolicy);

	pfree(relation);
}

2408
/*
2409
 * RelationClearRelation
2410
 *
2411 2412
 *	 Physically blow away a relation cache entry, or reset it and rebuild
 *	 it from scratch (that is, from catalog entries).  The latter path is
2413 2414
 *	 used when we are notified of a change to an open relation (one with
 *	 refcount > 0).
2415
 *
2416 2417 2418 2419 2420 2421
 *	 NB: when rebuilding, we'd better hold some lock on the relation,
 *	 else the catalog data we need to read could be changing under us.
 *	 Also, a rel to be rebuilt had better have refcnt > 0.  This is because
 *	 an sinval reset could happen while we're accessing the catalogs, and
 *	 the rel would get blown away underneath us by RelationCacheInvalidate
 *	 if it has zero refcnt.
2422
 *
2423 2424 2425
 *	 The "rebuild" parameter is redundant in current usage because it has
 *	 to match the relation's refcnt status, but we keep it as a crosscheck
 *	 that we're doing what the caller expects.
2426
 */
2427
static void
2428
RelationClearRelation(Relation relation, bool rebuild)
2429
{
2430 2431 2432 2433 2434 2435 2436
	/*
	 * As per notes above, a rel to be rebuilt MUST have refcnt > 0; while
	 * of course it would be a bad idea to blow away one with nonzero refcnt.
	 */
	Assert(rebuild ?
		   !RelationHasReferenceCountZero(relation) :
		   RelationHasReferenceCountZero(relation));
2437 2438

	/*
2439
	 * Make sure smgr and lower levels close the relation's files, if they
B
Bruce Momjian 已提交
2440 2441 2442 2443
	 * weren't closed already.  If the relation is not getting deleted, the
	 * next smgr access should reopen the files automatically.	This ensures
	 * that the low-level file access state is updated after, say, a vacuum
	 * truncation.
2444
	 */
2445
	RelationCloseSmgr(relation);
2446

2447
	/*
B
Bruce Momjian 已提交
2448 2449 2450
	 * Never, never ever blow away a nailed-in system relation, because we'd
	 * be unable to recover.  However, we must reset rd_targblock, in case we
	 * got called because of a relation cache flush that was triggered by
2451
	 * VACUUM.  Likewise reset the fsm and vm size info.
2452
	 *
2453 2454 2455
	 * If it's a nailed index, then we need to re-read the pg_class row to see
	 * if its relfilenode changed.	We can't necessarily do that here, because
	 * we might be in a failed transaction.  We assume it's okay to do it if
B
Bruce Momjian 已提交
2456 2457 2458
	 * there are open references to the relcache entry (cf notes for
	 * AtEOXact_RelationCache).  Otherwise just mark the entry as possibly
	 * invalid, and it'll be fixed when next opened.
2459 2460
	 */
	if (relation->rd_isnailed)
H
Hiroshi Inoue 已提交
2461
	{
2462
		relation->rd_targblock = InvalidBlockNumber;
2463 2464
		if (relation->rd_rel->relkind == RELKIND_INDEX)
		{
B
Bruce Momjian 已提交
2465
			relation->rd_isvalid = false;		/* needs to be revalidated */
2466
			if (relation->rd_refcnt > 1)
2467
				RelationReloadIndexInfo(relation);
2468
		}
2469
		return;
H
Hiroshi Inoue 已提交
2470
	}
2471

2472 2473 2474 2475
	/*
	 * Even non-system indexes should not be blown away if they are open and
	 * have valid index support information.  This avoids problems with active
	 * use of the index support information.  As with nailed indexes, we
B
Bruce Momjian 已提交
2476
	 * re-read the pg_class row to handle possible physical relocation of the
2477
	 * index, and we check for pg_index updates too.
2478 2479 2480 2481 2482
	 */
	if (relation->rd_rel->relkind == RELKIND_INDEX &&
		relation->rd_refcnt > 0 &&
		relation->rd_indexcxt != NULL)
	{
B
Bruce Momjian 已提交
2483
		relation->rd_isvalid = false;	/* needs to be revalidated */
2484
		RelationReloadIndexInfo(relation);
2485 2486 2487
		return;
	}

2488 2489
	/* Mark it invalid until we've finished rebuild */
	relation->rd_isvalid = false;
2490

2491
	/*
2492
	 * If we're really done with the relcache entry, blow it away. But if
B
Bruce Momjian 已提交
2493 2494 2495
	 * someone is still using it, reconstruct the whole deal without moving
	 * the physical RelationData record (so that the someone's pointer is
	 * still valid).
2496
	 */
2497
	if (!rebuild)
2498
	{
2499 2500 2501 2502 2503
		/* Remove it from the hash table */
		RelationCacheDelete(relation);

		/* And release storage */
		RelationDestroyRelation(relation);
2504 2505 2506
	}
	else
	{
2507
		/*
2508 2509 2510 2511 2512 2513 2514 2515 2516 2517 2518 2519 2520 2521 2522
		 * Our strategy for rebuilding an open relcache entry is to build
		 * a new entry from scratch, swap its contents with the old entry,
		 * and finally delete the new entry (along with any infrastructure
		 * swapped over from the old entry).  This is to avoid trouble in case
		 * an error causes us to lose control partway through.  The old entry
		 * will still be marked !rd_isvalid, so we'll try to rebuild it again
		 * on next access.  Meanwhile it's not any less valid than it was
		 * before, so any code that might expect to continue accessing it
		 * isn't hurt by the rebuild failure.  (Consider for example a
		 * subtransaction that ALTERs a table and then gets cancelled partway
		 * through the cache entry rebuild.  The outer transaction should
		 * still see the not-modified cache entry as valid.)  The worst
		 * consequence of an error is leaking the necessarily-unreferenced
		 * new entry, and this shouldn't happen often enough for that to be
		 * a big problem.
2523
		 *
2524 2525 2526 2527 2528 2529 2530 2531 2532 2533 2534 2535 2536 2537 2538 2539
		 * When rebuilding an open relcache entry, we must preserve ref count
		 * and rd_createSubid/rd_newRelfilenodeSubid state.  Also attempt to
		 * preserve the pg_class entry (rd_rel), tupledesc, and rewrite-rule
		 * substructures in place, because various places assume that these
		 * structures won't move while they are working with an open relcache
		 * entry.  (Note: the refcount mechanism for tupledescs might someday
		 * allow us to remove this hack for the tupledesc.)
 		 *
 		 * Note that this process does not touch CurrentResourceOwner; which
 		 * is good because whatever ref counts the entry may have do not
 		 * necessarily belong to that resource owner.
 		 */
		Relation	newrel;
 		Oid			save_relid = RelationGetRelid(relation);
		bool		keep_tupdesc;
		bool		keep_rules;
2540
		bool		keep_pt_info;
2541 2542 2543 2544 2545 2546 2547 2548 2549 2550 2551 2552 2553

		/* Build temporary entry, but don't link it into hashtable */
		newrel = RelationBuildDesc(save_relid, false);
		if (newrel == NULL)
 		{
 			/* Should only get here if relation was deleted */
			RelationCacheDelete(relation);
			RelationDestroyRelation(relation);
 			elog(ERROR, "relation %u deleted while still in use", save_relid);
 		}
 
		keep_tupdesc = equalTupleDescs(relation->rd_att, newrel->rd_att, true);
		keep_rules = equalRuleLocks(relation->rd_rules, newrel->rd_rules);
2554 2555
		keep_pt_info = (relation->rd_rel->relfilenode ==
						newrel->rd_rel->relfilenode);
2556 2557 2558 2559 2560 2561 2562 2563 2564 2565

		/*
		 * Perform swapping of the relcache entry contents.  Within this
		 * process the old entry is momentarily invalid, so there *must*
		 * be no possibility of CHECK_FOR_INTERRUPTS within this sequence.
		 * Do it in all-in-line code for safety.
		 *
		 * Since the vast majority of fields should be swapped, our method
		 * is to swap the whole structures and then re-swap those few fields
		 * we didn't want swapped.
2566
		 */
2567 2568 2569 2570 2571 2572 2573 2574 2575 2576 2577 2578 2579 2580
#define SWAPFIELD(fldtype, fldname) \
		do { \
			fldtype _tmp = newrel->fldname; \
			newrel->fldname = relation->fldname; \
			relation->fldname = _tmp; \
		} while (0)

		/* swap all Relation struct fields */
 		{
			RelationData tmpstruct;

			memcpy(&tmpstruct, newrel, sizeof(RelationData));
			memcpy(newrel, relation, sizeof(RelationData));
			memcpy(relation, &tmpstruct, sizeof(RelationData));
2581
		}
2582 2583 2584 2585 2586 2587 2588 2589 2590

		/* rd_smgr must not be swapped, due to back-links from smgr level */
		SWAPFIELD(SMgrRelation, rd_smgr);
		/* rd_refcnt must be preserved */
		SWAPFIELD(int, rd_refcnt);
		/* isnailed shouldn't change */
		Assert(newrel->rd_isnailed == relation->rd_isnailed);
		/* creation sub-XIDs must be preserved */
		SWAPFIELD(SubTransactionId, rd_createSubid);
2591
		SWAPFIELD(SubTransactionId, rd_newRelfilenodeSubid);
2592 2593 2594 2595 2596 2597 2598 2599 2600 2601 2602 2603 2604 2605 2606 2607
		/* un-swap rd_rel pointers, swap contents instead */
		SWAPFIELD(Form_pg_class, rd_rel);
		/* ... but actually, we don't have to update newrel->rd_rel */
		memcpy(relation->rd_rel, newrel->rd_rel, CLASS_TUPLE_SIZE);
		/* preserve old tupledesc and rules if no logical change */
		if (keep_tupdesc)
			SWAPFIELD(TupleDesc, rd_att);
		if (keep_rules)
 		{
			SWAPFIELD(RuleLock *, rd_rules);
			SWAPFIELD(MemoryContext, rd_rulescxt);
 		}
		/* pgstat_info must be preserved */
		SWAPFIELD(struct PgStat_TableStatus *, pgstat_info);

		/* preserve persistent table information for the relation  */
2608 2609
		if (keep_pt_info)
			SWAPFIELD(struct RelationNodeInfo, rd_segfile0_relationnodeinfo);
2610 2611 2612 2613 2614

#undef SWAPFIELD

		/* And now we can throw away the temporary entry */
		RelationDestroyRelation(newrel);
2615
	}
2616 2617
}

2618
/*
2619 2620 2621 2622 2623
 * RelationFlushRelation
 *
 *	 Rebuild the relation if it is open (refcount > 0), else blow it away.
 */
static void
2624
RelationFlushRelation(Relation relation)
2625
{
2626 2627
	if (relation->rd_createSubid != InvalidSubTransactionId ||
		relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
2628 2629
	{
		/*
2630 2631
		 * New relcache entries are always rebuilt, not flushed; else we'd
		 * forget the "new" status of the relation, which is a useful
2632
		 * optimization to have.  Ditto for the new-relfilenode status.
2633 2634 2635 2636
		 *
		 * The rel could have zero refcnt here, so temporarily increment
		 * the refcnt to ensure it's safe to rebuild it.  We can assume that
		 * the current transaction has some lock on the rel already.
2637
		 */
2638 2639 2640
		RelationIncrementReferenceCount(relation);
		RelationClearRelation(relation, true);
		RelationDecrementReferenceCount(relation);
2641 2642 2643 2644
	}
	else
	{
		/*
2645
		 * Pre-existing rels can be dropped from the relcache if not open.
2646
		 */
2647
		bool	rebuild = !RelationHasReferenceCountZero(relation);
2648

2649 2650
		RelationClearRelation(relation, rebuild);
	}
2651 2652
}

2653
/*
2654
 * RelationForgetRelation - unconditionally remove a relcache entry
2655
 *
2656 2657
 *		   External interface for destroying a relcache entry when we
 *		   drop the relation.
2658 2659
 */
void
2660
RelationForgetRelation(Oid rid)
2661
{
2662
	Relation	relation;
2663 2664 2665

	RelationIdCacheLookup(rid, relation);

2666 2667 2668 2669
	if (!PointerIsValid(relation))
		return;					/* not in cache, nothing to do */

	if (!RelationHasReferenceCountZero(relation))
2670
		elog(ERROR, "relation %u is still open", rid);
2671 2672 2673

	/* Unconditionally destroy the relcache entry */
	RelationClearRelation(relation, false);
2674 2675
}

2676
/*
2677
 *		RelationCacheInvalidateEntry
2678 2679 2680
 *
 *		This routine is invoked for SI cache flush messages.
 *
2681 2682
 * Any relcache entry matching the relid must be flushed.  (Note: caller has
 * already determined that the relid belongs to our database or is a shared
2683
 * relation.)
2684 2685 2686 2687 2688 2689
 *
 * We used to skip local relations, on the grounds that they could
 * not be targets of cross-backend SI update messages; but it seems
 * safer to process them, so that our *own* SI update messages will
 * have the same effects during CommandCounterIncrement for both
 * local and nonlocal relations.
2690 2691
 */
void
2692
RelationCacheInvalidateEntry(Oid relationId)
2693
{
2694
	Relation	relation;
2695 2696 2697

	RelationIdCacheLookup(relationId, relation);

2698
	if (PointerIsValid(relation))
2699
	{
2700
		relcacheInvalsReceived++;
2701
		RelationFlushRelation(relation);
2702
	}
2703 2704 2705 2706
}

/*
 * RelationCacheInvalidate
2707
 *	 Blow away cached relation descriptors that have zero reference counts,
B
Bruce Momjian 已提交
2708
 *	 and rebuild those with positive reference counts.	Also reset the smgr
2709
 *	 relation cache.
2710
 *
2711
 *	 This is currently used only to recover from SI message buffer overflow,
2712
 *	 so we do not touch new-in-transaction relations; they cannot be targets
2713 2714
 *	 of cross-backend SI updates (and our own updates now go through a
 *	 separate linked list that isn't limited by the SI message buffer size).
2715 2716 2717
 *	 Likewise, we need not discard new-relfilenode-in-transaction hints,
 *	 since any invalidation of those would be a local event.
 *
2718 2719
 *	 We do this in two phases: the first pass deletes deletable items, and
 *	 the second one rebuilds the rebuildable items.  This is essential for
2720
 *	 safety, because hash_seq_search only copes with concurrent deletion of
B
Bruce Momjian 已提交
2721
 *	 the element it is currently visiting.	If a second SI overflow were to
2722 2723 2724 2725
 *	 occur while we are walking the table, resulting in recursive entry to
 *	 this routine, we could crash because the inner invocation blows away
 *	 the entry next to be visited by the outer scan.  But this way is OK,
 *	 because (a) during the first pass we won't process any more SI messages,
2726
 *	 so hash_seq_search will complete safely; (b) during the second pass we
2727
 *	 only hold onto pointers to nondeletable entries.
2728 2729 2730 2731 2732 2733
 *
 *	 The two-phase approach also makes it easy to ensure that we process
 *	 nailed-in-cache indexes before other nondeletable items, and that we
 *	 process pg_class_oid_index first of all.  In scenarios where a nailed
 *	 index has been given a new relfilenode, we have to detect that update
 *	 before the nailed index is used in reloading any other relcache entry.
2734 2735
 */
void
2736
RelationCacheInvalidate(void)
2737
{
2738
	HASH_SEQ_STATUS status;
2739
	RelIdCacheEnt *idhentry;
2740
	Relation	relation;
2741
	List	   *rebuildFirstList = NIL;
B
Bruce Momjian 已提交
2742
	List	   *rebuildList = NIL;
2743
	ListCell   *l;
2744 2745

	/* Phase 1 */
2746
	hash_seq_init(&status, RelationIdCache);
2747

2748
	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2749
	{
2750
		relation = idhentry->reldesc;
2751

2752
		/* Must close all smgr references to avoid leaving dangling ptrs */
2753
		RelationCloseSmgr(relation);
2754

2755
		/* Ignore new relations, since they are never SI targets */
2756
		if (relation->rd_createSubid != InvalidSubTransactionId)
2757
			continue;
2758

2759 2760
		relcacheInvalsReceived++;

2761
		if (RelationHasReferenceCountZero(relation))
2762 2763
		{
			/* Delete this entry immediately */
2764
			Assert(!relation->rd_isnailed);
2765 2766 2767 2768
			RelationClearRelation(relation, false);
		}
		else
		{
2769 2770
			/*
			 * Add this entry to list of stuff to rebuild in second pass.
B
Bruce Momjian 已提交
2771 2772
			 * pg_class_oid_index goes on the front of rebuildFirstList, other
			 * nailed indexes on the back, and everything else into
2773 2774 2775 2776 2777
			 * rebuildList (in no particular order).
			 */
			if (relation->rd_isnailed &&
				relation->rd_rel->relkind == RELKIND_INDEX)
			{
2778
				if (RelationGetRelid(relation) == ClassOidIndexId)
2779 2780 2781 2782 2783 2784
					rebuildFirstList = lcons(relation, rebuildFirstList);
				else
					rebuildFirstList = lappend(rebuildFirstList, relation);
			}
			else
				rebuildList = lcons(relation, rebuildList);
2785
		}
2786
	}
2787

2788
	/*
B
Bruce Momjian 已提交
2789 2790 2791
	 * Now zap any remaining smgr cache entries.  This must happen before we
	 * start to rebuild entries, since that may involve catalog fetches which
	 * will re-open catalog files.
2792 2793 2794
	 */
	smgrcloseall();

2795
	/* Phase 2: rebuild the items found to need rebuild in phase 1 */
2796 2797 2798 2799 2800 2801
	foreach(l, rebuildFirstList)
	{
		relation = (Relation) lfirst(l);
		RelationClearRelation(relation, true);
	}
	list_free(rebuildFirstList);
2802
	foreach(l, rebuildList)
2803
	{
2804 2805
		relation = (Relation) lfirst(l);
		RelationClearRelation(relation, true);
2806
	}
2807
	list_free(rebuildList);
2808
}
2809

2810
/*
2811
 * AtEOXact_RelationCache
2812
 *
2813
 *	Clean up the relcache at main-transaction commit or abort.
2814 2815 2816 2817 2818
 *
 * Note: this must be called *before* processing invalidation messages.
 * In the case of abort, we don't want to try to rebuild any invalidated
 * cache entries (since we can't safely do database accesses).  Therefore
 * we must reset refcnts before handling pending invalidations.
2819 2820 2821 2822 2823 2824
 *
 * As of PostgreSQL 8.1, relcache refcnts should get released by the
 * ResourceOwner mechanism.  This routine just does a debugging
 * cross-check that no pins remain.  However, we also need to do special
 * cleanup when the current transaction created any relations or made use
 * of forced index lists.
2825 2826
 */
void
2827
AtEOXact_RelationCache(bool isCommit)
2828
{
2829
	HASH_SEQ_STATUS status;
2830
	RelIdCacheEnt *idhentry;
2831

2832 2833
	/*
	 * To speed up transaction exit, we want to avoid scanning the relcache
B
Bruce Momjian 已提交
2834 2835 2836 2837
	 * unless there is actually something for this routine to do.  Other than
	 * the debug-only Assert checks, most transactions don't create any work
	 * for us to do here, so we keep a static flag that gets set if there is
	 * anything to do.	(Currently, this means either a relation is created in
2838 2839 2840 2841
	 * the current xact, or one is given a new relfilenode, or an index list
	 * is forced.)  For simplicity, the flag remains set till end of top-level
	 * transaction, even though we could clear it at subtransaction end in
	 * some cases.
2842 2843 2844
	 *
	 * MPP-3333: READERS need to *always* scan, otherwise they will not be able
	 * to maintain a coherent view of the storage layer.
2845 2846
	 */
	if (!need_eoxact_work
2847
		&& DistributedTransactionContext != DTX_CONTEXT_QE_READER
2848 2849 2850 2851 2852 2853
#ifdef USE_ASSERT_CHECKING
		&& !assert_enabled
#endif
		)
		return;

2854
	hash_seq_init(&status, RelationIdCache);
2855

2856
	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2857
	{
2858
		Relation	relation = idhentry->reldesc;
2859 2860 2861 2862 2863

		/*
		 * The relcache entry's ref count should be back to its normal
		 * not-in-a-transaction state: 0 unless it's nailed in cache.
		 *
B
Bruce Momjian 已提交
2864 2865 2866
		 * In bootstrap mode, this is NOT true, so don't check it --- the
		 * bootstrap code expects relations to stay open across start/commit
		 * transaction calls.  (That seems bogus, but it's not worth fixing.)
2867 2868 2869 2870 2871 2872 2873 2874 2875 2876
		 */
#ifdef USE_ASSERT_CHECKING
		if (!IsBootstrapProcessingMode())
		{
			int			expected_refcnt;

			expected_refcnt = relation->rd_isnailed ? 1 : 0;
			Assert(relation->rd_refcnt == expected_refcnt);
		}
#endif
2877

2878 2879 2880 2881 2882 2883 2884 2885 2886
		/*
		 * QE-readers aren't properly enrolled in transactions, they
		 * just get the snapshot which corresponds -- so here, where
		 * we are maintaining their relcache, we want to just clean
		 * up (almost as if we had aborted). (MPP-3338)
		 */
		if (DistributedTransactionContext == DTX_CONTEXT_QE_ENTRY_DB_SINGLETON ||
			DistributedTransactionContext == DTX_CONTEXT_QE_READER)
		{
2887
			RelationClearRelation(relation, relation->rd_isnailed ? true : false);
2888 2889 2890
			continue;
		}

2891 2892 2893
		/*
		 * Is it a relation created in the current transaction?
		 *
B
Bruce Momjian 已提交
2894 2895 2896 2897 2898 2899
		 * During commit, reset the flag to zero, since we are now out of the
		 * creating transaction.  During abort, simply delete the relcache
		 * entry --- it isn't interesting any longer.  (NOTE: if we have
		 * forgotten the new-ness of a new relation due to a forced cache
		 * flush, the entry will get deleted anyway by shared-cache-inval
		 * processing of the aborted pg_class insertion.)
2900
		 */
2901
		if (relation->rd_createSubid != InvalidSubTransactionId)
2902
		{
2903
			if (isCommit)
2904
				relation->rd_createSubid = InvalidSubTransactionId;
2905 2906
			else
			{
2907 2908 2909 2910 2911
				/*
				 * In abort, delete the error log file before forgetting
				 * this relation.
				 */
				ErrorLogDelete(MyDatabaseId, RelationGetRelid(relation));
2912 2913 2914 2915 2916
				RelationClearRelation(relation, false);
				continue;
			}
		}

2917 2918 2919
		/*
		 * Likewise, reset the hint about the relfilenode being new.
		 */
2920
		relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
2921

2922 2923 2924 2925 2926
		/*
		 * Flush any temporary index list.
		 */
		if (relation->rd_indexvalid == 2)
		{
2927
			list_free(relation->rd_indexlist);
2928
			relation->rd_indexlist = NIL;
2929
			relation->rd_oidindex = InvalidOid;
2930 2931
			relation->rd_indexvalid = 0;
		}
2932
	}
2933

2934 2935
	/* Once done with the transaction, we can reset need_eoxact_work */
	need_eoxact_work = false;
2936
}
2937

2938 2939 2940 2941 2942 2943 2944 2945
/*
 * AtEOSubXact_RelationCache
 *
 *	Clean up the relcache at sub-transaction commit or abort.
 *
 * Note: this must be called *before* processing invalidation messages.
 */
void
2946 2947
AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
						  SubTransactionId parentSubid)
2948 2949 2950 2951
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;

2952
	/*
2953 2954
	 * Skip the relcache scan if nothing to do --- see notes for
	 * AtEOXact_RelationCache.
2955
	 */
2956 2957
	if (!need_eoxact_work &&
		DistributedTransactionContext != DTX_CONTEXT_QE_READER)
2958 2959
		return;

2960 2961 2962 2963 2964 2965
	hash_seq_init(&status, RelationIdCache);

	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
	{
		Relation	relation = idhentry->reldesc;

2966 2967 2968 2969 2970 2971 2972 2973
		/*
		 * As opposed to AtEOXact_RelationCache, subtransactions
		 * in readers are only caused by internal commands, and
		 * there shouldn't be interaction with global transactions,
		 * (reader gangs commit their transaction independently)
		 * we must not clear the relcache here.
		 */

2974 2975 2976
		/*
		 * Is it a relation created in the current subtransaction?
		 *
2977 2978
		 * During subcommit, mark it as belonging to the parent, instead.
		 * During subabort, simply delete the relcache entry.
2979
		 */
2980
		if (relation->rd_createSubid == mySubid)
2981 2982
		{
			if (isCommit)
2983
				relation->rd_createSubid = parentSubid;
2984
			else if (RelationHasReferenceCountZero(relation))
2985
			{
2986 2987 2988 2989 2990
				/*
				 * In abort, delete the error log file before forgetting
				 * this relation.
				 */
				ErrorLogDelete(MyDatabaseId, RelationGetRelid(relation));
2991

2992 2993 2994
				RelationClearRelation(relation, false);
				continue;
			}
2995 2996 2997 2998 2999 3000 3001 3002 3003 3004 3005 3006 3007 3008
			else
			{
				/*
				 * Hmm, somewhere there's a (leaked?) reference to the
				 * relation.  We daren't remove the entry for fear of
				 * dereferencing a dangling pointer later.  Bleat, and mark it
				 * as not belonging to the current transaction.  Hopefully
				 * it'll get cleaned up eventually.  This must be just a
				 * WARNING to avoid error-during-error-recovery loops.
				 */
				relation->rd_createSubid = InvalidSubTransactionId;
				elog(WARNING, "cannot remove relcache entry for \"%s\" because it has nonzero refcount",
					 RelationGetRelationName(relation));
			}
3009 3010
		}

3011
		/*
B
Bruce Momjian 已提交
3012 3013
		 * Likewise, update or drop any new-relfilenode-in-subtransaction
		 * hint.
3014
		 */
3015 3016 3017 3018 3019
		if (relation->rd_newRelfilenodeSubid == mySubid)
		{
			if (isCommit)
				relation->rd_newRelfilenodeSubid = parentSubid;
			else
3020
				relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3021
		}
3022 3023 3024 3025 3026 3027 3028 3029

		/*
		 * Flush any temporary index list.
		 */
		if (relation->rd_indexvalid == 2)
		{
			list_free(relation->rd_indexlist);
			relation->rd_indexlist = NIL;
3030
			relation->rd_oidindex = InvalidOid;
3031 3032 3033 3034 3035
			relation->rd_indexvalid = 0;
		}
	}
}

3036 3037 3038 3039
/*
 * RelationCacheMarkNewRelfilenode
 *
 *	Mark the rel as having been given a new relfilenode in the current
B
Bruce Momjian 已提交
3040
 *	(sub) transaction.	This is a hint that can be used to optimize
3041 3042 3043 3044 3045 3046 3047 3048 3049 3050 3051 3052
 *	later operations on the rel in the same transaction.
 */
void
RelationCacheMarkNewRelfilenode(Relation rel)
{
	/* Mark it... */
	rel->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
	/* ... and now we have eoxact cleanup work to do */
	need_eoxact_work = true;
}


3053
/*
3054 3055 3056
 *		RelationBuildLocalRelation
 *			Build a relcache entry for an about-to-be-created relation,
 *			and enter it into the relcache.
3057
 */
3058 3059
Relation
RelationBuildLocalRelation(const char *relname,
3060
						   Oid relnamespace,
3061
						   TupleDesc tupDesc,
3062 3063
						   Oid relid,
						   Oid reltablespace,
3064
			               char relkind,            /*CDB*/
3065
						   bool shared_relation)
3066
{
3067
	Relation	rel;
3068
	MemoryContext oldcxt;
3069 3070
	int			natts = tupDesc->natts;
	int			i;
3071
	bool		has_not_null;
3072
	bool		nailit;
H
Heikki Linnakangas 已提交
3073
	Oid			relfilenode;
3074

3075
	AssertArg(natts >= 0);
3076

3077 3078 3079
	/*
	 * check for creation of a rel that must be nailed in cache.
	 *
3080 3081
	 * XXX this list had better match the relations specially handled in
	 * RelationCacheInitializePhase2/3.
3082 3083 3084
	 */
	switch (relid)
	{
3085 3086 3087
		case DatabaseRelationId:
		case AuthIdRelationId:
		case AuthMemRelationId:
3088 3089 3090 3091 3092 3093 3094 3095 3096 3097 3098
		case RelationRelationId:
		case AttributeRelationId:
		case ProcedureRelationId:
		case TypeRelationId:
			nailit = true;
			break;
		default:
			nailit = false;
			break;
	}

3099 3100
	/*
	 * check that hardwired list of shared rels matches what's in the
B
Bruce Momjian 已提交
3101 3102 3103
	 * bootstrap .bki file.  If you get a failure here during initdb, you
	 * probably need to fix IsSharedRelation() to match whatever you've done
	 * to the set of shared relations.
3104 3105 3106 3107 3108
	 */
	if (shared_relation != IsSharedRelation(relid))
		elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
			 relname, relid);

3109 3110 3111 3112 3113
	/*
	 * switch to the cache context to create the relcache entry.
	 */
	if (!CacheMemoryContext)
		CreateCacheMemoryContext();
3114

3115 3116
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

3117
	/*
3118
	 * allocate a new relation descriptor and fill in basic state fields.
3119
	 */
3120
	rel = (Relation) palloc0(sizeof(RelationData));
3121

3122 3123 3124
	rel->rd_targblock = InvalidBlockNumber;

	/* make sure relation is marked as having no open file yet */
3125
	rel->rd_smgr = NULL;
3126

3127 3128 3129
	/* mark it nailed if appropriate */
	rel->rd_isnailed = nailit;

3130
	rel->rd_refcnt = nailit ? 1 : 0;
3131

3132
	/* it's being created in this transaction */
3133
	rel->rd_createSubid = GetCurrentSubTransactionId();
3134
	rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3135

3136
	/* must flag that we have rels created in this transaction */
3137
	need_eoxact_work = true;
3138

3139
	/* is it a temporary relation? */
3140
	rel->rd_istemp = isTempOrToastNamespace(relnamespace);
3141

3142 3143 3144
	/* is it a system catalog? */
	rel->rd_issyscat = (strncmp(relname, "pg_", 3) == 0);

3145 3146 3147 3148 3149 3150 3151 3152 3153 3154 3155
	/*
	 * CDB: On QEs, temp relations must use shared buffer cache so data
	 * will be visible to all segmates.  On QD, sequence objects must
	 * use shared buffer cache so data will be visible to sequence server.
	 */
	if (rel->rd_istemp &&
		relkind != RELKIND_SEQUENCE &&
		Gp_role != GP_ROLE_EXECUTE)
		rel->rd_isLocalBuf = true;
	else
		rel->rd_isLocalBuf = false;
3156

3157
	/*
3158
	 * create a new tuple descriptor from the one passed in.  We do this
B
Bruce Momjian 已提交
3159 3160 3161 3162
	 * partly to copy it into the cache context, and partly because the new
	 * relation can't have any defaults or constraints yet; they have to be
	 * added in later steps, because they require additions to multiple system
	 * catalogs.  We can copy attnotnull constraints here, however.
3163
	 */
3164
	rel->rd_att = CreateTupleDescCopy(tupDesc);
3165
	rel->rd_att->tdrefcount = 1;	/* mark as refcounted */
3166
	has_not_null = false;
3167
	for (i = 0; i < natts; i++)
3168
	{
3169
		rel->rd_att->attrs[i]->attnotnull = tupDesc->attrs[i]->attnotnull;
3170 3171 3172 3173 3174 3175 3176 3177 3178 3179
		has_not_null |= tupDesc->attrs[i]->attnotnull;
	}

	if (has_not_null)
	{
		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

		constr->has_not_null = true;
		rel->rd_att->constr = constr;
	}
3180 3181 3182 3183

	/*
	 * initialize relation tuple form (caller may add/override data later)
	 */
3184
	rel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
3185

3186 3187
	namestrcpy(&rel->rd_rel->relname, relname);
	rel->rd_rel->relnamespace = relnamespace;
3188 3189

	rel->rd_rel->relkind = RELKIND_UNCATALOGED;
3190
	rel->rd_rel->relstorage = RELSTORAGE_HEAP;
3191
	rel->rd_rel->relhasoids = rel->rd_att->tdhasoid;
3192 3193
	rel->rd_rel->relnatts = natts;
	rel->rd_rel->reltype = InvalidOid;
3194 3195
	/* needed when bootstrapping: */
	rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;
3196

3197 3198 3199 3200 3201 3202 3203
	/*
	 * Create zeroed-out gp_relation_node data.  It will be filled in when the
	 * disk file is created.
	 */
	rel->rd_segfile0_relationnodeinfo.isPresent = false;
	rel->rd_segfile0_relationnodeinfo.tidAllowedToBeZero = false;

3204
	/*
B
Bruce Momjian 已提交
3205
	 * Insert relation physical and logical identifiers (OIDs) into the right
H
Heikki Linnakangas 已提交
3206 3207 3208 3209 3210 3211
	 * places.
	 *
	 * In PostgreSQL, the physical ID (relfilenode) is initially the same
	 * as the logical ID (OID). In GPDB, the table's logical OID is allocated
	 * in the master, and might already be in use as a relfilenode of an
	 * existing relation in a segment.
3212
	 */
3213
	rel->rd_rel->relisshared = shared_relation;
3214 3215 3216 3217 3218 3219

	RelationGetRelid(rel) = relid;

	for (i = 0; i < natts; i++)
		rel->rd_att->attrs[i]->attrelid = relid;

H
Heikki Linnakangas 已提交
3220 3221 3222 3223 3224 3225 3226 3227 3228 3229 3230 3231
	if (Gp_role != GP_ROLE_EXECUTE ||
		CheckNewRelFileNodeIsOk(relid, reltablespace, shared_relation))
	{
		relfilenode = relid;
	}
	else
	{
		/* FIXME: should we pass pg_class here? */
		relfilenode = GetNewRelFileNode(reltablespace, shared_relation, NULL);
	}
	rel->rd_rel->relfilenode = relfilenode;

3232
	rel->rd_rel->reltablespace = reltablespace;
3233

3234
	RelationInitLockInfo(rel);	/* see lmgr.c */
3235

3236 3237
	RelationInitPhysicalAddr(rel);

3238 3239 3240 3241
	/*
	 * Okay to insert into the relcache hash tables.
	 */
	RelationCacheInsert(rel);
3242

3243 3244 3245
	/*
	 * done building relcache entry.
	 */
3246
	MemoryContextSwitchTo(oldcxt);
3247

3248 3249 3250
	/* It's fully valid */
	rel->rd_isvalid = true;

3251 3252 3253 3254 3255
	/*
	 * Caller expects us to pin the returned entry.
	 */
	RelationIncrementReferenceCount(rel);

3256
	return rel;
3257 3258
}

3259
/*
3260
 *		RelationCacheInitialize
3261
 *
3262 3263
 *		This initializes the relation descriptor cache.  At the time
 *		that this is invoked, we can't do database access yet (mainly
3264 3265 3266 3267 3268
 *		because the transaction subsystem is not up); all we are doing
 *		is making an empty cache hashtable.  This must be done before
 *		starting the initialization transaction, because otherwise
 *		AtEOXact_RelationCache would crash if that transaction aborts
 *		before we can get the relcache set up.
3269 3270
 */

3271
#define INITRELCACHESIZE		400
3272 3273

void
3274
RelationCacheInitialize(void)
3275
{
3276 3277
	MemoryContext oldcxt;
	HASHCTL		ctl;
3278

3279
	/*
3280
	 * make sure cache memory context exists
3281
	 */
3282 3283
	if (!CacheMemoryContext)
		CreateCacheMemoryContext();
3284

3285 3286 3287
    /*
	 * switch to cache memory context
	 */
3288
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3289

3290
	/*
3291
	 * create hashtable that indexes the relcache
3292
	 */
3293
	MemSet(&ctl, 0, sizeof(ctl));
3294
	ctl.keysize = sizeof(Oid);
3295
	ctl.entrysize = sizeof(RelIdCacheEnt);
3296
	ctl.hash = oid_hash;
3297 3298
	RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
								  &ctl, HASH_ELEM | HASH_FUNCTION);
3299

3300 3301 3302 3303 3304 3305
	MemoryContextSwitchTo(oldcxt);
}

/*
 *		RelationCacheInitializePhase2
 *
3306 3307 3308 3309 3310 3311 3312
 *		This is called to prepare for access to shared catalogs during startup.
 *		We must at least set up nailed reldescs for pg_database, pg_authid,
 *		and pg_auth_members.  Ideally we'd like to have reldescs for their
 *		indexes, too.  We attempt to load this information from the shared
 *		relcache init file.  If that's missing or broken, just make phony
 *		entries for the catalogs themselves.  RelationCacheInitializePhase3
 *		will clean up as needed.
3313 3314 3315
 */
void
RelationCacheInitializePhase2(void)
3316 3317 3318 3319 3320 3321 3322 3323 3324 3325 3326 3327 3328 3329 3330 3331 3332 3333 3334 3335 3336 3337 3338 3339 3340 3341 3342 3343 3344 3345 3346 3347 3348 3349 3350 3351 3352 3353 3354 3355 3356 3357 3358 3359 3360 3361 3362 3363 3364 3365
{
	MemoryContext oldcxt;

	/*
	 * In bootstrap mode, the shared catalogs aren't there yet anyway, so do
	 * nothing.
	 */
	if (IsBootstrapProcessingMode())
		return;

	/*
	 * switch to cache memory context
	 */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
	 * Try to load the shared relcache cache file.	If unsuccessful, bootstrap
	 * the cache with pre-made descriptors for the critical shared catalogs.
	 */
	if (!load_relcache_init_file(true))
	{
		formrdesc("pg_database", PG_DATABASE_RELTYPE_OID, true,
				  true, Natts_pg_database, Desc_pg_database);
		formrdesc("pg_authid", PG_AUTHID_RELTYPE_OID, true,
				  true, Natts_pg_authid, Desc_pg_authid);
		formrdesc("pg_auth_members", PG_AUTH_MEMBERS_RELTYPE_OID, true,
				  false, Natts_pg_auth_members, Desc_pg_auth_members);

#define NUM_CRITICAL_SHARED_RELS	3	/* fix if you change list above */
	}

	MemoryContextSwitchTo(oldcxt);
}

/*
 *		RelationCacheInitializePhase3
 *
 *		This is called as soon as the catcache and transaction system
 *		are functional and we have determined MyDatabaseId.  At this point
 *		we can actually read data from the database's system catalogs.
 *		We first try to read pre-computed relcache entries from the local
 *		relcache init file.  If that's missing or broken, make phony entries
 *		for the minimum set of nailed-in-cache relations.  Then (unless
 *		bootstrapping) make sure we have entries for the critical system
 *		indexes.  Once we've done all this, we have enough infrastructure to
 *		open any system catalog or use any catcache.  The last step is to
 *		rewrite the cache files if needed.
 */
void
RelationCacheInitializePhase3(void)
3366 3367 3368 3369
{
	HASH_SEQ_STATUS status;
	RelIdCacheEnt *idhentry;
	MemoryContext oldcxt;
3370
	bool		needNewCacheFile = !criticalSharedRelcachesBuilt;
3371

3372
	/*
3373 3374 3375 3376 3377
	 * switch to cache memory context
	 */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

	/*
3378 3379
	 * Try to load the local relcache cache file.  If unsuccessful, bootstrap
	 * the cache with pre-made descriptors for the critical "nailed-in" system
3380
	 * catalogs.
3381
	 */
3382
	if (IsBootstrapProcessingMode() ||
3383
		!load_relcache_init_file(false))
3384
	{
3385 3386
		needNewCacheFile = true;

3387
		formrdesc("pg_class", PG_CLASS_RELTYPE_OID, false,
3388
				  true, Natts_pg_class, Desc_pg_class);
3389
		formrdesc("pg_attribute", PG_ATTRIBUTE_RELTYPE_OID, false,
3390
				  false, Natts_pg_attribute, Desc_pg_attribute);
3391
		formrdesc("pg_proc", PG_PROC_RELTYPE_OID, false,
3392
				  true, Natts_pg_proc, Desc_pg_proc);
3393
		formrdesc("pg_type", PG_TYPE_RELTYPE_OID, false,
3394
				  true, Natts_pg_type, Desc_pg_type);
3395

3396
#define NUM_CRITICAL_LOCAL_RELS 4		/* fix if you change list above */
3397
	}
3398 3399

	MemoryContextSwitchTo(oldcxt);
3400

3401
	/* In bootstrap mode, the faked-up formrdesc info is all we'll have */
3402 3403 3404
	if (IsBootstrapProcessingMode())
		return;

3405
	/*
B
Bruce Momjian 已提交
3406
	 * If we didn't get the critical system indexes loaded into relcache, do
3407 3408
	 * so now.	These are critical because the catcache and/or opclass cache
	 * depend on them for fetches done during relcache load.  Thus, we have an
B
Bruce Momjian 已提交
3409 3410 3411 3412 3413 3414
	 * infinite-recursion problem.	We can break the recursion by doing
	 * heapscans instead of indexscans at certain key spots. To avoid hobbling
	 * performance, we only want to do that until we have the critical indexes
	 * loaded into relcache.  Thus, the flag criticalRelcachesBuilt is used to
	 * decide whether to do heapscan or indexscan at the key spots, and we set
	 * it true after we've loaded the critical indexes.
3415
	 *
B
Bruce Momjian 已提交
3416 3417 3418 3419 3420 3421
	 * The critical indexes are marked as "nailed in cache", partly to make it
	 * easy for load_relcache_init_file to count them, but mainly because we
	 * cannot flush and rebuild them once we've set criticalRelcachesBuilt to
	 * true.  (NOTE: perhaps it would be possible to reload them by
	 * temporarily setting criticalRelcachesBuilt to false again.  For now,
	 * though, we just nail 'em in.)
3422 3423 3424 3425
	 *
	 * RewriteRelRulenameIndexId and TriggerRelidNameIndexId are not critical
	 * in the same way as the others, because the critical catalogs don't
	 * (currently) have any rules or triggers, and so these indexes can be
B
Bruce Momjian 已提交
3426
	 * rebuilt without inducing recursion.	However they are used during
3427 3428
	 * relcache load when a rel does have rules or triggers, so we choose to
	 * nail them for performance reasons.
3429
	 */
B
Bruce Momjian 已提交
3430
	if (!criticalRelcachesBuilt)
3431
	{
3432 3433 3434 3435 3436 3437 3438 3439 3440 3441 3442 3443
		load_critical_index(ClassOidIndexId,
							RelationRelationId);
		load_critical_index(AttributeRelidNumIndexId,
							AttributeRelationId);
		load_critical_index(IndexRelidIndexId,
							IndexRelationId);
		load_critical_index(AccessMethodStrategyIndexId,
							AccessMethodOperatorRelationId);
		load_critical_index(OpclassOidIndexId,
							OperatorClassRelationId);
		load_critical_index(AccessMethodProcedureIndexId,
							AccessMethodProcedureRelationId);
3444 3445
		load_critical_index(OperatorOidIndexId,
							OperatorRelationId);
3446 3447 3448 3449 3450
		load_critical_index(RewriteRelRulenameIndexId,
							RewriteRelationId);
		load_critical_index(TriggerRelidNameIndexId,
							TriggerRelationId);

3451
#define NUM_CRITICAL_LOCAL_INDEXES	9	/* fix if you change list above */
3452 3453 3454 3455

		criticalRelcachesBuilt = true;
	}

3456 3457 3458 3459 3460 3461 3462 3463 3464 3465 3466 3467 3468 3469 3470 3471 3472 3473 3474 3475 3476 3477 3478 3479 3480 3481 3482 3483
	/*
	 * Process critical shared indexes too.
	 *
	 * DatabaseNameIndexId isn't critical for relcache loading, but rather for
	 * initial lookup of MyDatabaseId, without which we'll never find any
	 * non-shared catalogs at all.	Autovacuum calls InitPostgres with a
	 * database OID, so it instead depends on DatabaseOidIndexId.  We also
	 * need to nail up some indexes on pg_authid and pg_auth_members for use
	 * during client authentication.
	 */
	if (!criticalSharedRelcachesBuilt)
	{
		load_critical_index(DatabaseNameIndexId,
							DatabaseRelationId);
		load_critical_index(DatabaseOidIndexId,
							DatabaseRelationId);
		load_critical_index(AuthIdRolnameIndexId,
							AuthIdRelationId);
		load_critical_index(AuthIdOidIndexId,
							AuthIdRelationId);
		load_critical_index(AuthMemMemRoleIndexId,
							AuthMemRelationId);

#define NUM_CRITICAL_SHARED_INDEXES 5	/* fix if you change list above */

		criticalSharedRelcachesBuilt = true;
	}

3484
	/*
B
Bruce Momjian 已提交
3485 3486 3487 3488 3489 3490
	 * Now, scan all the relcache entries and update anything that might be
	 * wrong in the results from formrdesc or the relcache cache file. If we
	 * faked up relcache entries using formrdesc, then read the real pg_class
	 * rows and replace the fake entries with them. Also, if any of the
	 * relcache entries have rules or triggers, load that info the hard way
	 * since it isn't recorded in the cache file.
3491
	 *
3492 3493
	 * Whenever we access the catalogs to read data, there is a possibility
	 * of a shared-inval cache flush causing relcache entries to be removed.
3494 3495 3496 3497 3498
	 * Since hash_seq_search only guarantees to still work after the *current*
	 * entry is removed, it's unsafe to continue the hashtable scan afterward.
	 * We handle this by restarting the scan from scratch after each access.
	 * This is theoretically O(N^2), but the number of entries that actually
	 * need to be fixed is small enough that it doesn't matter.
3499
	 */
3500
	hash_seq_init(&status, RelationIdCache);
3501

3502
	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3503
	{
3504
		Relation	relation = idhentry->reldesc;
3505 3506 3507 3508 3509 3510
		bool		restart = false;

		/*
		 * Make sure *this* entry doesn't get flushed while we work with it.
		 */
		RelationIncrementReferenceCount(relation);
3511

3512
		/*
3513
		 * If it's a faked-up entry, read the real pg_class tuple.
3514
		 */
3515
		if (relation->rd_rel->relowner == InvalidOid)
3516 3517 3518
		{
			HeapTuple	htup;
			Form_pg_class relp;
B
Bruce Momjian 已提交
3519

3520
			htup = SearchSysCache(RELOID,
3521
							   ObjectIdGetDatum(RelationGetRelid(relation)),
3522 3523
								  0, 0, 0);
			if (!HeapTupleIsValid(htup))
3524 3525
				elog(FATAL, "cache lookup failed for relation %u",
					 RelationGetRelid(relation));
3526
			relp = (Form_pg_class) GETSTRUCT(htup);
B
Bruce Momjian 已提交
3527

3528 3529 3530 3531 3532
			/*
			 * Copy tuple to relation->rd_rel. (See notes in
			 * AllocateRelationDesc())
			 */
			memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
3533

3534 3535 3536 3537 3538
			/* Update rd_options while we have the tuple */
			if (relation->rd_options)
				pfree(relation->rd_options);
			RelationParseRelOptions(relation, htup);

3539
			/*
3540
			 * Check the values in rd_att were set up correctly.  (We cannot
3541 3542 3543
			 * just copy them over now: formrdesc must have set up the
			 * rd_att data correctly to start with, because it may already
			 * have been copied into one or more catcache entries.)
3544
			 */
3545 3546 3547
			Assert(relation->rd_att->tdtypeid == relp->reltype);
			Assert(relation->rd_att->tdtypmod == -1);
			Assert(relation->rd_att->tdhasoid == relp->relhasoids);
3548

3549
			ReleaseSysCache(htup);
3550 3551 3552 3553 3554 3555 3556

			/* relowner had better be OK now, else we'll loop forever */
			if (relation->rd_rel->relowner == InvalidOid)
				elog(ERROR, "invalid relowner in pg_class entry for \"%s\"",
					 RelationGetRelationName(relation));

			restart = true;
3557 3558 3559 3560
		}

		/*
		 * Fix data that isn't saved in relcache cache file.
3561 3562 3563 3564 3565
		 *
		 * relhasrules or relhastriggers could possibly be wrong or out of
		 * date.  If we don't actually find any rules or triggers, clear the
		 * local copy of the flag so that we don't get into an infinite loop
		 * here.  We don't make any attempt to fix the pg_class entry, though.
3566 3567
		 */
		if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
3568
		{
3569
			RelationBuildRuleLock(relation);
3570 3571 3572 3573
			if (relation->rd_rules == NULL)
				relation->rd_rel->relhasrules = false;
			restart = true;
		}
3574
		if (relation->rd_rel->reltriggers > 0 && relation->trigdesc == NULL)
3575
		{
3576
			RelationBuildTriggers(relation);
3577 3578 3579 3580 3581 3582 3583 3584 3585 3586 3587 3588 3589 3590
			if (relation->trigdesc == NULL)
				relation->rd_rel->reltriggers = 0;
			restart = true;
		}

		/* Release hold on the relation */
		RelationDecrementReferenceCount(relation);

		/* Now, restart the hashtable scan if needed */
		if (restart)
		{
			hash_seq_term(&status);
			hash_seq_init(&status, RelationIdCache);
		}
3591
	}
3592

3593
	/*
3594 3595
	 * Lastly, write out new relcache cache files if needed.  We don't bother
	 * to distinguish cases where only one of the two needs an update.
3596
	 */
3597 3598 3599
	if (needNewCacheFile)
	{
		/*
B
Bruce Momjian 已提交
3600 3601 3602
		 * Force all the catcaches to finish initializing and thereby open the
		 * catalogs and indexes they use.  This will preload the relcache with
		 * entries for all the most important system catalogs and indexes, so
3603
		 * that the init files will be most useful for future backends.
3604 3605 3606
		 */
		InitCatalogCachePhase2();

3607 3608 3609 3610 3611 3612
		/* reset initFileRelationIds list; we'll fill it during write */
		initFileRelationIds = NIL;

		/* now write the files */
		write_relcache_init_file(true);
		write_relcache_init_file(false);
3613 3614 3615
	}
}

3616 3617 3618 3619 3620 3621 3622 3623 3624 3625 3626 3627 3628 3629 3630 3631 3632 3633 3634 3635 3636 3637 3638 3639 3640 3641 3642 3643
/*
 * Load one critical system index into the relcache
 *
 * indexoid identifies the target index; heapoid is the pg_class OID of the
 * system catalog that the index belongs to.
 */
static void
load_critical_index(Oid indexoid, Oid heapoid)
{
	Relation	idesc;

	/*
	 * Lock the underlying catalog first, then the index, to avoid deadlock:
	 * RelationBuildDesc may well need to read the catalog, and any other
	 * backend taking exclusive locks on this catalog and its index will
	 * acquire them in catalog-then-index order.
	 */
	LockRelationOid(heapoid, AccessShareLock);
	LockRelationOid(indexoid, AccessShareLock);
	idesc = RelationBuildDesc(indexoid, true);
	if (idesc == NULL)
		elog(PANIC, "could not open critical system index %u", indexoid);
	/* Nail it so cache invalidation can never evict it */
	idesc->rd_isnailed = true;
	idesc->rd_refcnt = 1;
	UnlockRelationOid(indexoid, AccessShareLock);
	UnlockRelationOid(heapoid, AccessShareLock);
}

3644
/*
3645
 * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
3646 3647 3648
 * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
 *
 * We need this kluge because we have to be able to access non-fixed-width
3649 3650 3651 3652 3653 3654
 * fields of pg_class and pg_index before we have the standard catalog caches
 * available.  We use predefined data that's set up in just the same way as
 * the bootstrapped reldescs used by formrdesc().  The resulting tupdesc is
 * not 100% kosher: it does not have the correct rowtype OID in tdtypeid, nor
 * does it have a TupleConstr field.  But it's good enough for the purpose of
 * extracting fields.
3655 3656
 */
static TupleDesc
3657 3658
BuildHardcodedDescriptor(int natts, const FormData_pg_attribute *attrs,
						 bool hasoids)
3659
{
3660
	TupleDesc	result;
3661 3662 3663 3664 3665
	MemoryContext oldcxt;
	int			i;

	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);

3666
	result = CreateTemplateTupleDesc(natts, hasoids);
B
Bruce Momjian 已提交
3667
	result->tdtypeid = RECORDOID;		/* not right, but we don't care */
3668
	result->tdtypmod = -1;
3669

3670
	for (i = 0; i < natts; i++)
3671
	{
3672
		memcpy(result->attrs[i], &attrs[i], ATTRIBUTE_FIXED_PART_SIZE);
3673
		/* make sure attcacheoff is valid */
3674
		result->attrs[i]->attcacheoff = -1;
3675 3676 3677
	}

	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
3678
	result->attrs[0]->attcacheoff = 0;
3679 3680 3681 3682 3683

	/* Note: we don't bother to set up a TupleConstr entry */

	MemoryContextSwitchTo(oldcxt);

3684 3685 3686 3687 3688 3689 3690 3691 3692 3693 3694 3695 3696 3697 3698 3699 3700 3701 3702 3703 3704 3705 3706 3707 3708 3709 3710 3711
	return result;
}

static TupleDesc
GetPgClassDescriptor(void)
{
	static TupleDesc pgclassdesc = NULL;

	/* Build the hardcoded descriptor on first use; it is kept forever. */
	if (pgclassdesc == NULL)
		pgclassdesc = BuildHardcodedDescriptor(Natts_pg_class,
											   Desc_pg_class,
											   true);

	return pgclassdesc;
}

static TupleDesc
GetPgIndexDescriptor(void)
{
	static TupleDesc pgindexdesc = NULL;

	/* Build the hardcoded descriptor on first use; it is kept forever. */
	if (pgindexdesc == NULL)
		pgindexdesc = BuildHardcodedDescriptor(Natts_pg_index,
											   Desc_pg_index,
											   false);

	return pgindexdesc;
}

3715 3716 3717
/*
 * Load any default attribute value definitions for the relation.
 */
3718
static void
3719
AttrDefaultFetch(Relation relation)
3720
{
3721 3722 3723
	AttrDefault *attrdef = relation->rd_att->constr->defval;
	int			ndef = relation->rd_att->constr->num_defval;
	Relation	adrel;
3724 3725
	SysScanDesc adscan;
	ScanKeyData skey;
H
Hiroshi Inoue 已提交
3726
	HeapTuple	htup;
3727
	Datum		val;
3728 3729 3730
	bool		isnull;
	int			found;
	int			i;
3731

3732 3733 3734 3735
	ScanKeyInit(&skey,
				Anum_pg_attrdef_adrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));
3736

3737 3738 3739
	adrel = heap_open(AttrDefaultRelationId, AccessShareLock);
	adscan = systable_beginscan(adrel, AttrDefaultIndexId, true,
								SnapshotNow, 1, &skey);
3740
	found = 0;
3741

3742
	while (HeapTupleIsValid(htup = systable_getnext(adscan)))
3743
	{
3744
		Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
3745

3746 3747 3748 3749
		for (i = 0; i < ndef; i++)
		{
			if (adform->adnum != attrdef[i].adnum)
				continue;
3750
			if (attrdef[i].adbin != NULL)
3751
				elog(WARNING, "multiple attrdef records found for attr %s of rel %s",
B
Bruce Momjian 已提交
3752
				NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
3753
					 RelationGetRelationName(relation));
3754 3755
			else
				found++;
3756

3757 3758 3759
			val = fastgetattr(htup,
							  Anum_pg_attrdef_adbin,
							  adrel->rd_att, &isnull);
3760
			if (isnull)
3761
				elog(WARNING, "null adbin for attr %s of rel %s",
B
Bruce Momjian 已提交
3762
				NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
3763
					 RelationGetRelationName(relation));
3764 3765
			else
				attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext,
3766
												   TextDatumGetCString(val));
3767 3768
			break;
		}
3769

3770
		if (i >= ndef)
3771 3772
			elog(WARNING, "unexpected attrdef record found for attr %d of rel %s",
				 adform->adnum, RelationGetRelationName(relation));
3773 3774
	}

3775
	systable_endscan(adscan);
3776
	heap_close(adrel, AccessShareLock);
3777 3778

	if (found != ndef)
3779
		elog(WARNING, "%d attrdef record(s) missing for rel %s",
3780
			 ndef - found, RelationGetRelationName(relation));
3781 3782
}

3783 3784 3785
/*
 * Load any check constraints for the relation.
 */
3786
static void
3787
CheckConstraintFetch(Relation relation)
3788
{
3789 3790
	ConstrCheck *check = relation->rd_att->constr->check;
	int			ncheck = relation->rd_att->constr->num_check;
3791
	Relation	conrel;
3792 3793
	SysScanDesc conscan;
	ScanKeyData skey[1];
H
Hiroshi Inoue 已提交
3794
	HeapTuple	htup;
3795
	Datum		val;
3796
	bool		isnull;
3797
	int			found = 0;
3798

3799 3800 3801 3802
	ScanKeyInit(&skey[0],
				Anum_pg_constraint_conrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));
3803

3804 3805 3806
	conrel = heap_open(ConstraintRelationId, AccessShareLock);
	conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
								 SnapshotNow, 1, skey);
3807

3808
	while (HeapTupleIsValid(htup = systable_getnext(conscan)))
3809
	{
3810 3811 3812 3813 3814 3815
		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);

		/* We want check constraints only */
		if (conform->contype != CONSTRAINT_CHECK)
			continue;

3816
		if (found >= ncheck)
3817 3818 3819
			elog(ERROR,
			     "pg_class reports %d constraint record(s) for rel %s, but found extra in pg_constraint",
			     ncheck, RelationGetRelationName(relation));
3820

3821
		check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
B
Bruce Momjian 已提交
3822
												  NameStr(conform->conname));
3823 3824

		/* Grab and test conbin is actually set */
3825
		val = fastgetattr(htup,
3826 3827
						  Anum_pg_constraint_conbin,
						  conrel->rd_att, &isnull);
3828
		if (isnull)
3829
			elog(ERROR, "null conbin for rel %s",
3830
				 RelationGetRelationName(relation));
3831

3832
		check[found].ccbin = MemoryContextStrdup(CacheMemoryContext,
3833
												 TextDatumGetCString(val));
3834 3835 3836
		found++;
	}

3837
	systable_endscan(conscan);
3838
	heap_close(conrel, AccessShareLock);
3839 3840

	if (found != ncheck)
3841 3842 3843
		elog(ERROR,
		     "found %d in pg_constraint, but pg_class reports %d constraint record(s) for rel %s",
		     found, ncheck, RelationGetRelationName(relation));
3844 3845
}

3846 3847 3848 3849 3850 3851 3852 3853 3854 3855 3856 3857 3858 3859 3860

/*
 * RelationGetPartitioningKey -- get GpPolicy struct for distributed relation
 *
 * Returns a copy of the relation's GpPolicy object, palloc'd in the
 * caller's memory context; the caller should pfree() it.  A NULL result
 * means the relation should be accessed locally.
 */
GpPolicy*
RelationGetPartitioningKey(Relation relation)
{
    GpPolicy   *policy;

    policy = GpPolicyCopy(CurrentMemoryContext, relation->rd_cdbpolicy);
    return policy;
}                                       /* RelationGetPartitioningKey */


3861 3862 3863 3864 3865 3866
/*
 * RelationGetIndexList -- get a list of OIDs of indexes on this relation
 *
 * The index list is computed lazily, on first request, by scanning pg_index;
 * the result is then cached in the relcache entry so later calls are cheap.
 * Shared cache inval of the relcache entry deletes the cached list and
 * resets rd_indexvalid to 0, forcing a recompute on the next request; that
 * is how index creation and deletion are handled.
 *
 * The returned list is guaranteed sorted by OID.  The executor depends on
 * this: for index types that take exclusive locks during updates, all
 * backends must lock indexes in the same order to avoid deadlock (see
 * ExecOpenIndices()).  Any consistent order would do; OID order is easy.
 *
 * Because shared cache inval can destroy the relcache's copy at any time,
 * we hand back a list palloc'd in the caller's context; the caller may
 * list_free() it after use.  This matters because the caller will typically
 * do syscache lookups on the indexes, and those lookups can process SI
 * messages!
 *
 * We also maintain rd_oidindex, effectively part of the index list: it is
 * valid whenever rd_indexvalid is nonzero, and holds the pg_class OID of a
 * unique index on OID if the relation has one, else InvalidOid.
 */
List *
RelationGetIndexList(Relation relation)
{
	Relation	pg_index;
	SysScanDesc scan;
	ScanKeyData skey;
	HeapTuple	tuple;
	List	   *oidlist;
	Oid			oidIndexOid;
	MemoryContext oldcxt;

	/* Quick exit if we already computed the list. */
	if (relation->rd_indexvalid != 0)
		return list_copy(relation->rd_indexlist);

	/*
	 * Build the list to return in the caller's context while scanning; only
	 * after the scan completes successfully do we copy it into the relcache
	 * entry.  This avoids leaking cache-context memory on error partway
	 * through.
	 */
	oidlist = NIL;
	oidIndexOid = InvalidOid;

	/* Prepare to scan pg_index for entries having indrelid = this rel. */
	ScanKeyInit(&skey,
				Anum_pg_index_indrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(relation)));

	pg_index = heap_open(IndexRelationId, AccessShareLock);
	scan = systable_beginscan(pg_index, IndexIndrelidIndexId, true,
							  SnapshotNow, 1, &skey);

	while (HeapTupleIsValid(tuple = systable_getnext(scan)))
	{
		Form_pg_index indform = (Form_pg_index) GETSTRUCT(tuple);

		/* Add index's OID to result list in the proper order */
		oidlist = insert_ordered_oid(oidlist, indform->indexrelid);

		/* Check to see if it is a unique, non-partial btree index on OID */
		if (IndexIsValid(indform) &&
			indform->indnatts == 1 &&
			indform->indisunique &&
			indform->indkey.values[0] == ObjectIdAttributeNumber &&
			indform->indclass.values[0] == OID_BTREE_OPS_OID &&
			heap_attisnull(tuple, Anum_pg_index_indpred))
			oidIndexOid = indform->indexrelid;
	}

	systable_endscan(scan);
	heap_close(pg_index, AccessShareLock);

	/* Now save a copy of the completed list in the relcache entry. */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indexlist = list_copy(oidlist);
	relation->rd_oidindex = oidIndexOid;
	relation->rd_indexvalid = 1;
	MemoryContextSwitchTo(oldcxt);

	return oidlist;
}

3952 3953 3954 3955 3956 3957 3958 3959 3960 3961 3962 3963
/*
 * insert_ordered_oid
 *		Insert a new Oid into a sorted list of Oids, preserving ordering
 *
 * Building the ordered list this way is O(N^2), but with a small constant
 * factor; for the number of indexes we expect per table it will likely beat
 * qsort().  Most tables don't have very many indexes...
 */
static List *
insert_ordered_oid(List *list, Oid datum)
{
	ListCell   *insert_after;

	/* Does the datum belong at the front? */
	if (list == NIL || datum < linitial_oid(list))
		return lcons_oid(datum, list);

	/*
	 * Walk forward while the next cell's value is <= datum; the new cell
	 * goes immediately after the last such cell.
	 */
	insert_after = list_head(list);
	while (lnext(insert_after) != NULL &&
		   lfirst_oid(lnext(insert_after)) <= datum)
		insert_after = lnext(insert_after);

	/* Insert datum into list after 'insert_after' */
	lappend_cell_oid(list, insert_after, datum);
	return list;
}

3985 3986 3987 3988
/*
 * RelationSetIndexList -- externally force the index list contents
 *
 * This is used to temporarily override what we think the set of valid
3989 3990
 * indexes is (including the presence or absence of an OID index).
 * The forcing will be valid only until transaction commit or abort.
3991 3992 3993 3994 3995 3996
 *
 * This should only be applied to nailed relations, because in a non-nailed
 * relation the hacked index list could be lost at any time due to SI
 * messages.  In practice it is only used on pg_class (see REINDEX).
 *
 * It is up to the caller to make sure the given list is correctly ordered.
3997 3998 3999 4000 4001 4002 4003
 *
 * We deliberately do not change rd_indexattr here: even when operating
 * with a temporary partial index list, HOT-update decisions must be made
 * correctly with respect to the full index set.  It is up to the caller
 * to ensure that a correct rd_indexattr set has been cached before first
 * calling RelationSetIndexList; else a subsequent inquiry might cause a
 * wrong rd_indexattr set to get computed and cached.
4004 4005
 */
void
4006
RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex)
4007 4008 4009
{
	MemoryContext oldcxt;

4010
	Assert(relation->rd_isnailed);
4011 4012
	/* Copy the list into the cache context (could fail for lack of mem) */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4013
	indexIds = list_copy(indexIds);
4014 4015
	MemoryContextSwitchTo(oldcxt);
	/* Okay to replace old list */
4016
	list_free(relation->rd_indexlist);
4017
	relation->rd_indexlist = indexIds;
4018
	relation->rd_oidindex = oidIndex;
B
Bruce Momjian 已提交
4019
	relation->rd_indexvalid = 2;	/* mark list as forced */
4020
	/* must flag that we have a forced index list */
4021
	need_eoxact_work = true;
4022 4023
}

4024 4025 4026 4027 4028 4029 4030 4031 4032 4033 4034
/*
 * RelationGetOidIndex -- get the pg_class OID of the relation's OID index
 *
 * Returns InvalidOid if there is no such index.
 */
Oid
RelationGetOidIndex(Relation relation)
{
	/*
	 * A caller asking this about a relation without OIDs is probably
	 * confused; we could silently return InvalidOid, but an assertion
	 * failure seems more helpful.
	 */
	Assert(relation->rd_rel->relhasoids);

	if (relation->rd_indexvalid == 0)
	{
		/* RelationGetIndexList does the heavy lifting. */
		List	   *ilist = RelationGetIndexList(relation);

		list_free(ilist);
		Assert(relation->rd_indexvalid != 0);
	}

	return relation->rd_oidindex;
}

4052 4053 4054 4055 4056 4057 4058 4059 4060 4061 4062 4063 4064 4065 4066 4067 4068 4069 4070 4071 4072 4073 4074 4075 4076 4077 4078 4079
/*
 * RelationGetIndexExpressions -- get the index expressions for an index
 *
 * We cache the node tree obtained from pg_index.indexprs.  If the rel is
 * not an index, or has no expressional columns, the result is NIL.
 * Otherwise the returned tree is a copy made in the caller's memory
 * context: handing out a pointer into the relcache would be unsafe, since
 * relcache invalidation could destroy it at any time.
 */
List *
RelationGetIndexExpressions(Relation relation)
{
	List	   *exprs;
	Datum		exprsDatum;
	bool		isnull;
	char	   *exprsString;
	MemoryContext oldcxt;

	/* Quick exit if we already computed the result. */
	if (relation->rd_indexprs)
		return (List *) copyObject(relation->rd_indexprs);

	/* Quick exit if there is nothing to do. */
	if (relation->rd_indextuple == NULL ||
		heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs))
		return NIL;

	/*
	 * Build the tree to return in the caller's context; only after the work
	 * completes successfully do we copy it into the relcache entry.  This
	 * avoids trouble if we error out partway through.
	 */
	exprsDatum = heap_getattr(relation->rd_indextuple,
							  Anum_pg_index_indexprs,
							  GetPgIndexDescriptor(),
							  &isnull);
	Assert(!isnull);
	exprsString = TextDatumGetCString(exprsDatum);
	exprs = (List *) stringToNode(exprsString);
	pfree(exprsString);

	/*
	 * Run the expressions through eval_const_expressions.  This is not just
	 * an optimization but a requirement: the planner compares them against
	 * similarly-processed qual clauses and could miss valid matches
	 * otherwise.  We skip canonicalize_qual, however.
	 */
	exprs = (List *) eval_const_expressions(NULL, (Node *) exprs);

	/*
	 * Mark coercion format fields as "don't care" so the planner can match
	 * both explicit and implicit coercions.
	 */
	set_coercionform_dontcare((Node *) exprs);

	/* May as well fix opfuncids too */
	fix_opfuncids((Node *) exprs);

	/* Now save a copy of the completed tree in the relcache entry. */
	oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
	relation->rd_indexprs = (List *) copyObject(exprs);
	MemoryContextSwitchTo(oldcxt);

	return exprs;
}

/*
 * RelationGetIndexPredicate -- get the index predicate for an index
 *
 * We cache the result of transforming pg_index.indpred into an
 * implicit-AND node tree (suitable for ExecQual).  If the rel is not an
 * index, or has no predicate, the result is NIL.  Otherwise the returned
 * tree is a copy made in the caller's memory context: handing out a
 * pointer into the relcache would be unsafe, since relcache invalidation
 * could destroy it at any time.
 */
List *
RelationGetIndexPredicate(Relation relation)
{
	List	   *pred;
	Datum		predDatum;
	bool		isnull;
	char	   *predString;
	MemoryContext oldcxt;

	/* Quick exit if we already computed the result. */
	if (relation->rd_indpred)
		return (List *) copyObject(relation->rd_indpred);

	/* Quick exit if there is nothing to do. */
	if (relation->rd_indextuple == NULL ||
		heap_attisnull(relation->rd_indextuple, Anum_pg_index_indpred))
		return NIL;

	/*
	 * Build the tree to return in the caller's context; only after the work
	 * completes successfully do we copy it into the relcache entry.  This
	 * avoids trouble if we error out partway through.
	 */
	predDatum = heap_getattr(relation->rd_indextuple,
							 Anum_pg_index_indpred,
							 GetPgIndexDescriptor(),
							 &isnull);
	Assert(!isnull);
	predString = TextDatumGetCString(predDatum);
	pred = (List *) stringToNode(predString);
	pfree(predString);

	/*
	 * Run the expression through const-simplification and canonicalization.
	 * This is not just an optimization but a requirement: the planner
	 * compares it against similarly-processed qual clauses and could miss
	 * valid matches otherwise.  This must mirror the treatment of quals in
	 * preprocess_expression()!  (We can skip the subquery-related steps,
	 * since subqueries aren't allowed in index predicates.)
	 */
	pred = (List *) eval_const_expressions(NULL, (Node *) pred);

	pred = (List *) canonicalize_qual((Expr *) pred);

	/*
	 * Mark coercion format fields as "don't care" so the planner can match
	 * both explicit and implicit coercions.
	 */
	set_coercionform_dontcare((Node *) pred);

	/* Also convert to implicit-AND format */
	pred = make_ands_implicit((Expr *) pred);

	/* May as well fix opfuncids too */
	fix_opfuncids((Node *) pred);

	/* Now save a copy of the completed tree in the relcache entry. */
	oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
	relation->rd_indpred = (List *) copyObject(pred);
	MemoryContextSwitchTo(oldcxt);

	return pred;
}

4193 4194 4195 4196 4197 4198 4199 4200 4201 4202 4203
/*
 * RelationGetIndexAttrBitmap -- get a bitmap of index attribute numbers
 *
 * The result has a bit set for each attribute used anywhere in the index
 * definitions of all the indexes on this relation — simple index keys as
 * well as attributes referenced by index expressions and partial-index
 * predicates.
 *
 * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so
 * system attributes (e.g., OID) can be represented in the bitmap.
 *
 * Caller had better hold at least RowExclusiveLock on the target relation
 * to ensure a stable set of indexes; that also makes it deadlock-free for
 * us to lock the relation's indexes here.
 *
 * The returned bitmap is palloc'd in the caller's memory context; use
 * bms_free when done with it.
 */
Bitmapset *
RelationGetIndexAttrBitmap(Relation relation)
{
	Bitmapset  *attrset;
	List	   *indexoidlist;
	ListCell   *lc;
	MemoryContext oldcxt;

	/* Quick exit if we already computed the result. */
	if (relation->rd_indexattr != NULL)
		return bms_copy(relation->rd_indexattr);

	/* Fast path if definitely no indexes */
	if (!RelationGetForm(relation)->relhasindex)
		return NULL;

	/*
	 * Get cached list of index OIDs
	 */
	indexoidlist = RelationGetIndexList(relation);

	/* Fall out if no indexes (but relhasindex was set) */
	if (indexoidlist == NIL)
		return NULL;

	/*
	 * For each index, add referenced attributes to attrset.
	 *
	 * Note: we consider every index returned by RelationGetIndexList, even
	 * ones that are not indisready or indisvalid.  This matters because an
	 * index for which CREATE INDEX CONCURRENTLY has just started must be
	 * included in HOT-safety decisions (see README.HOT).
	 */
	attrset = NULL;
	foreach(lc, indexoidlist)
	{
		Oid			indexOid = lfirst_oid(lc);
		Relation	indexDesc;
		IndexInfo  *indexInfo;
		int			keyno;

		indexDesc = index_open(indexOid, AccessShareLock);

		/* Extract index key information from the index's pg_index row */
		indexInfo = BuildIndexInfo(indexDesc);

		/* Collect simple attribute references */
		for (keyno = 0; keyno < indexInfo->ii_NumIndexAttrs; keyno++)
		{
			int			attrnum = indexInfo->ii_KeyAttrNumbers[keyno];

			if (attrnum != 0)
				attrset = bms_add_member(attrset,
							   attrnum - FirstLowInvalidHeapAttributeNumber);
		}

		/* Collect all attributes used in expressions, too */
		pull_varattnos((Node *) indexInfo->ii_Expressions, &attrset);

		/* Collect all attributes in the index predicate, too */
		pull_varattnos((Node *) indexInfo->ii_Predicate, &attrset);

		index_close(indexDesc, AccessShareLock);
	}

	list_free(indexoidlist);

	/* Now save a copy of the bitmap in the relcache entry. */
	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
	relation->rd_indexattr = bms_copy(attrset);
	MemoryContextSwitchTo(oldcxt);

	/* We return our original working copy for caller to play with */
	return attrset;
}

4287

4288
/*
4289
 *	load_relcache_init_file, write_relcache_init_file
4290
 *
4291 4292 4293
 *		In late 1992, we started regularly having databases with more than
 *		a thousand classes in them.  With this number of classes, it became
 *		critical to do indexed lookups on the system catalogs.
4294
 *
4295 4296 4297 4298
 *		Bootstrapping these lookups is very hard.  We want to be able to
 *		use an index on pg_attribute, for example, but in order to do so,
 *		we must have read pg_attribute for the attributes in the index,
 *		which implies that we need to use the index.
4299
 *
4300
 *		In order to get around the problem, we do the following:
4301
 *
4302
 *		   +  When the database system is initialized (at initdb time), we
4303
 *			  don't use indexes.  We do sequential scans.
4304
 *
4305 4306 4307
 *		   +  When the backend is started up in normal mode, we load an image
 *			  of the appropriate relation descriptors, in internal format,
 *			  from an initialization file in the data/base/... directory.
4308
 *
4309
 *		   +  If the initialization file isn't there, then we create the
4310
 *			  relation descriptors using sequential scans and write 'em to
4311
 *			  the initialization file for use by subsequent backends.
4312
 *
4313
 *		We could dispense with the initialization files and just build the
4314
 *		critical reldescs the hard way on every backend startup, but that
4315 4316 4317 4318 4319 4320
 *		slows down backend startup noticeably.
 *
 *		We can in fact go further, and save more relcache entries than
 *		just the ones that are absolutely critical; this allows us to speed
 *		up backend startup by not having to build such entries the hard way.
 *		Presently, all the catalog and index entries that are referred to
4321
 *		by catcaches are stored in the initialization files.
4322
 *
T
Tom Lane 已提交
4323 4324
 *		The same mechanism that detects when catcache and relcache entries
 *		need to be invalidated (due to catalog updates) also arranges to
4325 4326
 *		unlink the initialization files when the contents may be out of date.
 *		The files will then be rebuilt during the next backend startup.
4327 4328
 */

4329 4330 4331 4332
/*
 * load_relcache_init_file -- attempt to load cache from the init file
 *
 * If successful, return TRUE and set criticalRelcachesBuilt to true.
4333
 * If not successful, return FALSE.
4334 4335 4336 4337
 *
 * NOTE: we assume we are already switched into CacheMemoryContext.
 */
static bool
4338
load_relcache_init_file(bool shared)
4339
{
4340 4341 4342 4343 4344 4345 4346
	FILE	   *fp;
	char		initfilename[MAXPGPATH];
	Relation   *rels;
	int			relno,
				num_rels,
				max_rels,
				nailed_rels,
4347 4348
				nailed_indexes,
				magic;
4349
	int			i;
4350

4351 4352 4353 4354 4355 4356
	if (shared)
		snprintf(initfilename, sizeof(initfilename), "global/%s",
				 RELCACHE_INIT_FILENAME);
	else
		snprintf(initfilename, sizeof(initfilename), "%s/%s",
				 DatabasePath, RELCACHE_INIT_FILENAME);
4357 4358 4359 4360

	fp = AllocateFile(initfilename, PG_BINARY_R);
	if (fp == NULL)
		return false;
4361

4362
	/*
B
Bruce Momjian 已提交
4363 4364 4365
	 * Read the index relcache entries from the file.  Note we will not enter
	 * any of them into the cache if the read fails partway through; this
	 * helps to guard against broken init files.
4366 4367 4368 4369 4370 4371 4372
	 */
	max_rels = 100;
	rels = (Relation *) palloc(max_rels * sizeof(Relation));
	num_rels = 0;
	nailed_rels = nailed_indexes = 0;
	initFileRelationIds = NIL;

4373 4374 4375 4376 4377 4378
	/* check for correct magic number (compatible version) */
	if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
		goto read_failed;
	if (magic != RELCACHE_INIT_FILEMAGIC)
		goto read_failed;

B
Bruce Momjian 已提交
4379
	for (relno = 0;; relno++)
4380
	{
4381 4382 4383 4384
		Size		len;
		size_t		nread;
		Relation	rel;
		Form_pg_class relform;
4385
		bool		has_not_null;
4386

4387
		/* first read the relation descriptor length */
4388 4389 4390 4391
		if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
		{
			if (nread == 0)
				break;			/* end of file */
4392
			goto read_failed;
4393
		}
4394

4395 4396
		/* safety check for incompatible relcache layout */
		if (len != sizeof(RelationData))
4397
			goto read_failed;
4398

4399 4400 4401 4402 4403 4404
		/* allocate another relcache header */
		if (num_rels >= max_rels)
		{
			max_rels *= 2;
			rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
		}
4405

4406
		rel = rels[num_rels++] = (Relation) palloc(len);
4407

4408 4409
		/* then, read the Relation structure */
		if ((nread = fread(rel, 1, len, fp)) != len)
4410
			goto read_failed;
4411 4412

		/* next read the relation tuple form */
4413
		if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
4414
			goto read_failed;
4415 4416

		relform = (Form_pg_class) palloc(len);
4417
		if ((nread = fread(relform, 1, len, fp)) != len)
4418
			goto read_failed;
4419

4420
		rel->rd_rel = relform;
4421 4422

		/* initialize attribute tuple forms */
4423 4424
		rel->rd_att = CreateTemplateTupleDesc(relform->relnatts,
											  relform->relhasoids);
4425 4426
		rel->rd_att->tdrefcount = 1;	/* mark as refcounted */

4427
		rel->rd_att->tdtypeid = relform->reltype;
B
Bruce Momjian 已提交
4428
		rel->rd_att->tdtypmod = -1;		/* unnecessary, but... */
4429 4430

		/* next read all the attribute tuple form data entries */
4431
		has_not_null = false;
4432 4433
		for (i = 0; i < relform->relnatts; i++)
		{
4434
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
4435
				goto read_failed;
4436
			if (len != ATTRIBUTE_FIXED_PART_SIZE)
4437
				goto read_failed;
4438
			if ((nread = fread(rel->rd_att->attrs[i], 1, len, fp)) != len)
4439
				goto read_failed;
4440 4441 4442 4443

			has_not_null |= rel->rd_att->attrs[i]->attnotnull;
		}

B
Bruce Momjian 已提交
4444 4445 4446 4447 4448 4449 4450 4451
		/* next read the access method specific field */
		if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
			goto read_failed;
		if (len > 0)
		{
			rel->rd_options = palloc(len);
			if ((nread = fread(rel->rd_options, 1, len, fp)) != len)
				goto read_failed;
4452
			if (len != VARSIZE(rel->rd_options))
B
Bruce Momjian 已提交
4453
				goto read_failed;		/* sanity check */
B
Bruce Momjian 已提交
4454 4455 4456 4457 4458 4459
		}
		else
		{
			rel->rd_options = NULL;
		}

4460 4461 4462 4463 4464 4465 4466
		/* mark not-null status */
		if (has_not_null)
		{
			TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));

			constr->has_not_null = true;
			rel->rd_att->constr = constr;
4467 4468
		}

4469 4470 4471 4472 4473
		/* If it's an index, there's more to do */
		if (rel->rd_rel->relkind == RELKIND_INDEX)
		{
			Form_pg_am	am;
			MemoryContext indexcxt;
4474 4475
			Oid		   *opfamily;
			Oid		   *opcintype;
4476 4477
			Oid		   *operator;
			RegProcedure *support;
4478
			int			nsupport;
4479
			int16	   *indoption;
4480 4481 4482 4483 4484

			/* Count nailed indexes to ensure we have 'em all */
			if (rel->rd_isnailed)
				nailed_indexes++;

4485
			/* next, read the pg_index tuple */
4486 4487
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;
4488

4489 4490
			rel->rd_indextuple = (HeapTuple) palloc(len);
			if ((nread = fread(rel->rd_indextuple, 1, len, fp)) != len)
4491
				goto read_failed;
4492

4493 4494 4495 4496
			/* Fix up internal pointers in the tuple -- see heap_copytuple */
			rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
			rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);

4497 4498 4499
			/* next, read the access method tuple form */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;
4500

4501 4502 4503 4504
			am = (Form_pg_am) palloc(len);
			if ((nread = fread(am, 1, len, fp)) != len)
				goto read_failed;
			rel->rd_am = am;
4505

4506 4507 4508 4509 4510 4511
			/*
			 * prepare index info context --- parameters should match
			 * RelationInitIndexAccessInfo
			 */
			indexcxt = AllocSetContextCreate(CacheMemoryContext,
											 RelationGetRelationName(rel),
4512 4513 4514
											 ALLOCSET_SMALL_MINSIZE,
											 ALLOCSET_SMALL_INITSIZE,
											 ALLOCSET_SMALL_MAXSIZE);
4515 4516
			rel->rd_indexcxt = indexcxt;

4517 4518 4519 4520 4521 4522 4523 4524 4525 4526 4527 4528 4529 4530 4531 4532 4533 4534 4535 4536
			/* next, read the vector of opfamily OIDs */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			opfamily = (Oid *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(opfamily, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_opfamily = opfamily;

			/* next, read the vector of opcintype OIDs */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			opcintype = (Oid *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(opcintype, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_opcintype = opcintype;

4537 4538 4539 4540 4541 4542 4543 4544 4545
			/* next, read the vector of operator OIDs */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			operator = (Oid *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(operator, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_operator = operator;
4546

4547
			/* next, read the vector of support procedures */
4548 4549 4550 4551 4552 4553 4554 4555
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;
			support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(support, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_support = support;

4556 4557 4558 4559 4560 4561 4562 4563 4564 4565
			/* finally, read the vector of indoption values */
			if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
				goto read_failed;

			indoption = (int16 *) MemoryContextAlloc(indexcxt, len);
			if ((nread = fread(indoption, 1, len, fp)) != len)
				goto read_failed;

			rel->rd_indoption = indoption;

4566 4567 4568
			/* set up zeroed fmgr-info vectors */
			rel->rd_aminfo = (RelationAmInfo *)
				MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));
4569 4570
			nsupport = relform->relnatts * am->amsupport;
			rel->rd_supportinfo = (FmgrInfo *)
4571
				MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
4572 4573 4574 4575 4576 4577 4578 4579
		}
		else
		{
			/* Count nailed rels to ensure we have 'em all */
			if (rel->rd_isnailed)
				nailed_rels++;

			Assert(rel->rd_index == NULL);
4580
			Assert(rel->rd_indextuple == NULL);
4581 4582
			Assert(rel->rd_am == NULL);
			Assert(rel->rd_indexcxt == NULL);
4583
			Assert(rel->rd_aminfo == NULL);
4584 4585
			Assert(rel->rd_opfamily == NULL);
			Assert(rel->rd_opcintype == NULL);
4586 4587 4588
			Assert(rel->rd_operator == NULL);
			Assert(rel->rd_support == NULL);
			Assert(rel->rd_supportinfo == NULL);
4589
			Assert(rel->rd_indoption == NULL);
4590 4591 4592 4593
		}

		/*
		 * Rules and triggers are not saved (mainly because the internal
B
Bruce Momjian 已提交
4594
		 * format is complex and subject to change).  They must be rebuilt if
4595
		 * needed by RelationCacheInitializePhase3.  This is not expected to
4596 4597
		 * be a big performance hit since few system catalogs have such. Ditto
		 * for index expressions and predicates.
4598 4599 4600 4601
		 */
		rel->rd_rules = NULL;
		rel->rd_rulescxt = NULL;
		rel->trigdesc = NULL;
4602 4603
		rel->rd_indexprs = NIL;
		rel->rd_indpred = NIL;
4604 4605 4606 4607

		/*
		 * Reset transient-state fields in the relcache entry
		 */
4608
		rel->rd_smgr = NULL;
4609 4610
		rel->rd_targblock = InvalidBlockNumber;
		if (rel->rd_isnailed)
4611
			rel->rd_refcnt = 1;
4612
		else
4613
			rel->rd_refcnt = 0;
4614
		rel->rd_indexvalid = 0;
4615
		rel->rd_indexlist = NIL;
4616
		rel->rd_indexattr = NULL;
4617
		rel->rd_oidindex = InvalidOid;
4618
		rel->rd_createSubid = InvalidSubTransactionId;
4619
		rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
4620
		rel->rd_amcache = NULL;
4621
		MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
4622 4623
        rel->rd_cdbpolicy = NULL;
        rel->rd_cdbDefaultStatsWarningIssued = false;
4624

4625
		/*
4626
		 * Recompute lock and physical addressing info.  This is needed in
B
Bruce Momjian 已提交
4627 4628
		 * case the pg_internal.init file was copied from some other database
		 * by CREATE DATABASE.
4629 4630
		 */
		RelationInitLockInfo(rel);
4631
		RelationInitPhysicalAddr(rel);
4632 4633 4634
	}

	/*
B
Bruce Momjian 已提交
4635 4636 4637
	 * We reached the end of the init file without apparent problem. Did we
	 * get the right number of nailed items?  (This is a useful crosscheck in
	 * case the set of critical rels or indexes changes.)
4638
	 */
4639 4640 4641 4642 4643 4644 4645 4646 4647 4648 4649 4650 4651
	if (shared)
	{
		if (nailed_rels != NUM_CRITICAL_SHARED_RELS ||
			nailed_indexes != NUM_CRITICAL_SHARED_INDEXES)
			goto read_failed;
	}
	else
	{
		if (nailed_rels != NUM_CRITICAL_LOCAL_RELS ||
			nailed_indexes != NUM_CRITICAL_LOCAL_INDEXES)
			goto read_failed;
	}

4652 4653 4654 4655 4656 4657 4658 4659 4660 4661

	/*
	 * OK, all appears well.
	 *
	 * Now insert all the new relcache entries into the cache.
	 */
	for (relno = 0; relno < num_rels; relno++)
	{
		RelationCacheInsert(rels[relno]);
		/* also make a list of their OIDs, for RelationIdIsInInitFile */
4662 4663 4664
		if (!shared)
			initFileRelationIds = lcons_oid(RelationGetRelid(rels[relno]),
											initFileRelationIds);
4665
	}
4666

4667 4668 4669
	pfree(rels);
	FreeFile(fp);

4670 4671 4672 4673
	if (shared)
		criticalSharedRelcachesBuilt = true;
	else
		criticalRelcachesBuilt = true;
4674
	return true;
4675

4676
	/*
B
Bruce Momjian 已提交
4677 4678 4679
	 * init file is broken, so do it the hard way.	We don't bother trying to
	 * free the clutter we just allocated; it's not in the relcache so it
	 * won't hurt.
4680
	 */
4681
read_failed:
4682 4683 4684 4685
	pfree(rels);
	FreeFile(fp);

	return false;
4686 4687
}

4688 4689 4690 4691
/*
 * Write out a new initialization file with the current contents
 * of the relcache.
 */
4692
static void
4693
write_relcache_init_file(bool shared)
4694
{
4695
	FILE	   *fp;
4696 4697
	char		tempfilename[MAXPGPATH];
	char		finalfilename[MAXPGPATH];
4698
	int			magic;
4699
	HASH_SEQ_STATUS status;
4700
	RelIdCacheEnt *idhentry;
4701 4702
	MemoryContext oldcxt;
	int			i;
4703 4704

	/*
4705
	 * We must write a temporary file and rename it into place. Otherwise,
B
Bruce Momjian 已提交
4706 4707
	 * another backend starting at about the same time might crash trying to
	 * read the partially-complete file.
4708
	 */
4709 4710 4711 4712 4713 4714 4715 4716 4717 4718 4719 4720 4721 4722
	if (shared)
	{
		snprintf(tempfilename, sizeof(tempfilename), "global/%s.%d",
				 RELCACHE_INIT_FILENAME, MyProcPid);
		snprintf(finalfilename, sizeof(finalfilename), "global/%s",
				 RELCACHE_INIT_FILENAME);
	}
	else
	{
		snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
				 DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
		snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
				 DatabasePath, RELCACHE_INIT_FILENAME);
	}
4723

4724 4725 4726 4727
	unlink(tempfilename);		/* in case it exists w/wrong permissions */

	fp = AllocateFile(tempfilename, PG_BINARY_W);
	if (fp == NULL)
4728 4729 4730 4731 4732
	{
		/*
		 * We used to consider this a fatal error, but we might as well
		 * continue with backend startup ...
		 */
4733 4734
		ereport(WARNING,
				(errcode_for_file_access(),
4735
				 errmsg("could not create relation-cache initialization file \"%s\": %m",
4736
						tempfilename),
B
Bruce Momjian 已提交
4737
			  errdetail("Continuing anyway, but there's something wrong.")));
4738 4739
		return;
	}
4740

4741
	/*
B
Bruce Momjian 已提交
4742
	 * Write a magic number to serve as a file version identifier.	We can
4743 4744 4745 4746 4747 4748
	 * change the magic number whenever the relcache layout changes.
	 */
	magic = RELCACHE_INIT_FILEMAGIC;
	if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic))
		elog(FATAL, "could not write init file");

4749
	/*
4750
	 * Write all the reldescs (in no particular order).
H
Hiroshi Inoue 已提交
4751
	 */
4752
	hash_seq_init(&status, RelationIdCache);
4753

4754
	initFileRelationIds = NIL;
4755

4756
	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
4757
	{
4758
		Relation	rel = idhentry->reldesc;
4759
		Form_pg_class relform = rel->rd_rel;
4760

4761 4762 4763 4764
		/* ignore if not correct group */
		if (relform->relisshared != shared)
			continue;

B
Bruce Momjian 已提交
4765 4766
		/* first write the relcache entry proper */
		write_item(rel, sizeof(RelationData), fp);
4767 4768

		/* next write the relation tuple form */
B
Bruce Momjian 已提交
4769
		write_item(relform, CLASS_TUPLE_SIZE, fp);
4770 4771 4772 4773

		/* next, do all the attribute tuple form data entries */
		for (i = 0; i < relform->relnatts; i++)
		{
4774
			write_item(rel->rd_att->attrs[i], ATTRIBUTE_FIXED_PART_SIZE, fp);
4775 4776
		}

B
Bruce Momjian 已提交
4777 4778
		/* next, do the access method specific field */
		write_item(rel->rd_options,
4779
				   (rel->rd_options ? VARSIZE(rel->rd_options) : 0),
4780
				   fp);
B
Bruce Momjian 已提交
4781

4782 4783 4784 4785
		/* If it's an index, there's more to do */
		if (rel->rd_rel->relkind == RELKIND_INDEX)
		{
			Form_pg_am	am = rel->rd_am;
4786

4787 4788
			/* write the pg_index tuple */
			/* we assume this was created by heap_copytuple! */
B
Bruce Momjian 已提交
4789
			write_item(rel->rd_indextuple,
4790 4791
					   HEAPTUPLESIZE + rel->rd_indextuple->t_len,
					   fp);
4792 4793

			/* next, write the access method tuple form */
B
Bruce Momjian 已提交
4794
			write_item(am, sizeof(FormData_pg_am), fp);
4795

4796 4797 4798 4799 4800 4801 4802 4803 4804 4805
			/* next, write the vector of opfamily OIDs */
			write_item(rel->rd_opfamily,
					   relform->relnatts * sizeof(Oid),
					   fp);

			/* next, write the vector of opcintype OIDs */
			write_item(rel->rd_opcintype,
					   relform->relnatts * sizeof(Oid),
					   fp);

4806
			/* next, write the vector of operator OIDs */
4807 4808 4809
			write_item(rel->rd_operator,
					   relform->relnatts * (am->amstrategies * sizeof(Oid)),
					   fp);
4810

4811
			/* next, write the vector of support procedures */
4812
			write_item(rel->rd_support,
B
Bruce Momjian 已提交
4813
				  relform->relnatts * (am->amsupport * sizeof(RegProcedure)),
4814
					   fp);
4815 4816 4817 4818 4819

			/* finally, write the vector of indoption values */
			write_item(rel->rd_indoption,
					   relform->relnatts * sizeof(int16),
					   fp);
4820
		}
4821

4822
		/* also make a list of their OIDs, for RelationIdIsInInitFile */
4823 4824 4825 4826 4827 4828 4829
		if (!shared)
		{
			oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
			initFileRelationIds = lcons_oid(RelationGetRelid(rel),
											initFileRelationIds);
			MemoryContextSwitchTo(oldcxt);
		}
4830
	}
4831

4832 4833
	if (FreeFile(fp))
		elog(FATAL, "could not write init file");
4834

4835
	/*
4836
	 * Now we have to check whether the data we've so painstakingly
B
Bruce Momjian 已提交
4837 4838 4839 4840 4841
	 * accumulated is already obsolete due to someone else's just-committed
	 * catalog changes.  If so, we just delete the temp file and leave it to
	 * the next backend to try again.  (Our own relcache entries will be
	 * updated by SI message processing, but we can't be sure whether what we
	 * wrote out was up-to-date.)
4842
	 *
4843 4844
	 * This mustn't run concurrently with the code that unlinks an init file
	 * and sends SI messages, so grab a serialization lock for the duration.
4845
	 */
4846 4847 4848 4849 4850 4851
	LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);

	/* Make sure we have seen all incoming SI messages */
	AcceptInvalidationMessages();

	/*
B
Bruce Momjian 已提交
4852 4853
	 * If we have received any SI relcache invals since backend start, assume
	 * we may have written out-of-date data.
4854 4855
	 */
	if (relcacheInvalsReceived == 0L)
4856 4857
	{
		/*
4858 4859
		 * OK, rename the temp file to its final name, deleting any
		 * previously-existing init file.
4860
		 *
4861 4862 4863 4864
		 * Note: a failure here is possible under Cygwin, if some other
		 * backend is holding open an unlinked-but-not-yet-gone init file. So
		 * treat this as a noncritical failure; just remove the useless temp
		 * file on failure.
4865
		 */
4866 4867
		if (rename(tempfilename, finalfilename) < 0)
			unlink(tempfilename);
4868 4869 4870 4871
	}
	else
	{
		/* Delete the already-obsolete temp file */
4872 4873
		unlink(tempfilename);
	}
4874 4875

	LWLockRelease(RelCacheInitLock);
4876 4877
}

4878 4879 4880 4881 4882 4883 4884 4885 4886 4887
/* write a chunk of data preceded by its length */
static void
write_item(const void *data, Size len, FILE *fp)
{
	if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
		elog(FATAL, "could not write init file");
	if (fwrite(data, 1, len, fp) != len)
		elog(FATAL, "could not write init file");
}

4888 4889 4890 4891 4892 4893 4894 4895 4896 4897 4898 4899
/*
 * Detect whether a given relation (identified by OID) is one of the ones
 * we store in the init file.
 *
 * Note that we effectively assume that all backends running in a database
 * would choose to store the same set of relations in the init file;
 * otherwise there are cases where we'd fail to detect the need for an init
 * file invalidation.  This does not seem likely to be a problem in practice.
 */
bool
RelationIdIsInInitFile(Oid relationId)
{
4900
	return list_member_oid(initFileRelationIds, relationId);
4901 4902 4903 4904 4905
}

/*
 * Invalidate (remove) the init file during commit of a transaction that
 * changed one or more of the relation cache entries that are kept in the
4906
 * local init file.
4907
 *
4908 4909 4910 4911 4912 4913 4914 4915 4916 4917 4918 4919
 * To be safe against concurrent inspection or rewriting of the init file,
 * we must take RelCacheInitLock, then remove the old init file, then send
 * the SI messages that include relcache inval for such relations, and then
 * release RelCacheInitLock.  This serializes the whole affair against
 * write_relcache_init_file, so that we can be sure that any other process
 * that's concurrently trying to create a new init file won't move an
 * already-stale version into place after we unlink.  Also, because we unlink
 * before sending the SI messages, a backend that's currently starting cannot
 * read the now-obsolete init file and then miss the SI messages that will
 * force it to update its relcache entries.  (This works because the backend
 * startup sequence gets into the sinval array before trying to load the init
 * file.)
4920
 *
4921 4922 4923
 * We take the lock and do the unlink in RelationCacheInitFilePreInvalidate,
 * then release the lock in RelationCacheInitFilePostInvalidate.  Caller must
 * send any pending SI messages between those calls.
4924 4925 4926 4927 4928 4929 4930
 *
 * Notice this deals only with the local init file, not the shared init file.
 * The reason is that there can never be a "significant" change to the
 * relcache entry of a shared relation; the most that could happen is
 * updates of noncritical fields such as relpages/reltuples.  So, while
 * it's worth updating the shared init file from time to time, it can never
 * be invalid enough to make it necessary to remove it.
4931 4932
 */
void
4933
RelationCacheInitFilePreInvalidate(void)
4934 4935 4936 4937 4938 4939
{
	char		initfilename[MAXPGPATH];

	snprintf(initfilename, sizeof(initfilename), "%s/%s",
			 DatabasePath, RELCACHE_INIT_FILENAME);

4940 4941 4942
	LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);

	if (unlink(initfilename) < 0)
4943 4944
	{
		/*
4945 4946 4947 4948
		 * The file might not be there if no backend has been started since
		 * the last removal.  But complain about failures other than ENOENT.
		 * Fortunately, it's not too late to abort the transaction if we
		 * can't get rid of the would-be-obsolete init file.
4949
		 */
4950 4951 4952 4953 4954
		if (errno != ENOENT)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not remove cache file \"%s\": %m",
							initfilename)));
4955
	}
4956
}
4957

4958 4959 4960 4961 4962 4963
void
RelationCacheInitFilePostInvalidate(void)
{
	LWLockRelease(RelCacheInitLock);
}

4964
/*
4965 4966 4967 4968 4969 4970 4971
 * Remove the init files during postmaster startup.
 *
 * We used to keep the init files across restarts, but that is unsafe even in simple
 * crash-recovery cases as there are windows for the init files to become out-of-sync
 * with the database. So now we just remove them during startup and expect the
 * first backend launch to rebuild them. Of course, this has to happen in each
 * database of the cluster.
4972 4973
 */
void
4974 4975 4976 4977 4978 4979 4980 4981 4982 4983 4984 4985 4986 4987 4988 4989 4990 4991 4992
RelationCacheInitFileRemove(void)
{
	char		path[MAXPGPATH];

	/*
	 * We zap the shared cache file too.  In theory it can't get out of sync
	 * enough to be a problem, but in data-corruption cases, who knows ...
	 */
	snprintf(path, sizeof(path), "global/%s",
			 RELCACHE_INIT_FILENAME);
	unlink_initfile(path);

	/* Scan everything in the default tablespace */
	RelationCacheInitFileRemoveInDir("base");
}

/* Process one per-tablespace directory for RelationCacheInitFileRemove */
static void
RelationCacheInitFileRemoveInDir(const char *tblspcpath)
4993
{
4994 4995
	DIR		   *dir;
	struct dirent *de;
4996 4997
	char		initfilename[MAXPGPATH];

4998 4999 5000 5001 5002 5003 5004 5005 5006 5007 5008 5009 5010 5011 5012 5013 5014 5015 5016 5017 5018 5019 5020 5021 5022 5023 5024 5025 5026 5027 5028 5029
	/* Scan the tablespace directory to find per-database directories */
	dir = AllocateDir(tblspcpath);
	if (dir == NULL)
	{
		elog(LOG, "could not open tablespace directory \"%s\": %m",
			 tblspcpath);
		return;
	}

	while ((de = ReadDir(dir, tblspcpath)) != NULL)
	{
		if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
		{
			/* Try to remove the init file in each database */
			snprintf(initfilename, sizeof(initfilename), "%s/%s/%s",
					 tblspcpath, de->d_name, RELCACHE_INIT_FILENAME);
			unlink_initfile(initfilename);
		}
	}

	FreeDir(dir);
}

/*
 * Remove one init file, tolerating its absence.  Used at postmaster
 * startup, where a missing file is the normal case.
 */
static void
unlink_initfile(const char *initfilename)
{
	if (unlink(initfilename) < 0)
	{
		/* It might not be there, but log any error other than ENOENT */
		if (errno != ENOENT)
			elog(LOG, "could not remove cache file \"%s\": %m", initfilename);
	}
}
5031 5032 5033 5034 5035 5036 5037 5038 5039 5040 5041 5042 5043 5044 5045 5046 5047

void
RelationGetPTInfo(Relation rel,
	ItemPointer persistentTid,
	int64 *persistentSerialNum)
{
	if (! GpPersistent_SkipXLogInfo(rel->rd_node.relNode) &&
		! rel->rd_segfile0_relationnodeinfo.isPresent)
	{
		elog(ERROR,
			 "required Persistent Table information missing for relation %u/%u/%u",
			 rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode);
	}

	*persistentTid = rel->rd_segfile0_relationnodeinfo.persistentTid;
	*persistentSerialNum = rel->rd_segfile0_relationnodeinfo.persistentSerialNum;
}