/*-------------------------------------------------------------------------
 *
 * vacuum.c
 *	  The postgres vacuum cleaner.
 *
 * This file includes the "full" version of VACUUM, as well as control code
 * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.	See
 * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
 *
 *
 * Portions Copyright (c) 2005-2010, Greenplum inc
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.364.2.4 2009/12/09 21:58:16 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <sys/time.h>
#include <unistd.h>

#include "access/clog.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/appendonlywriter.h"
#include "access/appendonlytid.h"
#include "catalog/heap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "access/appendonly_compaction.h"
#include "access/appendonly_visimap.h"
#include "access/aocs_compaction.h"
#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "catalog/pg_appendonly_fn.h"
#include "catalog/pg_database.h"
#include "catalog/pg_index.h"
#include "catalog/indexing.h"
#include "catalog/pg_namespace.h"
#include "commands/dbcommands.h"
#include "commands/tablecmds.h"
#include "commands/vacuum.h"
#include "cdb/cdbdisp_query.h"
#include "cdb/cdbpartition.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbsrlz.h"
#include "cdb/cdbdispatchresult.h"      /* CdbDispatchResults */
#include "cdb/cdbfilerepprimary.h"
#include "cdb/cdbpersistentfilesysobj.h"
#include "cdb/cdbappendonlyblockdirectory.h"
#include "executor/executor.h"
#include "lib/stringinfo.h"
#include "libpq/pqformat.h"             /* pq_beginmessage() etc. */
#include "miscadmin.h"
#include "postmaster/autovacuum.h"
#include "storage/freespace.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/faultinjector.h"
#include "utils/flatfiles.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_rusage.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
#include "pgstat.h"
#include "access/distributedlog.h"
#include "nodes/makefuncs.h"     /* makeRangeVar */
#include "gp-libpq-fe.h"
#include "gp-libpq-int.h"


/*
 * GUC parameters
 */
int			vacuum_freeze_min_age;

/*
 * VacPage structures keep track of each page on which we find useful
 * amounts of free space.
 */
typedef struct VacPageData
{
	BlockNumber blkno;			/* BlockNumber of this Page */
	Size		free;			/* FreeSpace on this Page */
	uint16		offsets_used;	/* Number of OffNums used by vacuum */
	uint16		offsets_free;	/* Number of OffNums free or to be free */
	OffsetNumber offsets[1];	/* Array of free OffNums */
} VacPageData;

typedef VacPageData *VacPage;

typedef struct VacPageListData
{
	BlockNumber empty_end_pages;	/* Number of "empty" end-pages */
	int			num_pages;		/* Number of pages in pagedesc */
	int			num_allocated_pages;	/* Number of allocated pages in
										 * pagedesc */
	VacPage    *pagedesc;		/* Descriptions of pages */
} VacPageListData;

typedef VacPageListData *VacPageList;

/*
 * The "vtlinks" array keeps information about each recently-updated tuple
 * ("recent" meaning its XMAX is too new to let us recycle the tuple).
 * We store the tuple's own TID as well as its t_ctid (its link to the next
 * newer tuple version).  Searching in this array allows us to follow update
 * chains backwards from newer to older tuples.  When we move a member of an
 * update chain, we must move *all* the live members of the chain, so that we
 * can maintain their t_ctid link relationships (we must not just overwrite
 * t_ctid in an existing tuple).
 *
 * Note: because t_ctid links can be stale (this would only occur if a prior
 * VACUUM crashed partway through), it is possible that new_tid points to an
 * empty slot or unrelated tuple.  We have to check the linkage as we follow
 * it, just as is done in EvalPlanQual.
 */
typedef struct VTupleLinkData
{
	ItemPointerData new_tid;	/* t_ctid of an updated tuple */
	ItemPointerData this_tid;	/* t_self of the tuple */
} VTupleLinkData;

typedef VTupleLinkData *VTupleLink;
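/*
 * Illustrative example (added annotation, not from the original code): for
 * an update chain A -> B -> C (A oldest), the vtlinks array holds the
 * entries {this_tid = A, new_tid = B} and {this_tid = B, new_tid = C}.
 * Because the array is kept sorted by new_tid, the older member of each
 * pair can be found by a binary search on the newer tuple's TID, which is
 * how update chains are followed backwards.
 */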

/*
 * We use an array of VTupleMoveData to plan a chain tuple move fully
 * before we do it.
 */
typedef struct VTupleMoveData
{
	ItemPointerData tid;		/* tuple ID */
	VacPage		vacpage;		/* where to move it to */
	bool		cleanVpd;		/* clean vacpage before using? */
} VTupleMoveData;

typedef VTupleMoveData *VTupleMove;

/*
 * VRelStats contains the data acquired by scan_heap for use later
 */
typedef struct VRelStats
{
	/* miscellaneous statistics */
	BlockNumber rel_pages;		/* pages in relation */
	double		rel_tuples;		/* tuples that remain after vacuuming */
	double		rel_indexed_tuples;		/* indexed tuples that remain */
	Size		min_tlen;		/* min surviving tuple size */
	Size		max_tlen;		/* max surviving tuple size */
	bool		hasindex;
	/* vtlinks array for tuple chain following - sorted by new_tid */
	int			num_vtlinks;
	VTupleLink	vtlinks;
} VRelStats;

/*----------------------------------------------------------------------
 * ExecContext:
 *
 * As these variables always appear together, we put them into one struct
 * and pull initialization and cleanup into separate routines.
 * ExecContext is used by repair_frag() and move_xxx_tuple().  More
 * accurately:	It is *used* only in move_xxx_tuple(), but because this
 * routine is called many times, we initialize the struct just once in
 * repair_frag() and pass it on to move_xxx_tuple().
 */
typedef struct ExecContextData
{
	ResultRelInfo *resultRelInfo;
	EState	   *estate;
	TupleTableSlot *slot;
} ExecContextData;

typedef ExecContextData *ExecContext;

/*
 * State information used during the (full)
 * vacuum of indexes on append-only tables
 */
typedef struct AppendOnlyIndexVacuumState
{
	AppendOnlyVisimap visiMap;
	AppendOnlyBlockDirectory blockDirectory;
	AppendOnlyBlockDirectoryEntry blockDirectoryEntry;
} AppendOnlyIndexVacuumState;

static void
ExecContext_Init(ExecContext ec, Relation rel)
{
	TupleDesc	tupdesc = RelationGetDescr(rel);

	/*
	 * We need a ResultRelInfo and an EState so we can use the regular
	 * executor's index-entry-making machinery.
	 */
	ec->estate = CreateExecutorState();

	ec->resultRelInfo = makeNode(ResultRelInfo);
	ec->resultRelInfo->ri_RangeTableIndex = 1;	/* dummy */
	ec->resultRelInfo->ri_RelationDesc = rel;
	ec->resultRelInfo->ri_TrigDesc = NULL;		/* we don't fire triggers */

	ExecOpenIndices(ec->resultRelInfo);

	ec->estate->es_result_relations = ec->resultRelInfo;
	ec->estate->es_num_result_relations = 1;
	ec->estate->es_result_relation_info = ec->resultRelInfo;

	/* Set up a tuple slot too */
	ec->slot = MakeSingleTupleTableSlot(tupdesc);
}

static void
ExecContext_Finish(ExecContext ec)
{
	ExecDropSingleTupleTableSlot(ec->slot);
	ExecCloseIndices(ec->resultRelInfo);
	FreeExecutorState(ec->estate);
}
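/*
 * Typical usage, as a rough sketch of the pattern described above (the real
 * caller is repair_frag(), which passes the context on to the
 * move_xxx_tuple() routines; this is not a verbatim excerpt):
 *
 *		ExecContextData ec;
 *
 *		ExecContext_Init(&ec, onerel);
 *		... for each tuple to relocate:
 *			move_plain_tuple(onerel, ..., &ec);		// or move_chain_tuple()
 *		ExecContext_Finish(&ec);
 */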

/*
 * End of ExecContext Implementation
 *----------------------------------------------------------------------
 */

/* A few variables that don't seem worth passing around as parameters */
static MemoryContext vac_context = NULL;

static int	elevel = -1;

static TransactionId OldestXmin;
static TransactionId FreezeLimit;

/*
 * For two-step full vacuum, we optimize the second scan by remembering
 * relation stats figured by the first scan.  Since QE runs in a different
 * mpp command/transaction, there is no place to keep this information
 * other than a global variable.  It is very ugly, but as long as the QD
 * runs the operations in the right order, it should be ok.
 */
/* we need the max number of aux relation for one base rel. */
#define MaxVacFullInitialStatsSize 8
static VPgClassStats VacFullInitialStats[MaxVacFullInitialStatsSize];
static int VacFullInitialStatsSize = 0;
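/*
 * Note: vacuumStatement_Relation() resets VacFullInitialStatsSize to 0
 * whenever the current step is not the heap-truncate pass, so stale stats
 * from a previous relation are never reused.
 */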

static BufferAccessStrategy vac_strategy;

/* non-export function prototypes */
static List *get_rel_oids(List *relids, VacuumStmt *vacstmt, bool isVacuum);
static void vac_truncate_clog(TransactionId frozenXID);
static void vacuum_rel(Relation onerel, VacuumStmt *vacstmt, LOCKMODE lmode, List *updated_stats,
		   bool for_wraparound);
static bool full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt, List *updated_stats);
static void scan_heap_for_truncate(VRelStats *vacrelstats, Relation onerel,
		  VacPageList vacuum_pages);
static void scan_heap(VRelStats *vacrelstats, Relation onerel,
		  VacPageList vacuum_pages, VacPageList fraged_pages);
static bool repair_frag(VRelStats *vacrelstats, Relation onerel,
			VacPageList vacuum_pages, VacPageList fraged_pages,
						int nindexes, Relation *Irel, List *updated_stats,
						int reindex_count);
static void move_chain_tuple(Relation rel,
				 Buffer old_buf, Page old_page, HeapTuple old_tup,
				 Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
				 ExecContext ec, ItemPointer ctid, bool cleanVpd);
static void move_plain_tuple(Relation rel,
				 Buffer old_buf, Page old_page, HeapTuple old_tup,
				 Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
				 ExecContext ec);
static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
			VacPageList vacpagelist);
static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
static void vacuum_index(VacPageList vacpagelist, Relation indrel,
						 double num_tuples, int keep_tuples, List *updated_stats,
						 bool check_stats);
static void scan_index(Relation indrel, double num_tuples, List *updated_stats, bool isfull,
			bool check_stats);
static bool tid_reaped(ItemPointer itemptr, void *state);
static bool appendonly_tid_reaped(ItemPointer itemptr, void *state);
static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
			   BlockNumber rel_pages);
static VacPage copy_vac_page(VacPage vacpage);
static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
static void *vac_bsearch(const void *key, const void *base,
			size_t nelem, size_t size,
			int (*compar) (const void *, const void *));
static int	vac_cmp_blk(const void *left, const void *right);
static int	vac_cmp_offno(const void *left, const void *right);
static int	vac_cmp_vtlinks(const void *left, const void *right);
static bool enough_space(VacPage vacpage, Size len);
static Size PageGetFreeSpaceWithFillFactor(Relation relation, Page page);
static void dispatchVacuum(VacuumStmt *vacstmt, VacuumStatsContext *ctx);
static Relation open_relation_and_check_permission(VacuumStmt *vacstmt,
												   Oid relid,
												   char expected_relkind,
												   bool forceAccessExclusiveLock);
static void vacuumStatement_Relation(VacuumStmt *vacstmt, Oid relid,
						 List *relations, BufferAccessStrategy bstrategy,
						 bool for_wraparound, bool isTopLevel);

static void
vacuum_combine_stats(VacuumStatsContext *stats_context,
					CdbPgResults* cdb_pgresults);

static void vacuum_appendonly_index(Relation indexRelation,
		AppendOnlyIndexVacuumState *vacuumIndexState,
		List* updated_stats, double rel_tuple_count, bool isfull);

/****************************************************************************
 *																			*
 *			Code common to all flavors of VACUUM and ANALYZE				*
 *																			*
 ****************************************************************************
 */

/*
 * Primary entry point for VACUUM and ANALYZE commands.
 *
 * relids is normally NIL; if it is not, then it provides the list of
 * relation OIDs to be processed, and vacstmt->relation is ignored.
 * (The non-NIL case is currently only used by autovacuum.)
 *
 * for_wraparound is used by autovacuum to let us know when it's forcing
 * a vacuum for wraparound, which should not be auto-cancelled.
 *
 * bstrategy is normally given as NULL, but in autovacuum it can be passed
 * in to use the same buffer strategy object across multiple vacuum() calls.
 *
 * isTopLevel should be passed down from ProcessUtility.
 *
 * It is the caller's responsibility that vacstmt, relids, and bstrategy
 * (if given) be allocated in a memory context that won't disappear
 * at transaction commit.
 */
void
vacuum(VacuumStmt *vacstmt, List *relids,
	   BufferAccessStrategy bstrategy, bool for_wraparound, bool isTopLevel)
{
	const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
	volatile MemoryContext anl_context = NULL;
	volatile bool all_rels,
				in_outer_xact,
				use_own_xacts;
	List	   *vacuum_relations;
	List	   *analyze_relations;

	if (vacstmt->vacuum && vacstmt->rootonly)
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("ROOTPARTITION option cannot be used together with VACUUM, try ANALYZE ROOTPARTITION")));

	if (vacstmt->verbose)
		elevel = INFO;
	else
		elevel = DEBUG2;

	if (Gp_role == GP_ROLE_DISPATCH)
		elevel = DEBUG2; /* vacuum messages aren't interesting from the QD */

	/*
	 * We cannot run VACUUM inside a user transaction block; if we were inside
	 * a transaction, then our commit- and start-transaction-command calls
	 * would not have the intended effect! Furthermore, the forced commit that
	 * occurs before truncating the relation's file would have the effect of
	 * committing the rest of the user's transaction too, which would
	 * certainly not be the desired behavior.  (This only applies to VACUUM
	 * FULL, though.  We could in theory run lazy VACUUM inside a transaction
	 * block, but we choose to disallow that case because we'd rather commit
	 * as soon as possible after finishing the vacuum.	This is mainly so that
	 * we can let go the AccessExclusiveLock that we may be holding.)
	 *
	 * ANALYZE (without VACUUM) can run either way.
	 */
	if (vacstmt->vacuum)
	{
		if (Gp_role == GP_ROLE_DISPATCH)
			PreventTransactionChain(isTopLevel, stmttype);
		in_outer_xact = false;
	}
	else
		in_outer_xact = IsInTransactionChain(isTopLevel);

	/*
	 * Send info about dead objects to the statistics collector, unless we are
	 * in autovacuum --- autovacuum.c does this for itself.
	 */
	if (vacstmt->vacuum && !IsAutoVacuumWorkerProcess())
		pgstat_vacuum_stat();

	/*
	 * Create special memory context for cross-transaction storage.
	 *
	 * Since it is a child of PortalContext, it will go away eventually even
	 * if we suffer an error; there's no need for special abort cleanup logic.
	 */
	vac_context = AllocSetContextCreate(PortalContext,
										"Vacuum",
										ALLOCSET_DEFAULT_MINSIZE,
										ALLOCSET_DEFAULT_INITSIZE,
										ALLOCSET_DEFAULT_MAXSIZE);

	/*
	 * If caller didn't give us a buffer strategy object, make one in the
	 * cross-transaction memory context.
	 */
	if (bstrategy == NULL)
	{
		MemoryContext old_context = MemoryContextSwitchTo(vac_context);

		bstrategy = GetAccessStrategy(BAS_VACUUM);
		MemoryContextSwitchTo(old_context);
	}
	vac_strategy = bstrategy;

	/* Remember whether we are processing everything in the DB */
	all_rels = (relids == NIL && vacstmt->relation == NULL);

	/*
	 * Build list of relations to process, unless caller gave us one. (If we
	 * build one, we put it in vac_context for safekeeping.)
	 * ANALYZE is not allowed directly on mid-level partitions, so
	 * vacuum_relations and analyze_relations may be different.
	 * For partitioned tables, vacuum_relations will contain the OIDs of all
	 * partitions of a partitioned table, whereas analyze_relations will
	 * contain all of them except the mid-level partitions, unless the GUC
	 * optimizer_analyze_midlevel_partition is set to on.
	 */
	if (vacstmt->vacuum)
		vacuum_relations = get_rel_oids(relids, vacstmt, true /* Requesting relations for VACUUM */);
	if (vacstmt->analyze)
		analyze_relations = get_rel_oids(relids, vacstmt, false /* Requesting relations for ANALYZE */);

	/*
	 * Decide whether we need to start/commit our own transactions.
	 *
	 * For VACUUM (with or without ANALYZE): always do so, so that we can
	 * release locks as soon as possible.  (We could possibly use the outer
	 * transaction for a one-table VACUUM, but handling TOAST tables would be
	 * problematic.)
	 *
	 * For ANALYZE (no VACUUM): if inside a transaction block, we cannot
	 * start/commit our own transactions.  Also, there's no need to do so if
	 * only processing one relation.  For multiple relations when not within a
	 * transaction block, and also in an autovacuum worker, use own
	 * transactions so we can release locks sooner.
	 */
	if (vacstmt->vacuum)
		use_own_xacts = true;
	else
	{
		Assert(vacstmt->analyze);
		if (IsAutoVacuumWorkerProcess())
			use_own_xacts = true;
		else if (in_outer_xact)
			use_own_xacts = false;
		else if (list_length(analyze_relations) > 1)
			use_own_xacts = true;
		else
			use_own_xacts = false;
	}

	/*
	 * If we are running ANALYZE without per-table transactions, we'll need a
	 * memory context with table lifetime.
	 */
	if (!use_own_xacts)
		anl_context = AllocSetContextCreate(PortalContext,
											"Analyze",
											ALLOCSET_DEFAULT_MINSIZE,
											ALLOCSET_DEFAULT_INITSIZE,
											ALLOCSET_DEFAULT_MAXSIZE);

	/*
	 * vacuum_rel expects to be entered with no transaction active; it will
	 * start and commit its own transaction.  But we are called by an SQL
	 * command, and so we are executing inside a transaction already. We
	 * commit the transaction started in PostgresMain() here, and start
	 * another one before exiting to match the commit waiting for us back in
	 * PostgresMain().
	 */
	if (use_own_xacts)
	{
		/* matches the StartTransaction in PostgresMain() */
		if (Gp_role != GP_ROLE_EXECUTE)
			CommitTransactionCommand();
	}

	/* Turn vacuum cost accounting on or off */
	PG_TRY();
	{
		ListCell   *cur;

		VacuumCostActive = (VacuumCostDelay > 0);
		VacuumCostBalance = 0;

		if (Gp_role == GP_ROLE_DISPATCH)
		{
			vacstmt->appendonly_compaction_segno = NIL;
			vacstmt->appendonly_compaction_insert_segno = NIL;
			vacstmt->appendonly_compaction_vacuum_cleanup = false;
			vacstmt->appendonly_relation_empty = false;
		}

		if (vacstmt->vacuum)
		{
			/*
			 * Loop to process each selected relation which needs to be vacuumed.
			 */
			foreach(cur, vacuum_relations)
			{
				Oid			relid = lfirst_oid(cur);
				vacuumStatement_Relation(vacstmt, relid, vacuum_relations, bstrategy, for_wraparound, isTopLevel);
			}
		}

		if (vacstmt->analyze)
		{
			/*
			 * If there are no partitioned tables in the database and ANALYZE
			 * ROOTPARTITION ALL is executed, report a NOTICE, as there are no
			 * root partitions to be analyzed.
			 */
			if (vacstmt->rootonly && NIL == analyze_relations && !vacstmt->relation)
			{
				ereport(NOTICE,
						(errmsg("there are no partitioned tables in database to ANALYZE ROOTPARTITION")));
			}

			/*
			 * Loop to process each selected relation which needs to be analyzed.
			 */
			foreach(cur, analyze_relations)
			{
				Oid			relid = lfirst_oid(cur);
				MemoryContext old_context = NULL;

				/*
				 * If using separate xacts, start one for analyze. Otherwise,
				 * we can use the outer transaction, but we still need to call
				 * analyze_rel in a memory context that will be cleaned up on
				 * return (else we leak memory while processing multiple
				 * tables).
				 */
				if (use_own_xacts)
				{
					StartTransactionCommand();
					/* functions in indexes may want a snapshot set */
					ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
				}
				else
					old_context = MemoryContextSwitchTo(anl_context);

				analyze_rel(relid, vacstmt, vac_strategy);

				if (use_own_xacts)
					CommitTransactionCommand();
				else
				{
					MemoryContextSwitchTo(old_context);
					MemoryContextResetAndDeleteChildren(anl_context);
				}
			}
		}
	}
	PG_CATCH();
	{
		/* Make sure cost accounting is turned off after error */
		VacuumCostActive = false;
		PG_RE_THROW();
	}
	PG_END_TRY();

	/* Turn off vacuum cost accounting */
	VacuumCostActive = false;

	/*
	 * Finish up processing.
	 */
	if (use_own_xacts)
	{
		/* here, we are not in a transaction */

		/*
		 * This matches the CommitTransaction waiting for us in
		 * PostgresMain().
		 *
		 * MPP-7632 and MPP-7984: if we're in a vacuum analyze we need to
		 * make sure that this transaction we're in has the right
		 * properties
		 */
		if (Gp_role == GP_ROLE_DISPATCH)
		{
			/* Set up the distributed transaction context. */
			setupRegularDtxContext();
		}
		StartTransactionCommand();

		/*
		 * Re-establish the transaction snapshot.  This is wasted effort when
		 * we are called as a normal utility command, because the new
		 * transaction will be dropped immediately by PostgresMain(); but it's
		 * necessary if we are called from autovacuum because autovacuum might
		 * continue on to do an ANALYZE-only call.
		 */
		ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
	}

	if (vacstmt->vacuum && !IsAutoVacuumWorkerProcess())
	{
		/*
		 * Update pg_database.datfrozenxid, and truncate pg_clog if possible.
		 * (autovacuum.c does this for itself.)
		 */
		vac_update_datfrozenxid();

		/*
		 * If it was a database-wide VACUUM, print FSM usage statistics (we
		 * don't make you be superuser to see these).  We suppress this in
		 * autovacuum, too.
		 */
		if (all_rels)
			PrintFreeSpaceMapStatistics(elevel);
	}

	/*
	 * Clean up working storage --- note we must do this after
	 * StartTransactionCommand, else we might be trying to delete the active
	 * context!
	 */
	MemoryContextDelete(vac_context);
	vac_context = NULL;

	if (anl_context)
		MemoryContextDelete(anl_context);
}

/*
 * Assigns the compaction segment information.
 *
 * The vacuum statement will be modified.
 *
 */
static bool vacuum_assign_compaction_segno(
		Relation onerel,
		List *compactedSegmentFileList,
		List *insertedSegmentFileList,
		VacuumStmt *vacstmt)
{
	List *new_compaction_list;
	List *insert_segno;
	bool is_drop;

	Assert(Gp_role != GP_ROLE_EXECUTE);
	Assert(vacstmt->appendonly_compaction_segno == NIL);
	Assert(vacstmt->appendonly_compaction_insert_segno == NIL);
	Assert (RelationIsValid(onerel));

	/*
	 * Assign a compaction segment num and insert segment num
	 * on master or on segment if in utility mode
	 */
	if (!(RelationIsAoRows(onerel) || RelationIsAoCols(onerel)) || !gp_appendonly_compaction)
	{
		return true;
	}

	if (HasSerializableBackends(false))
	{
		elog(LOG, "Skip compaction because of concurrent serializable transactions");
		return false;
	}

	new_compaction_list = SetSegnoForCompaction(onerel,
			compactedSegmentFileList, insertedSegmentFileList, &is_drop);
	if (new_compaction_list)
	{
		if (!is_drop)
		{
			insert_segno = lappend_int(NIL, SetSegnoForCompactionInsert(onerel,
				new_compaction_list, compactedSegmentFileList, insertedSegmentFileList));
		}
		else
		{
			/*
			 * If we continue an aborted drop phase, we do not assign a real
			 * insert segment file.
			 */
			insert_segno = list_make1_int(APPENDONLY_COMPACTION_SEGNO_INVALID);
		}

		elogif(Debug_appendonly_print_compaction, LOG,
				"Schedule compaction on AO table: "
				"compact segno list length %d, insert segno length %d",
				list_length(new_compaction_list), list_length(insert_segno));
	}

	if (!new_compaction_list)
	{
		elog(DEBUG3, "No valid compact segno for releation %s (%d)",
				RelationGetRelationName(onerel),
				RelationGetRelid(onerel));
		return false;
	}
	else
	{
		vacstmt->appendonly_compaction_insert_segno = insert_segno;
		vacstmt->appendonly_compaction_segno = new_compaction_list;
		return true;
	}
}

bool
vacuumStatement_IsTemporary(Relation onerel)
{
	bool bTemp = false;
	/* MPP-7576: don't track internal namespace tables */
	switch (RelationGetNamespace(onerel))
	{
		case PG_CATALOG_NAMESPACE:
			/* MPP-7773: don't track objects in system namespace
			 * if modifying system tables (eg during upgrade)
			 */
			if (allowSystemTableModsDDL)
				bTemp = true;
			break;

		case PG_TOAST_NAMESPACE:
		case PG_BITMAPINDEX_NAMESPACE:
		case PG_AOSEGMENT_NAMESPACE:
			bTemp = true;
			break;
		default:
			break;
	}

	/* MPP-7572: Don't track metadata if table in any
	 * temporary namespace
	 */
	if (!bTemp)
		bTemp = isAnyTempNamespace(RelationGetNamespace(onerel));
	return bTemp;
}

/*
 * Modify the Vacuum statement to vacuum an individual
 * relation. This ensures that only one relation will be
 * locked for vacuum, when the user issues a "vacuum <db>"
 * command, or a "vacuum <parent_partition_table>"
 * command.
 */
static void
vacuumStatement_AssignRelation(VacuumStmt *vacstmt, Oid relid, List *relations)
{
	if (list_length(relations) > 1 || vacstmt->relation == NULL)
	{
		char	*relname		= get_rel_name(relid);
		char	*namespace_name =
			get_namespace_name(get_rel_namespace(relid));

		if (relname == NULL)
		{
			elog(ERROR, "Relation name does not exist for relation with oid %d", relid);
			return;
		}

		if (namespace_name == NULL)
		{
			elog(ERROR, "Namespace does not exist for relation with oid %d", relid);
			return;
		}

		/* XXX: dispatch the OID rather than the name */
		vacstmt->relation = makeRangeVar(namespace_name, relname, -1);
	}
}

/*
 * Choose a source and destination segfile for compaction.  It assumes that we
 * are in the vacuum memory context, and executing in DISPATCH or UTILITY mode.
 * Return false if we are done with all segfiles.
 */
static bool
vacuumStatement_AssignAppendOnlyCompactionInfo(VacuumStmt *vacstmt,
		Relation onerel,
		List *compactedSegmentFileList,
		List *insertedSegmentFileList,
		bool *getnextrelation)
{
	Assert(Gp_role != GP_ROLE_EXECUTE);
	Assert(vacstmt);
	Assert(getnextrelation);
	Assert(RelationIsAoRows(onerel) || RelationIsAoCols(onerel));

	if (!vacuum_assign_compaction_segno(onerel,
				compactedSegmentFileList,
				insertedSegmentFileList,
				vacstmt))
	{
		/* There is nothing left to do for this relation */
		if (list_length(compactedSegmentFileList) > 0)
		{
			/*
			 * We now need to vacuum the auxiliary relations of the
			 * append-only relation
			 */
			vacstmt->appendonly_compaction_vacuum_cleanup = true;

			/* Provide the list of all compacted segment numbers with it */
			list_free(vacstmt->appendonly_compaction_segno);
			vacstmt->appendonly_compaction_segno = list_copy(compactedSegmentFileList);
			list_free(vacstmt->appendonly_compaction_insert_segno);
			vacstmt->appendonly_compaction_insert_segno = list_copy(insertedSegmentFileList);
		}
		else
		{
			return false;
		}
	}

	if (vacstmt->appendonly_compaction_segno &&
			vacstmt->appendonly_compaction_insert_segno &&
			!vacstmt->appendonly_compaction_vacuum_cleanup)
	{
		/*
		 * as long as there are real segno to compact, we
		 * keep processing this relation.
		 */
		*getnextrelation = false;
	}
	return true;
}

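/*
 * The QD encodes the current append-only vacuum phase in the dispatched
 * VacuumStmt; the predicates below decode it from the combination of the
 * compaction segno lists and the prepare/cleanup flags:
 *   - prepare phase:           appendonly_compaction_vacuum_prepare is set
 *   - compaction phase:        compaction and insert segno lists are both set
 *   - pseudo-compaction phase: like compaction, but the insert segno is
 *                              APPENDONLY_COMPACTION_SEGNO_INVALID
 *   - drop phase:              compaction segnos set, insert segnos empty
 *   - cleanup phase:           appendonly_compaction_vacuum_cleanup is set
 */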
bool
vacuumStatement_IsInAppendOnlyDropPhase(VacuumStmt *vacstmt)
{
	Assert(vacstmt);
	return (vacstmt->appendonly_compaction_segno &&
			!vacstmt->appendonly_compaction_insert_segno &&
			!vacstmt->appendonly_compaction_vacuum_prepare &&
			!vacstmt->appendonly_compaction_vacuum_cleanup);
}

bool
vacuumStatement_IsInAppendOnlyCompactionPhase(VacuumStmt *vacstmt)
{
	Assert(vacstmt);
	return (vacstmt->appendonly_compaction_segno &&
			vacstmt->appendonly_compaction_insert_segno &&
			!vacstmt->appendonly_compaction_vacuum_prepare &&
			!vacstmt->appendonly_compaction_vacuum_cleanup);
}

bool
vacuumStatement_IsInAppendOnlyPseudoCompactionPhase(VacuumStmt *vacstmt)
{
	Assert(vacstmt);
	return (vacstmt->appendonly_compaction_segno &&
			vacstmt->appendonly_compaction_insert_segno &&
			linitial_int(vacstmt->appendonly_compaction_insert_segno)
				== APPENDONLY_COMPACTION_SEGNO_INVALID &&
			!vacstmt->appendonly_compaction_vacuum_prepare &&
			!vacstmt->appendonly_compaction_vacuum_cleanup);
}

bool
vacuumStatement_IsInAppendOnlyPreparePhase(VacuumStmt* vacstmt)
{
	Assert(vacstmt);
	return (vacstmt->appendonly_compaction_vacuum_prepare);
}

bool
vacummStatement_IsInAppendOnlyCleanupPhase(VacuumStmt *vacstmt)
{
	Assert(vacstmt);
	return (vacstmt->appendonly_compaction_vacuum_cleanup);
}

/*
 * Processing of the vacuumStatement for given relid.
 *
 * The function is called by vacuumStatement once for each relation to vacuum.
 * In order to connect QD and QE work for vacuum, we employ a somewhat
 * complicated mechanism here; we separate the vacuum of one relation
 * into separate steps, depending on the type of storage (heap/AO),
 * and perform each step in separate transactions, so that QD can open
 * a distributed transaction and embrace QE work inside it.  As opposed to
 * old postgres code, where one transaction is opened and closed for each
 * auxiliary relation, here a transaction processes them as a set starting
 * from the base relation.  This is the entry point of one base relation,
 * and QD makes some decision what kind of stage we perform, and tells it
 * to QE with vacstmt fields through dispatch.
 *
 * For heap VACUUM FULL, we need two transactions.  One is to move tuples
 * from one page to another, to empty out the last pages, which typically goes
 * into repair_frag.  We used to perform the truncate operation there, but
 * that required recording a transaction commit locally, which is not pleasant
 * if the QD decides to cancel the whole distributed transaction.  So the truncate
 * step is separated into a second transaction.  This two-step operation is
 * performed on both base relation and toast relation at the same time.
 *
 * Lazy vacuum to heap is one step operation.
 *
 * AO compaction is rather complicated.  There are four phases.
 *   - prepare phase
 *   - compaction phase
 *   - drop phase
 *   - cleanup phase
 * Out of these, compaction and drop phase might repeat multiple times.
 * We go through the list of available segment files by looking up catalog,
 * and perform a compaction operation, which appends the whole segfile
 * to another available one, if the source segfile looks to be dirty enough.
 * If we find such a segfile and perform compaction, the next step is drop. In
 * order to allow concurrent read it is required for the drop phase to
 * be a separate transaction.  We mark the segfile as an awaiting-drop
 * in the catalog, and the drop phase actually drops the segfile from the
 * disk.  There are some cases where we cannot drop the segfile immediately,
 * in which case we just skip it and leave the catalog to have awaiting-drop
 * state for this segfile.  Aside from the compaction and drop phases, the
 * rest is much simpler.  The prepare phase is to truncate unnecessary
 * blocks after the logical EOF, and the cleanup phase does normal heap
 * vacuum on auxiliary relations (toast, aoseg, block directory, visimap,)
 * as well as updating stats info in catalog.  Keep in mind that if the
 * vacuum is full, we need the same two steps as the heap base relation
 * case.  So cleanup phase in AO may consume two transactions.
 *
 * While executing these multiple transactions, we acquire a session
 * lock across transactions, in order to keep concurrent work on the
 * same relation away.  It doesn't look intuitive, though, if you look
 * at QE work, because from its perspective it is always one step, therefore
 * there is no session lock technically (we actually acquire and release
 * it as it's harmless.)  Session lock doesn't work here, because QE
 * is under a distributed transaction and we haven't supported session
 * lock recording in transaction prepare.  This should be ok as long as
 * we are dealing with user tables, because other MPP work also tries
 * to take a relation lock, which would conflict with this vacuum work
 * on master.  Be careful with catalog tables, because we take locks on
 * them and release them well before the end of the transaction.  That means
 * QE still needs to deal with concurrent work well.
 */
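/*
 * Schematically, the per-relation sequence of transactions driven by the
 * loop below is:
 *
 *   heap:         vacuum  [ + separate truncate transaction if FULL ]
 *   append-only:  prepare -> { compaction -> drop }* -> cleanup
 *                 [ the cleanup itself split into two transactions if FULL ]
 */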
static void
vacuumStatement_Relation(VacuumStmt *vacstmt, Oid relid,
						 List *relations, BufferAccessStrategy bstrategy,
						 bool for_wraparound, bool isTopLevel)
{
	LOCKMODE			lmode = NoLock;
	Relation			onerel;
	LockRelId			onerelid;
	MemoryContext		oldctx;
	bool				bTemp;
	VacuumStatsContext stats_context;

	vacstmt = copyObject(vacstmt);
	vacstmt->analyze = false;
	vacstmt->vacuum = true;

	/*
	 * We compact segment file by segment file.
	 * Therefore in some cases, we have multiple vacuum dispatches
	 * per relation.
	 */
	bool getnextrelation = false;

	/* Number of rounds performed on this relation */
	int relationRound = 0;

	List* compactedSegmentFileList = NIL;
	List* insertedSegmentFileList = NIL;

	bool dropPhase = false;
	bool truncatePhase = false;

	Assert(vacstmt);

	if (Gp_role != GP_ROLE_EXECUTE)
	{
		/* First call on a relation is the prepare phase */
		vacstmt->appendonly_compaction_vacuum_prepare = true;

		/*
		 * Reset truncate flag always as we may iterate more than one relation.
		 */
		vacstmt->heap_truncate = false;
	}

	while (!getnextrelation)
	{
		bTemp = false;
		getnextrelation = true;

		if (Gp_role != GP_ROLE_EXECUTE && (!dropPhase || truncatePhase))
		{
			/* Reset the compaction segno if new relation or segment file is started */
			list_free(vacstmt->appendonly_compaction_segno);
			list_free(vacstmt->appendonly_compaction_insert_segno);
			vacstmt->appendonly_compaction_segno = NIL;
			vacstmt->appendonly_compaction_insert_segno = NIL;
			vacstmt->appendonly_compaction_vacuum_cleanup = false;
		}

		/* Set up the distributed transaction context. */
		if (Gp_role == GP_ROLE_DISPATCH)
			setupRegularDtxContext();

		/*
		 * For each iteration we start/commit our own transactions,
		 * so that we can release resources such as locks and memory,
		 * and we can also safely perform non-transactional work
		 * along with transactional work.
		 */
		StartTransactionCommand();

		/*
		 * Functions in indexes may want a snapshot set. Also, setting
		 * a snapshot ensures that RecentGlobalXmin is kept truly recent.
		 */
		ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());

		if (!vacstmt->full)
		{
			/*
			 * PostgreSQL does this:
			 * During a lazy VACUUM we can set the PROC_IN_VACUUM flag, which lets other
			 * concurrent VACUUMs know that they can ignore this one while
			 * determining their OldestXmin.  (The reason we don't set it during a
			 * full VACUUM is exactly that we may have to run user-defined
			 * functions for functional indexes, and we want to make sure that if
			 * they use the snapshot set above, any tuples it requires can't get
			 * removed from other tables.  An index function that depends on the
			 * contents of other tables is arguably broken, but we won't break it
			 * here by violating transaction semantics.)
			 *
			 * GPDB doesn't use PROC_IN_VACUUM, as lazy vacuum for bitmap
			 * indexed tables performs reindex causing updates to pg_class
			 * tuples for index entries.
			 *
			 * We also set the VACUUM_FOR_WRAPAROUND flag, which is passed down
			 * by autovacuum; it's used to avoid cancelling a vacuum that was
			 * invoked in an emergency.
			 *
			 * Note: this flag remains set until CommitTransaction or
			 * AbortTransaction.  We don't want to clear it until we reset
			 * MyProc->xid/xmin, else OldestXmin might appear to go backwards,
			 * which is probably Not Good.
			 */
			LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
#if 0 /* Upstream code not applicable to GPDB */
			MyProc->vacuumFlags |= PROC_IN_VACUUM;
#endif
			if (for_wraparound)
				MyProc->vacuumFlags |= PROC_VACUUM_FOR_WRAPAROUND;
			LWLockRelease(ProcArrayLock);
		}

		/*
		 * AO only: QE can tell drop phase here with dispatched vacstmt.
		 */
		if (Gp_role == GP_ROLE_EXECUTE)
			dropPhase = vacuumStatement_IsInAppendOnlyDropPhase(vacstmt);

		/*
		 * Open the relation with an appropriate lock, and check the permission.
		 */
		onerel = open_relation_and_check_permission(vacstmt, relid, RELKIND_RELATION, dropPhase);

		if (onerel == NULL)
		{
			if (Gp_role != GP_ROLE_EXECUTE)
			{
				DeregisterSegnoForCompactionDrop(relid,
									vacstmt->appendonly_compaction_segno);
				CommitTransactionCommand();
			}
			continue;
		}

		/* XXX: not about temporary tables; this is about metadata tracking. */
		bTemp = vacuumStatement_IsTemporary(onerel);

		vacuumStatement_AssignRelation(vacstmt, relid, relations);

		if (Gp_role != GP_ROLE_EXECUTE)
		{
			/*
			 * Keep things generated by this QD decision beyond a transaction.
			 */
			oldctx = MemoryContextSwitchTo(vac_context);
			if (RelationIsHeap(onerel))
			{
				/*
				 * We perform truncate in the second transaction, to avoid making
				 * it necessary to record transaction commit in the middle of
				 * vacuum operation in case we move tuples across pages.  It may
				 * not need to do so if the relation is clean, but the decision
				 * to perform truncate is segment-local and QD cannot tell if
				 * everyone can skip it.
				 */
				if (vacstmt->full)
				{
					Assert(relationRound == 0 || relationRound == 1);
					if (relationRound == 0)
						getnextrelation = false;
					else if (relationRound == 1)
						vacstmt->heap_truncate = true;
				}
			}
			/* the rest is about AO tables */
			else if (vacstmt->appendonly_compaction_vacuum_prepare)
			{
				getnextrelation = false;
				dropPhase = false;
			}
			else if (!dropPhase)
			{
				if (!vacuumStatement_AssignAppendOnlyCompactionInfo(vacstmt,
							onerel, compactedSegmentFileList,
							insertedSegmentFileList, &getnextrelation))
				{
					MemoryContextSwitchTo(oldctx);
					/* Nothing left to do for this relation */
					relation_close(onerel, NoLock);
					CommitTransactionCommand();
					/* don't dispatch this iteration */
					continue;
				}

				compactedSegmentFileList =
					list_union_int(compactedSegmentFileList,
						vacstmt->appendonly_compaction_segno);
				insertedSegmentFileList =
					list_union_int(insertedSegmentFileList,
						vacstmt->appendonly_compaction_insert_segno);

				dropPhase = !getnextrelation;
			}
			else
			{
				if (HasSerializableBackends(false))
				{
					/*
					 * Checking at this point is safe because
					 * any serializable transaction that could start afterwards
					 * will already see the state with AWAITING_DROP. We
					 * have only to deal with transactions that started before
					 * our transaction.
					 *
					 * We immediately get the next relation. There is no
					 * reason to stay in this relation. Actually, all
					 * other AO relations will skip the compaction step.
					 */
					elogif(Debug_appendonly_print_compaction, LOG,
							"Skipping freeing compacted append-only segment file "
							"because of concurrent serializable transaction");

					DeregisterSegnoForCompactionDrop(relid, vacstmt->appendonly_compaction_segno);
					MemoryContextSwitchTo(oldctx);
					relation_close(onerel, NoLock);
					CommitTransactionCommand();
					/* don't dispatch this iteration */
					continue;
				}
				elogif(Debug_appendonly_print_compaction, LOG,
						"Dispatch drop transaction on append-only relation %s",
						RelationGetRelationName(onerel));

				RegisterSegnoForCompactionDrop(relid, vacstmt->appendonly_compaction_segno);
				list_free(vacstmt->appendonly_compaction_insert_segno);
				vacstmt->appendonly_compaction_insert_segno = NIL;
				dropPhase = false;
				getnextrelation = false;
			}
			MemoryContextSwitchTo(oldctx);

			/*
			 * For VACUUM FULL on AO tables, we use the two-step approach for the aux relations.
			 */
			if (!RelationIsHeap(onerel) &&
				vacstmt->full &&
				vacstmt->appendonly_compaction_vacuum_cleanup)
			{
				if (!truncatePhase)
				{
					truncatePhase = true;
					getnextrelation = false;
				}
				else
				{
					truncatePhase = false;
					vacstmt->heap_truncate = true;
				}
			}
		}

		/*
		 * Reset the global array if this step is not for heap truncate.
		 * We use this array only when truncating.
		 */
		if (!vacstmt->heap_truncate)
			VacFullInitialStatsSize = 0;

		/*
		 * Record the relation that is in the vacuum process, so
		 * that we can clear up its freespace map entry when the
		 * vacuum process crashes or is cancelled.
		 *
		 * XXX: Have to allocate the space inside TopMemoryContext,
		 * since it is required during commit.
		 */
		oldctx = MemoryContextSwitchTo(TopMemoryContext);
		AppendRelToVacuumRels(onerel);
		MemoryContextSwitchTo(oldctx);

		/*
		 * If we are in the dispatch mode, dispatch this modified
		 * vacuum statement to QEs, and wait for them to finish.
		 */
		if (Gp_role == GP_ROLE_DISPATCH)
		{
			int 		i, nindexes;
			bool 		has_bitmap = false;
			Relation   *i_rel = NULL;

			stats_context.ctx = vac_context;
			stats_context.onerel = onerel;
			stats_context.updated_stats = NIL;
			stats_context.vac_stats = NULL;

			vac_open_indexes(onerel, AccessShareLock, &nindexes, &i_rel);
			if (i_rel != NULL)
			{
				for (i = 0; i < nindexes; i++)
				{
					if (RelationIsBitmapIndex(i_rel[i]))
					{
						has_bitmap = true;
						break;
					}
				}
			}
			vac_close_indexes(nindexes, i_rel, AccessShareLock);

			/*
			 * We have to acquire a ShareLock for the relation which has bitmap
			 * indexes, since reindex is used later. Otherwise, concurrent
			 * vacuum and inserts may cause deadlock. MPP-5960
			 */
			if (has_bitmap)
				LockRelation(onerel, ShareLock);

			dispatchVacuum(vacstmt, &stats_context);
		}

		if (vacstmt->full)
			lmode = AccessExclusiveLock;
		else if (RelationIsAoRows(onerel) || RelationIsAoCols(onerel))
			lmode = AccessShareLock;
		else
			lmode = ShareUpdateExclusiveLock;

		if (relationRound == 0)
		{
			onerelid = onerel->rd_lockInfo.lockRelId;

			/*
			 * Get a session-level lock too. This will protect our
			 * access to the relation across multiple transactions, so
			 * that we can vacuum the relation's TOAST table (if any)
			 * secure in the knowledge that no one is deleting the
			 * parent relation.
			 *
			 * NOTE: this cannot block, even if someone else is
			 * waiting for access, because the lock manager knows that
			 * both lock requests are from the same process.
			 */
			LockRelationIdForSession(&onerelid, lmode);
		}
		vacuum_rel(onerel, vacstmt, lmode, stats_context.updated_stats,
				   for_wraparound);

		if (Gp_role == GP_ROLE_DISPATCH)
		{
			list_free_deep(stats_context.updated_stats);
			stats_context.updated_stats = NIL;

			/*
			 * Update ao master tupcount the hard way after the compaction and
			 * after the drop.
			 */
			if (vacstmt->appendonly_compaction_segno)
			{
				Assert(RelationIsAoRows(onerel) || RelationIsAoCols(onerel));

				if (vacuumStatement_IsInAppendOnlyCompactionPhase(vacstmt) &&
						!vacuumStatement_IsInAppendOnlyPseudoCompactionPhase(vacstmt))
				{
					/* In the compact phase, we need to update the information of the segment file we inserted into */
					UpdateMasterAosegTotalsFromSegments(onerel, SnapshotNow, vacstmt->appendonly_compaction_insert_segno, 0);
				}
				else if (vacuumStatement_IsInAppendOnlyDropPhase(vacstmt))
				{
					/* In the drop phase, we need to update the information of the compacted segment file(s) */
					UpdateMasterAosegTotalsFromSegments(onerel, SnapshotNow, vacstmt->appendonly_compaction_segno, 0);
				}
			}
		}

		/*
		 * Close source relation now, but keep lock so that no one
		 * deletes it before we commit.  (If someone did, they'd
		 * fail to clean up the entries we made in pg_statistic.
		 * Also, releasing the lock before commit would expose us
		 * to concurrent-update failures in update_attstats.)
		 */
		relation_close(onerel, NoLock);

		/*
		 * MPP-6929: metadata tracking
		 * We need some transaction to update the catalog.  We could do
		 * it on the outer vacuumStatement, but it is useful to track
		 * relation by relation.
		 */
		if (relationRound == 0 && !bTemp && (Gp_role == GP_ROLE_DISPATCH))
		{
			char *vsubtype = ""; /* NOFULL */

			if (IsAutoVacuumWorkerProcess())
				vsubtype = "AUTO";
			else
			{
				if (vacstmt->full &&
					(0 == vacstmt->freeze_min_age))
					vsubtype = "FULL FREEZE";
				else if (vacstmt->full)
					vsubtype = "FULL";
				else if (0 == vacstmt->freeze_min_age)
					vsubtype = "FREEZE";
			}
			MetaTrackUpdObject(RelationRelationId,
							   relid,
							   GetUserId(),
							   "VACUUM",
							   vsubtype);
		}

		if (list_length(relations) > 1)
		{
			pfree(vacstmt->relation->schemaname);
			pfree(vacstmt->relation->relname);
			pfree(vacstmt->relation);
			vacstmt->relation = NULL;
		}
		vacstmt->appendonly_compaction_vacuum_prepare = false;

		/*
		 * Transaction commit is always executed on QD.
		 */
		if (Gp_role != GP_ROLE_EXECUTE)
			CommitTransactionCommand();

		if (relationRound == 0)
		{
			SIMPLE_FAULT_INJECTOR(VacuumRelationEndOfFirstRound);
		}

		relationRound++;
	}

	if (lmode != NoLock)
	{
		UnlockRelationIdForSession(&onerelid, lmode);
	}

	if (compactedSegmentFileList)
	{
		list_free(compactedSegmentFileList);
		compactedSegmentFileList = NIL;
	}
	if (insertedSegmentFileList)
	{
		list_free(insertedSegmentFileList);
		insertedSegmentFileList = NIL;
	}
	if (vacstmt->appendonly_compaction_segno)
	{
		list_free(vacstmt->appendonly_compaction_segno);
		vacstmt->appendonly_compaction_segno = NIL;
	}
	if (vacstmt->appendonly_compaction_insert_segno)
	{
		list_free(vacstmt->appendonly_compaction_insert_segno);
		vacstmt->appendonly_compaction_insert_segno = NIL;
	}
}

/*
 * Build a list of Oids for each relation to be processed
 *
 * The list is built in vac_context so that it will survive across our
 * per-relation transactions.
 */
static List *
get_rel_oids(List *relids, VacuumStmt *vacstmt, bool isVacuum)
{
	List	   *oid_list = NIL;
	MemoryContext oldcontext;

	/* List supplied by VACUUM's caller? */
	if (relids)
		return relids;

	if (vacstmt->relation)
	{
		if (isVacuum)
		{
			/* Process a specific relation */
			Oid			relid;
			List	   *prels = NIL;

			relid = RangeVarGetRelid(vacstmt->relation, false);

			if (rel_is_partitioned(relid))
			{
				PartitionNode *pn;

				pn = get_parts(relid, 0, 0, false, true /*includesubparts*/);

				prels = all_partition_relids(pn);
			}
			else if (rel_is_child_partition(relid))
			{
				/* get my children */
				prels = find_all_inheritors(relid);
			}

			/* Make a relation list entry for this relation */
			oldcontext = MemoryContextSwitchTo(vac_context);
			oid_list = lappend_oid(oid_list, relid);
			oid_list = list_concat_unique_oid(oid_list, prels);
			MemoryContextSwitchTo(oldcontext);
		}
		else
		{
			oldcontext = MemoryContextSwitchTo(vac_context);
			/**
			 * ANALYZE one relation (optionally, a list of columns).
			 */
			Oid relationOid = InvalidOid;

			relationOid = RangeVarGetRelid(vacstmt->relation, false);
			PartStatus ps = rel_part_status(relationOid);

			if (ps != PART_STATUS_ROOT && vacstmt->rootonly)
			{
				ereport(WARNING,
						(errmsg("skipping \"%s\" --- cannot analyze a non-root partition using ANALYZE ROOTPARTITION",
								get_rel_name(relationOid))));
			}
			else if (ps == PART_STATUS_ROOT)
			{
				PartitionNode *pn = get_parts(relationOid, 0 /*level*/ ,
											  0 /*parent*/, false /* inctemplate */, true /*includesubparts*/);
				Assert(pn);
				if (!vacstmt->rootonly)
				{
					oid_list = all_leaf_partition_relids(pn); /* all leaves */

					if (optimizer_analyze_midlevel_partition)
					{
						oid_list = list_concat(oid_list, all_interior_partition_relids(pn)); /* interior partitions */
					}
				}
				oid_list = lappend_oid(oid_list, relationOid); /* root partition */
			}
			else if (ps == PART_STATUS_INTERIOR) /* analyze an interior partition directly */
			{
				/* disable analyzing mid-level partitions directly since the users are encouraged
				 * to work with the root partition only. To gather stats on mid-level partitions
				 * (for Orca's use), the user should run ANALYZE or ANALYZE ROOTPARTITION on the
				 * root level with optimizer_analyze_midlevel_partition GUC set to ON.
				 * Planner uses the stats on leaf partitions, so it's unnecessary to collect stats on
				 * midlevel partitions.
				 */
				ereport(WARNING,
						(errmsg("skipping \"%s\" --- cannot analyze a mid-level partition. "
								"Please run ANALYZE on the root partition table.",
								get_rel_name(relationOid))));
			}
			else
			{
				oid_list = list_make1_oid(relationOid);
			}
			MemoryContextSwitchTo(oldcontext);
		}
	}
	else
	{
		/* Process all plain relations listed in pg_class */
		Relation	pgclass;
		HeapScanDesc scan;
		HeapTuple	tuple;
		ScanKeyData key;
		Oid candidateOid;

		ScanKeyInit(&key,
					Anum_pg_class_relkind,
					BTEqualStrategyNumber, F_CHAREQ,
					CharGetDatum(RELKIND_RELATION));

		pgclass = heap_open(RelationRelationId, AccessShareLock);

		scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);

		while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
		{
			Form_pg_class classForm = (Form_pg_class) GETSTRUCT(tuple);

			/*
			 * Don't include non-vacuum-able relations:
			 *   - External tables
			 *   - Foreign tables
			 *   - etc.
			 */
			if (classForm->relkind == RELKIND_RELATION && (
					classForm->relstorage == RELSTORAGE_EXTERNAL ||
					classForm->relstorage == RELSTORAGE_FOREIGN  ||
					classForm->relstorage == RELSTORAGE_VIRTUAL))
				continue;

			/* Skip persistent tables for VACUUM FULL. VACUUM FULL could be
			 * dangerous because it can move tuples around, causing tuple TIDs
			 * to change, which would invalidate their references from
			 * gp_relation_node. One scenario where this can happen is a zero
			 * page left by a failure after page extension but before page
			 * initialization.
			 */
			if (vacstmt->full &&
				GpPersistent_IsPersistentRelation(HeapTupleGetOid(tuple)))
				continue;

			/* Make a relation list entry for this guy */
			candidateOid = HeapTupleGetOid(tuple);

			/* Skip non root partition tables if ANALYZE ROOTPARTITION ALL is executed */
			if (vacstmt->rootonly && !rel_is_partitioned(candidateOid))
			{
				continue;
			}

			/* Skip mid-level partition tables if we have disabled collecting statistics for them. */
			PartStatus ps = rel_part_status(candidateOid);
			if (!optimizer_analyze_midlevel_partition && ps == PART_STATUS_INTERIOR)
			{
				continue;
			}

			oldcontext = MemoryContextSwitchTo(vac_context);
			oid_list = lappend_oid(oid_list, candidateOid);
			MemoryContextSwitchTo(oldcontext);
		}

		heap_endscan(scan);
		heap_close(pgclass, AccessShareLock);
	}

	return oid_list;
}

/*
 * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
 */
void
vacuum_set_xid_limits(int freeze_min_age, bool sharedRel,
					  TransactionId *oldestXmin,
					  TransactionId *freezeLimit)
{
	int			freezemin;
	TransactionId limit;
	TransactionId safeLimit;

	/*
	 * We can always ignore processes running lazy vacuum.	This is because we
	 * use these values only for deciding which tuples we must keep in the
	 * tables.	Since lazy vacuum doesn't write its XID anywhere, it's safe to
	 * ignore it.  In theory it could be problematic to ignore lazy vacuums on
	 * a full vacuum, but keep in mind that only one vacuum process can be
	 * working on a particular table at any time, and that each vacuum is
	 * always an independent transaction.
	 */
	*oldestXmin = GetOldestXmin(sharedRel, true);

	Assert(TransactionIdIsNormal(*oldestXmin));

	/*
	 * Determine the minimum freeze age to use: as specified by the caller, or
	 * vacuum_freeze_min_age, but in any case not more than half
	 * autovacuum_freeze_max_age, so that autovacuums to prevent XID
	 * wraparound won't occur too frequently.
	 */
	freezemin = freeze_min_age;
	if (freezemin < 0)
		freezemin = vacuum_freeze_min_age;
	freezemin = Min(freezemin, autovacuum_freeze_max_age / 2);
	Assert(freezemin >= 0);

	/*
	 * Compute the cutoff XID, being careful not to generate a "permanent" XID
	 */
	limit = *oldestXmin - freezemin;
	if (!TransactionIdIsNormal(limit))
		limit = FirstNormalTransactionId;

	/*
	 * If oldestXmin is very far back (in practice, more than
	 * autovacuum_freeze_max_age / 2 XIDs old), complain and force a minimum
	 * freeze age of zero.
	 */
	safeLimit = ReadNewTransactionId() - autovacuum_freeze_max_age;
	if (!TransactionIdIsNormal(safeLimit))
		safeLimit = FirstNormalTransactionId;

	if (TransactionIdPrecedes(limit, safeLimit))
	{
		ereport(WARNING,
				(errmsg("oldest xmin is far in the past"),
				 errhint("Close open transactions soon to avoid wraparound problems.")));
		limit = *oldestXmin;
	}

	*freezeLimit = limit;
}

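/*
 *	vac_update_relstats_from_list() -- update pg_class statistics for one
 *		relation, preferring the per-segment stats collected in
 *		'updated_stats' when running on the QD.  A thin wrapper around
 *		vac_update_relstats().
 */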
void
vac_update_relstats_from_list(Relation rel,
							  BlockNumber num_pages, double num_tuples,
							  bool hasindex, TransactionId frozenxid,
							  List *updated_stats)
{
	/*
	 * If this is QD, use the stats collected in updated_stats instead of
	 * the one provided through 'num_pages' and 'num_tuples'.  It doesn't
	 * seem worth doing so for system tables, though (it'd better say
	 * "non-distributed" tables than system relations here, but for now
	 * it's effectively the same.)
	 */
	if (Gp_role == GP_ROLE_DISPATCH && !IsSystemRelation(rel))
	{
		ListCell *lc;
		num_pages = 0;
		num_tuples = 0.0;
		foreach (lc, updated_stats)
		{
			VPgClassStats *stats = (VPgClassStats *) lfirst(lc);
			if (stats->relid == RelationGetRelid(rel))
			{
				num_pages += stats->rel_pages;
				num_tuples += stats->rel_tuples;
				break;
			}
		}
	}

	vac_update_relstats(RelationGetRelid(rel), num_pages, num_tuples,
						hasindex, frozenxid);
}

/*
 *	vac_update_relstats() -- update statistics for one relation
 *
 *		Update the whole-relation statistics that are kept in its pg_class
 *		row.  There are additional stats that will be updated if we are
 *		doing ANALYZE, but we always update these stats.  This routine works
 *		for both index and heap relation entries in pg_class.
 *
 *		We violate transaction semantics here by overwriting the rel's
 *		existing pg_class tuple with the new values.  This is reasonably
 *		safe since the new values are correct whether or not this transaction
 *		commits.  The reason for this is that if we updated these tuples in
 *		the usual way, vacuuming pg_class itself wouldn't work very well ---
 *		by the time we got done with a vacuum cycle, most of the tuples in
 *		pg_class would've been obsoleted.  Of course, this only works for
 *		fixed-size never-null columns, but these are.
 *
 *		This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
 *		ANALYZE.
 */
void
vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
					bool hasindex, TransactionId frozenxid)
{
	Relation	rd;
	HeapTuple	ctup;
	Form_pg_class pgcform;
	bool		dirty;

	Assert(relid != InvalidOid);

	/*
	 * CDB: send the number of tuples and the number of pages in pg_class located
	 * at QEs through the dispatcher.
	 */
	if (Gp_role == GP_ROLE_EXECUTE)
	{
		/* cdbanalyze_get_relstats(rel, &num_pages, &num_tuples);*/
		StringInfoData buf;
		VPgClassStats stats;

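		/*
		 * The QE ships its counts to the QD as a 'y' protocol message:
		 * 'y', the subtype string "VACUUM", the payload length, then a raw
		 * VPgClassStats struct.  (The QD side is expected to merge these
		 * into updated_stats; see vac_update_relstats_from_list().)
		 */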
		pq_beginmessage(&buf, 'y');
		pq_sendstring(&buf, "VACUUM");
		stats.relid = relid;
		stats.rel_pages = num_pages;
		stats.rel_tuples = num_tuples;
		stats.empty_end_pages = 0;
		pq_sendint(&buf, sizeof(VPgClassStats), sizeof(int));
		pq_sendbytes(&buf, (char *) &stats, sizeof(VPgClassStats));
		pq_endmessage(&buf);
	}

	/*
	 * We need a way to distinguish these 2 cases:
	 * a) ANALYZEd/VACUUMed table is empty
	 * b) Table has never been ANALYZEd/VACUUMed
	 * To do this, in case (a), we set relPages = 1. For case (b), relPages = 0.
	 */
	if (num_pages < 1.0)
	{
		Assert(num_tuples < 1.0);
		num_pages = 1.0;
	}

	/*
	 * update number of tuples and number of pages in pg_class
	 */
	rd = heap_open(RelationRelationId, RowExclusiveLock);

	/* Fetch a copy of the tuple to scribble on */
	ctup = SearchSysCacheCopy(RELOID,
							  ObjectIdGetDatum(relid),
							  0, 0, 0);
	if (!HeapTupleIsValid(ctup))
		elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
			 relid);
	pgcform = (Form_pg_class) GETSTRUCT(ctup);

	/* Apply required updates, if any, to copied tuple */

	dirty = false;
	if (pgcform->relpages != (int32) num_pages)
	{
		pgcform->relpages = (int32) num_pages;
		dirty = true;
	}
	if (pgcform->reltuples != (float4) num_tuples)
	{
		pgcform->reltuples = (float4) num_tuples;
		dirty = true;
	}
	if (pgcform->relhasindex != hasindex)
	{
		pgcform->relhasindex = hasindex;
		dirty = true;
	}

	elog(DEBUG2, "Vacuum oid=%u pages=%d tuples=%f",
		 relid, pgcform->relpages, pgcform->reltuples);
	/*
	 * If we have discovered that there are no indexes, then there's no
	 * primary key either.	This could be done more thoroughly...
	 */
	if (!hasindex)
	{
		if (pgcform->relhaspkey)
		{
			pgcform->relhaspkey = false;
			dirty = true;
		}
	}

	/*
	 * relfrozenxid should never go backward.  Caller can pass
	 * InvalidTransactionId if it has no new data.
	 */
	if (TransactionIdIsNormal(frozenxid) &&
		TransactionIdPrecedes(pgcform->relfrozenxid, frozenxid))
	{
		pgcform->relfrozenxid = frozenxid;
		dirty = true;
	}

	/*
	 * If anything changed, write out the tuple.  Even if nothing changed,
	 * force relcache invalidation so all backends reset their rd_targblock
	 * --- otherwise it might point to a page we truncated away.
	 */
	if (dirty)
	{
		heap_inplace_update(rd, ctup);
		/* the above sends a cache inval message */
	}
	else
	{
		/* no need to change tuple, but force relcache inval anyway */
		CacheInvalidateRelcacheByTuple(ctup);
	}

	heap_close(rd, RowExclusiveLock);
}


/*
 *	vac_update_datfrozenxid() -- update pg_database.datfrozenxid for our DB
 *
 *		Update pg_database's datfrozenxid entry for our database to be the
 *		minimum of the pg_class.relfrozenxid values.  If we are able to
 *		advance pg_database.datfrozenxid, also try to truncate pg_clog.
 *
 *		We violate transaction semantics here by overwriting the database's
 *		existing pg_database tuple with the new value.	This is reasonably
 *		safe since the new value is correct whether or not this transaction
 *		commits.  As with vac_update_relstats, this avoids leaving dead tuples
 *		behind after a VACUUM.
 *
 *		This routine is shared by full and lazy VACUUM.
 */
void
vac_update_datfrozenxid(void)
{
	HeapTuple	tuple;
	Form_pg_database dbform;
	Relation	relation;
	SysScanDesc scan;
	HeapTuple	classTup;
	TransactionId newFrozenXid;
	bool		dirty = false;

	/*
	 * Initialize the "min" calculation with GetOldestXmin, which is a
	 * reasonable approximation to the minimum relfrozenxid for not-yet-
	 * committed pg_class entries for new tables; see AddNewRelationTuple().
	 * So we cannot produce a wrong minimum by starting with this.
	 */
	newFrozenXid = GetOldestXmin(true, true);

	/*
	 * We must seqscan pg_class to find the minimum Xid, because there is no
	 * index that can help us here.
	 */
	relation = heap_open(RelationRelationId, AccessShareLock);

	scan = systable_beginscan(relation, InvalidOid, false,
							  SnapshotNow, 0, NULL);

	while ((classTup = systable_getnext(scan)) != NULL)
	{
		Form_pg_class classForm = (Form_pg_class) GETSTRUCT(classTup);

		/*
		 * Only consider heap and TOAST tables (anything else should have
		 * InvalidTransactionId in relfrozenxid anyway.)
		 */
		if (classForm->relkind != RELKIND_RELATION &&
			classForm->relkind != RELKIND_TOASTVALUE &&
			classForm->relkind != RELKIND_AOSEGMENTS &&
			classForm->relkind != RELKIND_AOBLOCKDIR &&
			classForm->relkind != RELKIND_AOVISIMAP)
			continue;

		/* MPP-10108 - exclude relations with external storage */
		if (classForm->relkind == RELKIND_RELATION && (
				classForm->relstorage == RELSTORAGE_EXTERNAL ||
				classForm->relstorage == RELSTORAGE_FOREIGN  ||
				classForm->relstorage == RELSTORAGE_VIRTUAL))
			continue;

		/* exclude persistent tables, as all updates to it are frozen */
		if (GpPersistent_IsPersistentRelation(HeapTupleGetOid(classTup)))
			continue;

		Assert(TransactionIdIsNormal(classForm->relfrozenxid));

		if (TransactionIdPrecedes(classForm->relfrozenxid, newFrozenXid))
			newFrozenXid = classForm->relfrozenxid;
	}

	/* we're done with pg_class */
	systable_endscan(scan);
	heap_close(relation, AccessShareLock);

	Assert(TransactionIdIsNormal(newFrozenXid));

	/* Now fetch the pg_database tuple we need to update. */
	relation = heap_open(DatabaseRelationId, RowExclusiveLock);

	/* Fetch a copy of the tuple to scribble on */
	tuple = SearchSysCacheCopy(DATABASEOID,
							   ObjectIdGetDatum(MyDatabaseId),
							   0, 0, 0);
	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "could not find tuple for database %u", MyDatabaseId);
	dbform = (Form_pg_database) GETSTRUCT(tuple);

	/*
	 * Don't allow datfrozenxid to go backward (probably can't happen anyway);
	 * and detect the common case where it doesn't go forward either.
	 */
	if (TransactionIdPrecedes(dbform->datfrozenxid, newFrozenXid))
	{
		dbform->datfrozenxid = newFrozenXid;
		dirty = true;
	}

	if (dirty)
		heap_inplace_update(relation, tuple);

	heap_freetuple(tuple);
	heap_close(relation, RowExclusiveLock);

	/*
	 * If we were able to advance datfrozenxid, mark the flat-file copy of
	 * pg_database for update at commit, and see if we can truncate pg_clog.
	 */
	if (dirty)
	{
		database_file_update_needed();
		vac_truncate_clog(newFrozenXid);
	}
}


/*
 *	vac_truncate_clog() -- attempt to truncate the commit log
 *
 *		Scan pg_database to determine the system-wide oldest datfrozenxid,
 *		and use it to truncate the transaction commit log (pg_clog).
 *		Also update the XID wrap limit info maintained by varsup.c.
 *
 *		The passed XID is simply the one I just wrote into my pg_database
 *		entry.	It's used to initialize the "min" calculation.
 *
 *		This routine is shared by full and lazy VACUUM.  Note that it's
 *		only invoked when we've managed to change our DB's datfrozenxid
 *		entry.
 */
static void
vac_truncate_clog(TransactionId frozenXID)
{
	TransactionId myXID = GetCurrentTransactionId();
	Relation	relation;
	HeapScanDesc scan;
	HeapTuple	tuple;
	NameData	oldest_datname;
	bool		frozenAlreadyWrapped = false;

	/* init oldest_datname to sync with my frozenXID */
	namestrcpy(&oldest_datname, get_database_name(MyDatabaseId));

	/*
	 * Scan pg_database to compute the minimum datfrozenxid
	 *
	 * Note: we need not worry about a race condition with new entries being
	 * inserted by CREATE DATABASE.  Any such entry will have a copy of some
	 * existing DB's datfrozenxid, and that source DB cannot be ours because
	 * of the interlock against copying a DB containing an active backend.
	 * Hence the new entry will not reduce the minimum.  Also, if two VACUUMs
	 * concurrently modify the datfrozenxid's of different databases, the
	 * worst possible outcome is that pg_clog is not truncated as aggressively
	 * as it could be.
	 */
	relation = heap_open(DatabaseRelationId, AccessShareLock);

	scan = heap_beginscan(relation, SnapshotNow, 0, NULL);

	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
	{
		Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);

		Assert(TransactionIdIsNormal(dbform->datfrozenxid));

		/*
		 * MPP-20053: Skip databases that cannot be connected to in computing
		 * the oldest database.
		 */
		if (dbform->datallowconn)
		{
			if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
				frozenAlreadyWrapped = true;
			else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
			{
				frozenXID = dbform->datfrozenxid;
				namecpy(&oldest_datname, &dbform->datname);
			}
		}
	}

	heap_endscan(scan);

	heap_close(relation, AccessShareLock);

	/*
	 * Do not truncate CLOG if we seem to have suffered wraparound already;
	 * the computed minimum XID might be bogus.  This case should now be
	 * impossible due to the defenses in GetNewTransactionId, but we keep the
	 * test anyway.
	 */
	if (frozenAlreadyWrapped)
	{
		ereport(WARNING,
				(errmsg("some databases have not been vacuumed in over 2 billion transactions"),
				 errdetail("You might have already suffered transaction-wraparound data loss.")));
		return;
	}

	/* Truncate CLOG to the oldest frozenxid */
	TruncateCLOG(frozenXID);
	DistributedLog_Truncate(frozenXID);

	/*
	 * Update the wrap limit for GetNewTransactionId.  Note: this function
	 * will also signal the postmaster for an(other) autovac cycle if needed.
	 */
	SetTransactionIdLimit(frozenXID, &oldest_datname);
}


/****************************************************************************
 *																			*
 *			Code common to both flavors of VACUUM							*
 *																			*
 ****************************************************************************
 */


/*
 *	vacuum_rel() -- vacuum one heap relation
 *
 *		Doing one heap at a time incurs extra overhead, since we need to
 *		check that the heap exists again just before we vacuum it.	The
 *		reason that we do this is so that vacuuming can be spread across
 *		many small transactions.  Otherwise, two-phase locking would require
 *		us to lock the entire database during one pass of the vacuum cleaner.
 */
static void
vacuum_rel(Relation onerel, VacuumStmt *vacstmt, LOCKMODE lmode, List *updated_stats,
		   bool for_wraparound)
{
	Oid			toast_relid;
	Oid			aoseg_relid = InvalidOid;
	Oid         aoblkdir_relid = InvalidOid;
	Oid         aovisimap_relid = InvalidOid;
	Oid			save_userid;
	int			save_sec_context;
	int			save_nestlevel;
	bool		heldoff;

	/*
	 * Check for user-requested abort.	Note we want this to be inside a
	 * transaction, so xact.c doesn't issue useless WARNING.
	 */
	CHECK_FOR_INTERRUPTS();

	/*
	 * Remember the relation's TOAST and AO segments relations for later
	 */
	toast_relid = onerel->rd_rel->reltoastrelid;
	if (RelationIsAoRows(onerel) || RelationIsAoCols(onerel))
	{
		GetAppendOnlyEntryAuxOids(RelationGetRelid(onerel), SnapshotNow,
								  &aoseg_relid,
								  &aoblkdir_relid, NULL,
								  &aovisimap_relid, NULL);
		vacstmt->appendonly_relation_empty =
				AppendOnlyCompaction_IsRelationEmpty(onerel);
	}


	/*
	 * Switch to the table owner's userid, so that any index functions are run
	 * as that user.  Also lock down security-restricted operations and
	 * arrange to make GUC variable changes local to this command.
	 * (This is unnecessary, but harmless, for lazy VACUUM.)
	 */
	GetUserIdAndSecContext(&save_userid, &save_sec_context);
	SetUserIdAndSecContext(onerel->rd_rel->relowner,
						   save_sec_context | SECURITY_RESTRICTED_OPERATION);
	save_nestlevel = NewGUCNestLevel();

	/*
	 * Do the actual work --- either FULL or "lazy" vacuum
	 */
	if (vacstmt->full)
		heldoff = full_vacuum_rel(onerel, vacstmt, updated_stats);
	else
		heldoff = lazy_vacuum_rel(onerel, vacstmt, vac_strategy, updated_stats);

	/* Roll back any GUC changes executed by index functions */
	AtEOXact_GUC(false, save_nestlevel);

	/* Restore userid and security context */
	SetUserIdAndSecContext(save_userid, save_sec_context);

	/* now we can allow interrupts again, if disabled */
	if (heldoff)
		RESUME_INTERRUPTS();

	/*
	 * If the relation has a secondary toast rel, vacuum that too while we
	 * still hold the session lock on the master table.  We do this in
	 * cleanup phase when it's AO table or in prepare phase if it's an
	 * empty AO table.
	 */
	if ((RelationIsHeap(onerel) && toast_relid != InvalidOid) ||
		(!RelationIsHeap(onerel) && (
				vacstmt->appendonly_compaction_vacuum_cleanup ||
				vacstmt->appendonly_relation_empty)))
	{
		Relation toast_rel = open_relation_and_check_permission(vacstmt, toast_relid,
																RELKIND_TOASTVALUE, false);
		if (toast_rel != NULL)
		{
			vacuum_rel(toast_rel, vacstmt, lmode, updated_stats, for_wraparound);

			/* all done with this class, but hold lock until commit */
			relation_close(toast_rel, NoLock);
		}
	}

	/*
	 * If an AO/CO table is empty on a segment,
	 * vacstmt->appendonly_relation_empty will get set to true even in the
	 * compaction phase. In such a case, we end up updating the auxiliary
	 * tables and try to vacuum them all in the same transaction. This causes
	 * the auxiliary relation to not get vacuumed and it generates a notice to
	 * the user saying that transaction is already in progress. Hence we want
	 * to vacuum the auxiliary relations only in cleanup phase or if we are in
	 * the prepare phase and the AO/CO table is empty.
	 */
	if (vacstmt->appendonly_compaction_vacuum_cleanup ||
		(vacstmt->appendonly_relation_empty && vacstmt->appendonly_compaction_vacuum_prepare))
	{
		/* do the same for an AO segments table, if any */
		if (aoseg_relid != InvalidOid)
		{
			Relation aoseg_rel = open_relation_and_check_permission(vacstmt, aoseg_relid,
																	RELKIND_AOSEGMENTS, false);
			if (aoseg_rel != NULL)
			{
				vacuum_rel(aoseg_rel, vacstmt, lmode, updated_stats, for_wraparound);

				/* all done with this class, but hold lock until commit */
				relation_close(aoseg_rel, NoLock);
			}
		}

		/* do the same for an AO block directory table, if any */
		if (aoblkdir_relid != InvalidOid)
		{
			Relation aoblkdir_rel = open_relation_and_check_permission(vacstmt, aoblkdir_relid,
																	   RELKIND_AOBLOCKDIR, false);
			if (aoblkdir_rel != NULL)
			{
				vacuum_rel(aoblkdir_rel, vacstmt, lmode, updated_stats, for_wraparound);

				/* all done with this class, but hold lock until commit */
				relation_close(aoblkdir_rel, NoLock);
			}
		}

		/* do the same for an AO visimap, if any */
		if (aovisimap_relid != InvalidOid)
		{
			Relation aovisimap_rel = open_relation_and_check_permission(vacstmt, aovisimap_relid,
																	   RELKIND_AOVISIMAP, false);
			if (aovisimap_rel != NULL)
			{
				vacuum_rel(aovisimap_rel, vacstmt, lmode, updated_stats, for_wraparound);

				/* all done with this class, but hold lock until commit */
				relation_close(aovisimap_rel, NoLock);
			}
		}
	}
}


/****************************************************************************
 *																			*
 *			Code for VACUUM FULL (only)										*
 *																			*
 ****************************************************************************
 */

/*
 * Remember the relation stats that will be used in the next truncate phase.
 */
static void
save_vacstats(Oid relid, BlockNumber rel_pages, double rel_tuples, BlockNumber empty_end_pages)
{
	VPgClassStats *stats;

	if (VacFullInitialStatsSize >= MaxVacFullInitialStatsSize)
		elog(ERROR, "out of stats slot");

	stats = &VacFullInitialStats[VacFullInitialStatsSize++];

	stats->relid = relid;
	stats->rel_pages = rel_pages;
	stats->rel_tuples = rel_tuples;
	stats->empty_end_pages = empty_end_pages;

	/* Should not happen */
	if (stats->rel_pages < stats->empty_end_pages)
		elog(ERROR, "rel_pages %u < empty_end_pages %u",
					stats->rel_pages, stats->empty_end_pages);
}
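
/*
 * Decide whether the indexes of an append-only relation need real vacuuming:
 * only when the visimap reports hidden (deleted) tuples, or when a FULL
 * vacuum was requested.  The visible tuple count is reported through
 * *rel_tuple_count.  On the QD, which holds no appendonly data, this always
 * returns false.
 */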

static bool vacuum_appendonly_index_should_vacuum(Relation aoRelation,
		VacuumStmt *vacstmt,
		AppendOnlyIndexVacuumState *vacuumIndexState, double *rel_tuple_count)
{
	int64 hidden_tupcount;
	FileSegTotals *totals;

	Assert(RelationIsAoRows(aoRelation) || RelationIsAoCols(aoRelation));

	if(Gp_role == GP_ROLE_DISPATCH)
	{
		if (rel_tuple_count)
		{
			*rel_tuple_count = 0.0;
		}
		return false;
	}

	if(RelationIsAoRows(aoRelation))
	{
		totals = GetSegFilesTotals(aoRelation, SnapshotNow);
	}
	else
	{
		Assert(RelationIsAoCols(aoRelation));
		totals = GetAOCSSSegFilesTotals(aoRelation, SnapshotNow);
	}
	hidden_tupcount = AppendOnlyVisimap_GetRelationHiddenTupleCount(&vacuumIndexState->visiMap);

	if(rel_tuple_count)
	{
		*rel_tuple_count = (double)(totals->totaltuples - hidden_tupcount);
		Assert((*rel_tuple_count) > -1.0);
	}

	pfree(totals);

	if(hidden_tupcount > 0 || vacstmt->full)
	{
		return true;
	}
	return false;
}

/*
 * vacuum_appendonly_indexes()
 *
 * Perform a vacuum on all indexes of an append-only relation.
 *
 * The page and tuplecount information in vacrelstats are used, the
 * nindex value is set by this function.
 *
 * It returns the number of indexes on the relation.
 */
int
vacuum_appendonly_indexes(Relation aoRelation,
		VacuumStmt *vacstmt,
		List* updated_stats)
{
	int reindex_count = 1;
	int i;
	Relation   *Irel;
	int			nindexes;
	AppendOnlyIndexVacuumState vacuumIndexState;
	FileSegInfo **segmentFileInfo = NULL; /* Might be a casted AOCSFileSegInfo */
	int totalSegfiles;

	Assert(RelationIsAoRows(aoRelation) || RelationIsAoCols(aoRelation));
	Assert(vacstmt);

	memset(&vacuumIndexState, 0, sizeof(vacuumIndexState));

	elogif (Debug_appendonly_print_compaction, LOG,
			"Vacuum indexes for append-only relation %s",
			RelationGetRelationName(aoRelation));

	/* Now open all indexes of the relation */
	if (vacstmt->full)
		vac_open_indexes(aoRelation, AccessExclusiveLock, &nindexes, &Irel);
	else
		vac_open_indexes(aoRelation, RowExclusiveLock, &nindexes, &Irel);

	if (RelationIsAoRows(aoRelation))
	{
		segmentFileInfo = GetAllFileSegInfo(aoRelation, SnapshotNow, &totalSegfiles);
	}
	else
	{
		Assert(RelationIsAoCols(aoRelation));
		segmentFileInfo = (FileSegInfo **)GetAllAOCSFileSegInfo(aoRelation, SnapshotNow, &totalSegfiles);
	}

	AppendOnlyVisimap_Init(
			&vacuumIndexState.visiMap,
			aoRelation->rd_appendonly->visimaprelid,
			aoRelation->rd_appendonly->visimapidxid,
			AccessShareLock,
			SnapshotNow);

	AppendOnlyBlockDirectory_Init_forSearch(&vacuumIndexState.blockDirectory,
			SnapshotNow,
			segmentFileInfo,
			totalSegfiles,
			aoRelation,
			1,
			RelationIsAoCols(aoRelation),
			NULL);

	/* Clean/scan index relation(s) */
	if (Irel != NULL)
	{
		double rel_tuple_count = 0.0;
		if (vacuum_appendonly_index_should_vacuum(aoRelation, vacstmt,
					&vacuumIndexState, &rel_tuple_count))
		{
			Assert(rel_tuple_count > -1.0);

			for (i = 0; i < nindexes; i++)
			{
				vacuum_appendonly_index(Irel[i], &vacuumIndexState, updated_stats,
						rel_tuple_count, vacstmt->full);
			}
			reindex_count++;
		}
		else
		{
			/* just scan indexes to update statistic */
			for (i = 0; i < nindexes; i++)
				scan_index(Irel[i], rel_tuple_count, updated_stats, vacstmt->full, true);
		}
	}

	AppendOnlyVisimap_Finish(&vacuumIndexState.visiMap, AccessShareLock);
	AppendOnlyBlockDirectory_End_forSearch(&vacuumIndexState.blockDirectory);

	if (segmentFileInfo)
	{
		if (RelationIsAoRows(aoRelation))
		{
			FreeAllSegFileInfo(segmentFileInfo, totalSegfiles);
		}
		else
		{
			FreeAllAOCSSegFileInfo((AOCSFileSegInfo **)segmentFileInfo, totalSegfiles);
		}
		pfree(segmentFileInfo);
	}

	vac_close_indexes(nindexes, Irel, NoLock);
	return nindexes;
}

/*
 * vacuum_heap_rel()
 *
 * This is the workhorse of full_vacuum_rel for heap tables.  This is called
 * twice per relation per command.  In the first call, we scan the relation
 * first to identify dead tuples and find free spaces, then clean up indexes
 * and move tuples from end pages to head pages if available.  In the second,
 * vacstmt->truncate is true, and we scan the heap again to verify the empty
 * end pages are still empty, and truncate if so.  In the second transaction,
 * we don't check the number of tuple integrity with indexes.
 */
static bool
vacuum_heap_rel(Relation onerel, VacuumStmt *vacstmt,
		VRelStats *vacrelstats, List *updated_stats)
{
	VacPageListData vacuum_pages;		/* List of pages to vacuum and/or
										 * clean indexes */
	VacPageListData fraged_pages =		/* List of pages with space enough for */
		{								/* re-using */
		0, /* empty_end_pages */
		0, /* num_pages */
		0, /* num_allocated_pages */
		NULL /* pagedesc */
		};

	Relation   *Irel;
	int			nindexes;
	int			i;
	bool		heldoff = false;
	int			reindex_count = 1;
	bool		check_stats;
	bool		save_disable_tuple_hints;

	Assert(RelationIsHeap(onerel));

	/*
	 * scan the heap
	 *
	 * repair_frag() assumes that scan_heap() has set all hint bits on the
	 * tuples, so temporarily turn off 'gp_disable_tuple_hints', i.e. allow
	 * hint bits to be set, if we're running in FULL mode.
	 */
	vacuum_pages.num_pages = fraged_pages.num_pages = 0;

	save_disable_tuple_hints = gp_disable_tuple_hints;
	PG_TRY();
	{
		if (vacstmt->full)
			gp_disable_tuple_hints = false;

		if (vacstmt->heap_truncate)
			scan_heap_for_truncate(vacrelstats, onerel, &vacuum_pages);
		else
			scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);

		gp_disable_tuple_hints = save_disable_tuple_hints;
	}
	PG_CATCH();
	{
		gp_disable_tuple_hints = save_disable_tuple_hints;
		PG_RE_THROW();
	}
	PG_END_TRY();

	/* Now open all indexes of the relation */
	vac_open_indexes(onerel, AccessExclusiveLock, &nindexes, &Irel);
	if (nindexes > 0)
		vacrelstats->hasindex = true;

	/*
	 * Since the truncate transaction doesn't read all pages, it may not be
	 * the exact number of tuples.  vacuum_index should not check the
	 * stat consistency.
	 */
	check_stats = !vacstmt->heap_truncate;
	/* Clean/scan index relation(s) */
	if (Irel != NULL)
	{
		if (vacuum_pages.num_pages > 0)
		{
			for (i = 0; i < nindexes; i++)
			{
				vacuum_index(&vacuum_pages, Irel[i],
							 vacrelstats->rel_indexed_tuples, 0, updated_stats,
							 check_stats);
			}
			reindex_count++;
		}
		else if (check_stats)
		{
			/* just scan indexes to update statistic */
			for (i = 0; i < nindexes; i++)
				scan_index(Irel[i], vacrelstats->rel_indexed_tuples, updated_stats, true,
						   check_stats);
		}
	}

	/*
	 * For heap tables FULL vacuum we perform truncate-only transaction as
	 * the second step, after moving tuples across pages if any.  By
	 * separating transactions, we don't lose transactional changes
	 * by non-transactional truncate operation.  Note scan_heap still
	 * performs some xlog operation in non-empty pages, which is ok with
	 * this truncate operation in the same transaction.
	 */
	if (vacstmt->heap_truncate)
	{
		Assert(vacrelstats->rel_pages >= vacuum_pages.empty_end_pages);

		SIMPLE_FAULT_INJECTOR(VacuumFullBeforeTruncate);

		if (vacuum_pages.empty_end_pages > 0)
		{
			BlockNumber relblocks;

			relblocks = vacrelstats->rel_pages - vacuum_pages.empty_end_pages;
			RelationTruncate(onerel, relblocks, true);
			vacrelstats->rel_pages = relblocks;
		}
		vac_close_indexes(nindexes, Irel, NoLock);

		SIMPLE_FAULT_INJECTOR(VacuumFullAfterTruncate);
	}
	else
	{
		if (fraged_pages.num_pages > 0)
		{
			/* Try to shrink heap */
			heldoff = repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
								  nindexes, Irel, updated_stats, reindex_count);
			vac_close_indexes(nindexes, Irel, NoLock);
		}
		else
		{
			vac_close_indexes(nindexes, Irel, NoLock);
			if (vacuum_pages.num_pages > 0)
			{
				/* Clean pages from vacuum_pages list */
				vacuum_heap(vacrelstats, onerel, &vacuum_pages);
			}
		}

		/*
		 * Store the relation stats in global array, so that we can
		 * resume the truncate work later.
		 */
		save_vacstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
					  vacrelstats->rel_tuples, vacuum_pages.empty_end_pages);
		/* update shared free space map with final free space info */
		vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
	}

	return heldoff;
}

/*
 *	full_vacuum_rel() -- perform FULL VACUUM for one heap relation
 *
 *		This routine vacuums a single heap, cleans out its indexes, and
 *		updates its num_pages and num_tuples statistics.
 *
 *		At entry, we have already established a transaction and opened
 *		and locked the relation.
 *
 *		The return value indicates whether this function has held off
 *		interrupts -- caller must RESUME_INTERRUPTS() after commit if true.
 */
static bool
full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt, List *updated_stats)
{
	VRelStats* vacrelstats;
	bool		heldoff = false;
	bool update_relstats = true;

	vacuum_set_xid_limits(vacstmt->freeze_min_age, onerel->rd_rel->relisshared,
						  &OldestXmin, &FreezeLimit);

	/*
	 * Flush any previous async-commit transactions.  This does not guarantee
	 * that we will be able to set hint bits for tuples they inserted, but it
	 * improves the probability, especially in simple sequential-commands
	 * cases.  See scan_heap() and repair_frag() for more about this.
	 */
	XLogAsyncCommitFlush();

	/*
	 * Set up statistics-gathering machinery.
	 */
	vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
	vacrelstats->rel_pages = 0;
	vacrelstats->rel_tuples = 0;
	vacrelstats->hasindex = false;

	if(RelationIsAoRows(onerel) || RelationIsAoCols(onerel))
	{
		if(vacuumStatement_IsInAppendOnlyPreparePhase(vacstmt))
		{
			elogif(Debug_appendonly_print_compaction, LOG,
					"Vacuum full prepare phase %s", RelationGetRelationName(onerel));

			vacuum_appendonly_indexes(onerel, vacstmt, updated_stats);
			if (RelationIsAoRows(onerel))
				AppendOnlyTruncateToEOF(onerel);
			else
				AOCSTruncateToEOF(onerel);
			update_relstats = false;
		}
		else if(!vacummStatement_IsInAppendOnlyCleanupPhase(vacstmt))
		{
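			/*
			 * Neither the prepare nor the cleanup phase, so this is the
			 * compaction pass: vacuum_appendonly_rel() does the actual work.
			 */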
			vacuum_appendonly_rel(onerel, vacstmt);
			update_relstats = false;
		}
		else
		{
			elogif(Debug_appendonly_print_compaction, LOG,
					"Vacuum full cleanup phase %s", RelationGetRelationName(onerel));
			vacuum_appendonly_fill_stats(onerel, ActiveSnapshot,
										 &vacrelstats->rel_pages,
										 &vacrelstats->rel_tuples,
										 &vacrelstats->hasindex);
			/* Reset the remaining VRelStats values */
			vacrelstats->min_tlen = 0;
			vacrelstats->max_tlen = 0;
			vacrelstats->num_vtlinks = 0;
			vacrelstats->vtlinks = NULL;
		}
	}
	else
	{
		/* For heap. */
		heldoff = vacuum_heap_rel(onerel, vacstmt, vacrelstats, updated_stats);
	}

	/* Do not update the relstats if the vacuuming has been skipped */
	if (update_relstats)
	{
		/* update statistics in pg_class */
		vac_update_relstats_from_list(onerel, vacrelstats->rel_pages,
						vacrelstats->rel_tuples, vacrelstats->hasindex,
						FreezeLimit, updated_stats);

		/* report results to the stats collector, too */
		pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared, true,
						 vacstmt->analyze, vacrelstats->rel_tuples);
	}

	pfree(vacrelstats);

	return heldoff;
}

/*
 * This is a small version of scan_heap, performed in the second transaction of
 * heap vacuum full.  We assume we did the first transaction and kept some of
 * the stats information already, so start from the last known truncate point,
 * and rescan to the end to see if they are still empty.  Note someone might
 * have already modified these pages before we come back from QD, in case of
 * catalog table, because concurrent DDL can go in QE even if QD is holding
 * an exclusive lock on the catalog table, and QE just releases locks between
 * separate transactions.
 *
 * We don't touch other pages than the ones that are potentially truncated.
 * Note index may also have such tuples that are inserted after the first
 * transaction, and it'd not be easy to clean them up all.  Here we just
 * focus on truncate.  We skip checking stats in scan_index or vacuum_index,
 * as our reltuples may not be exactly correct.
 */
static void
scan_heap_for_truncate(VRelStats *vacrelstats, Relation onerel,
					   VacPageList vacuum_pages)
{
	MIRROREDLOCK_BUFMGR_DECLARE;

	BlockNumber nblocks, blkno;
	char	   *relname;
	VacPage		vacpage;
	BlockNumber empty_end_pages;
	double		num_tuples;
	bool		do_shrinking = true;
	int			i;
	VPgClassStats *prev_stats = NULL;

	relname = RelationGetRelationName(onerel);

	empty_end_pages = 0;
	num_tuples = 0;

	nblocks = RelationGetNumberOfBlocks(onerel);

	vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));

	/* Fetch gp_persistent_relation_node information for XLOG. */
	RelationFetchGpRelationNodeForXLog(onerel);

	/* Retrieve the relation stats info from the previous transaction. */
	for (i = 0; i < VacFullInitialStatsSize; i++)
	{
		VPgClassStats *stats = &VacFullInitialStats[i];
		if (stats->relid == RelationGetRelid(onerel))
		{
			prev_stats = stats;
			break;
		}
	}
	if (prev_stats == NULL)
		elog(ERROR, "could not find previous vacuum infomation for %s", relname);

	Assert(prev_stats->rel_pages >= prev_stats->empty_end_pages);
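	/* Resume at the first page the previous pass found to be an empty end page. */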
	blkno = prev_stats->rel_pages - prev_stats->empty_end_pages;
	for (; blkno < nblocks; blkno++)
	{
		Buffer		buf;
		Page		page;
		OffsetNumber offnum, maxoff;
		VacPage		vacpagecopy;
		bool		notup = true;

		vacuum_delay_point();

		// -------- MirroredLock ----------
		MIRROREDLOCK_BUFMGR_LOCK;

		buf = ReadBufferWithStrategy(onerel, blkno, vac_strategy);
		page = BufferGetPage(buf);

		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

		vacpage->blkno = blkno;
		vacpage->offsets_used = 0;
		vacpage->offsets_free = 0;

		/*
		 * If the page is empty, just remember it and delete index pointers
		 * later if there are any tuples pointing to this page.
		 */
		if (PageIsNew(page) || PageIsEmpty(page))
		{
			empty_end_pages++;
			vacpagecopy = copy_vac_page(vacpage);
			vpage_insert(vacuum_pages, vacpagecopy);
			UnlockReleaseBuffer(buf);

			MIRROREDLOCK_BUFMGR_UNLOCK;
			// -------- MirroredLock ----------

			continue;
		}

		maxoff = PageGetMaxOffsetNumber(page);
		for (offnum = FirstOffsetNumber;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
			ItemId		itemid = PageGetItemId(page, offnum);
			HeapTupleData	tuple;
			bool			tupgone = false;

			if (!ItemIdIsUsed(itemid))
				continue;

			if (ItemIdIsDead(itemid))
				continue;

			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
			tuple.t_len = ItemIdGetLength(itemid);
			ItemPointerSet(&(tuple.t_self), blkno, offnum);

			switch (HeapTupleSatisfiesVacuum(onerel, tuple.t_data, OldestXmin, buf))
			{
				case HEAPTUPLE_DEAD:
					tupgone = true;
					break;
				case HEAPTUPLE_LIVE:
				case HEAPTUPLE_RECENTLY_DEAD:
					break;
				case HEAPTUPLE_INSERT_IN_PROGRESS:
					ereport(LOG,
							(errmsg("relation \"%s\" TID %u/%u: InsertTransactionInProgress %u --- can't shrink relation",
									relname, blkno, offnum, HeapTupleHeaderGetXmin(tuple.t_data))));
					do_shrinking = false;
					break;
				case HEAPTUPLE_DELETE_IN_PROGRESS:
					ereport(LOG,
							(errmsg("relation \"%s\" TID %u/%u: DeleteTransactionInProgress %u --- can't shrink relation",
									relname, blkno, offnum, HeapTupleHeaderGetXmax(tuple.t_data))));
					do_shrinking = false;
					break;

				default:
					elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
					break;
			}

			if (!tupgone)
			{
				num_tuples += 1;
				notup = false;
			}
		}

		if (notup)
		{
			empty_end_pages++;
			vacpagecopy = copy_vac_page(vacpage);
			vpage_insert(vacuum_pages, vacpagecopy);
		}
		else
		{
			/*
			 * If we are seeing live tuples in those pages that should have
			 * been truncated in the previous transaction, someone already
			 * modified them.  In that case it's safer to not truncate
			 * at all.
			 */
			do_shrinking = false;
			empty_end_pages = 0;
		}

		UnlockReleaseBuffer(buf);

		MIRROREDLOCK_BUFMGR_UNLOCK;
		// -------- MirroredLock ----------
	}

	pfree(vacpage);

	vacrelstats->rel_tuples = prev_stats->rel_tuples + num_tuples;
	vacrelstats->rel_pages = nblocks;
	if (!do_shrinking)
	{
		int		i;

		vacuum_pages->empty_end_pages = 0;
		for (i = 0; i < vacuum_pages->num_pages; i++)
			pfree(vacuum_pages->pagedesc[i]);
		vacuum_pages->num_pages = 0;
	}
	else
	{
		vacuum_pages->empty_end_pages = empty_end_pages;
	}
}


/*
 *	scan_heap() -- scan an open heap relation
 *
 *		This routine sets commit status bits, constructs vacuum_pages (list
 *		of pages we need to compact free space on and/or clean indexes of
 *		deleted tuples), constructs fraged_pages (list of pages with free
 *		space that tuples could be moved into), and calculates statistics
 *		on the number of live tuples in the heap.
 */
static void
scan_heap(VRelStats *vacrelstats, Relation onerel,
		  VacPageList vacuum_pages, VacPageList fraged_pages)
{
	MIRROREDLOCK_BUFMGR_DECLARE;

	BlockNumber nblocks,
				blkno;
	char	   *relname;
	VacPage		vacpage;
	BlockNumber empty_pages,
				empty_end_pages;
	double		num_tuples,
				num_indexed_tuples,
				tups_vacuumed,
				nkeep,
				nunused;
	double		free_space,
				usable_free_space;
	Size		min_tlen = MaxHeapTupleSize;
	Size		max_tlen = 0;
	bool		do_shrinking = true;
	VTupleLink	vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
	int			num_vtlinks = 0;
	int			free_vtlinks = 100;
	PGRUsage	ru0;

	pg_rusage_init(&ru0);

	relname = RelationGetRelationName(onerel);
	ereport(elevel,
			(errmsg("vacuuming \"%s.%s\"",
					get_namespace_name(RelationGetNamespace(onerel)),
					relname)));

	empty_pages = empty_end_pages = 0;
	num_tuples = num_indexed_tuples = tups_vacuumed = nkeep = nunused = 0;
	free_space = 0;

	nblocks = RelationGetNumberOfBlocks(onerel);

	/*
	 * We initially create each VacPage item in a maximal-sized workspace,
	 * then copy the workspace into a just-large-enough copy.
	 */
	vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));

	/* Fetch gp_persistent_relation_node information for XLOG. */
	RelationFetchGpRelationNodeForXLog(onerel);

	for (blkno = 0; blkno < nblocks; blkno++)
	{
		Page		page,
					tempPage = NULL;
		bool		do_reap,
					do_frag;
		Buffer		buf;
		OffsetNumber offnum,
					maxoff;
		bool		notup;
		OffsetNumber frozen[MaxOffsetNumber];
		int			nfrozen;

		vacuum_delay_point();

		// -------- MirroredLock ----------
		MIRROREDLOCK_BUFMGR_LOCK;

		buf = ReadBufferWithStrategy(onerel, blkno, vac_strategy);
		page = BufferGetPage(buf);

		/*
		 * Since we are holding exclusive lock on the relation, no other
		 * backend can be accessing the page; however it is possible that the
		 * background writer will try to write the page if it's already marked
		 * dirty.  To ensure that invalid data doesn't get written to disk, we
		 * must take exclusive buffer lock wherever we potentially modify
		 * pages.  In fact, we insist on cleanup lock so that we can safely
		 * call heap_page_prune().	(This might be overkill, since the
		 * bgwriter pays no attention to individual tuples, but on the other
		 * hand it's unlikely that the bgwriter has this particular page
		 * pinned at this instant.	So violating the coding rule would buy us
		 * little anyway.)
		 */
		LockBufferForCleanup(buf);

		vacpage->blkno = blkno;
		vacpage->offsets_used = 0;
		vacpage->offsets_free = 0;

		if (PageIsNew(page))
		{
			VacPage		vacpagecopy;

			ereport(WARNING,
			   (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
					   relname, blkno)));
			PageInit(page, BufferGetPageSize(buf), 0);
			MarkBufferDirty(buf);
			vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
			free_space += vacpage->free;
			empty_pages++;
			empty_end_pages++;
			vacpagecopy = copy_vac_page(vacpage);
			vpage_insert(vacuum_pages, vacpagecopy);
			vpage_insert(fraged_pages, vacpagecopy);
			UnlockReleaseBuffer(buf);

			MIRROREDLOCK_BUFMGR_UNLOCK;
			// -------- MirroredLock ----------

			continue;
		}

		if (PageIsEmpty(page))
		{
			VacPage		vacpagecopy;

			vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
			free_space += vacpage->free;
			empty_pages++;
			empty_end_pages++;
			vacpagecopy = copy_vac_page(vacpage);
			vpage_insert(vacuum_pages, vacpagecopy);
			vpage_insert(fraged_pages, vacpagecopy);
			UnlockReleaseBuffer(buf);

			MIRROREDLOCK_BUFMGR_UNLOCK;
			// -------- MirroredLock ----------

			continue;
		}

		/*
		 * Prune all HOT-update chains in this page.
		 *
		 * We use the redirect_move option so that redirecting line pointers
		 * get collapsed out; this allows us to not worry about them below.
		 *
		 * We count tuples removed by the pruning step as removed by VACUUM.
		 */
		tups_vacuumed += heap_page_prune(onerel, buf, OldestXmin,
										 true, false);

		/*
		 * Now scan the page to collect vacuumable items and check for tuples
		 * requiring freezing.
		 */
		nfrozen = 0;
		notup = true;
		maxoff = PageGetMaxOffsetNumber(page);
		for (offnum = FirstOffsetNumber;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
			ItemId		itemid = PageGetItemId(page, offnum);
			bool		tupgone = false;
			HeapTupleData tuple;

			/*
			 * Collect un-used items too - it's possible to have indexes
			 * pointing here after crash.  (That's an ancient comment and is
			 * likely obsolete with WAL, but we might as well continue to
			 * check for such problems.)
			 */
			if (!ItemIdIsUsed(itemid))
			{
				vacpage->offsets[vacpage->offsets_free++] = offnum;
				nunused += 1;
				continue;
			}

			/*
			 * DEAD item pointers are to be vacuumed normally; but we don't
			 * count them in tups_vacuumed, else we'd be double-counting (at
			 * least in the common case where heap_page_prune() just freed up
			 * a non-HOT tuple).
			 */
			if (ItemIdIsDead(itemid))
			{
				vacpage->offsets[vacpage->offsets_free++] = offnum;
				continue;
			}

			/* Shouldn't have any redirected items anymore */
			if (!ItemIdIsNormal(itemid))
				elog(ERROR, "relation \"%s\" TID %u/%u: unexpected redirect item",
					 relname, blkno, offnum);

			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
			tuple.t_len = ItemIdGetLength(itemid);
			ItemPointerSet(&(tuple.t_self), blkno, offnum);

			switch (HeapTupleSatisfiesVacuum(onerel, tuple.t_data, OldestXmin, buf))
			{
				case HEAPTUPLE_LIVE:
					/* Tuple is good --- but let's do some validity checks */
					if (onerel->rd_rel->relhasoids &&
						!OidIsValid(HeapTupleGetOid(&tuple)))
						elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
							 relname, blkno, offnum);

					/*
					 * The shrinkage phase of VACUUM FULL requires that all
					 * live tuples have XMIN_COMMITTED set --- see comments in
					 * repair_frag()'s walk-along-page loop.  Use of async
					 * commit may prevent HeapTupleSatisfiesVacuum from
					 * setting the bit for a recently committed tuple.	Rather
					 * than trying to handle this corner case, we just give up
					 * and don't shrink.
					 */
					if (do_shrinking &&
						!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
					{
						ereport(LOG,
								(errmsg("relation \"%s\" TID %u/%u: XMIN_COMMITTED not set for transaction %u --- cannot shrink relation",
										relname, blkno, offnum,
									 HeapTupleHeaderGetXmin(tuple.t_data))));
						do_shrinking = false;
					}
					break;
				case HEAPTUPLE_DEAD:

					/*
					 * Ordinarily, DEAD tuples would have been removed by
					 * heap_page_prune(), but it's possible that the tuple
					 * state changed since heap_page_prune() looked.  In
					 * particular an INSERT_IN_PROGRESS tuple could have
					 * changed to DEAD if the inserter aborted.  So this
					 * cannot be considered an error condition, though it does
					 * suggest that someone released a lock early.
					 *
					 * If the tuple is HOT-updated then it must only be
					 * removed by a prune operation; so we keep it as if it
					 * were RECENTLY_DEAD, and abandon shrinking. (XXX is it
					 * worth trying to make the shrinking code smart enough to
					 * handle this?  It's an unusual corner case.)
					 *
					 * DEAD heap-only tuples can safely be removed if they
					 * aren't themselves HOT-updated, although this is a bit
					 * inefficient since we'll uselessly try to remove index
					 * entries for them.
					 */
					if (HeapTupleIsHotUpdated(&tuple))
					{
						nkeep += 1;
						if (do_shrinking)
							ereport(LOG,
									(errmsg("relation \"%s\" TID %u/%u: dead HOT-updated tuple --- cannot shrink relation",
											relname, blkno, offnum)));
						do_shrinking = false;
					}
					else
					{
						tupgone = true; /* we can delete the tuple */

						/*
						 * We need not require XMIN_COMMITTED or
						 * XMAX_COMMITTED to be set, since we will remove the
						 * tuple without any further examination of its hint
						 * bits.
						 */
					}
					break;
				case HEAPTUPLE_RECENTLY_DEAD:

					/*
					 * If tuple is recently deleted then we must not remove it
					 * from relation.
					 */
					nkeep += 1;

					/*
					 * As with the LIVE case, shrinkage requires
					 * XMIN_COMMITTED to be set.
					 */
					if (do_shrinking &&
						!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
					{
						ereport(LOG,
								(errmsg("relation \"%s\" TID %u/%u: XMIN_COMMITTED not set for transaction %u --- cannot shrink relation",
										relname, blkno, offnum,
									 HeapTupleHeaderGetXmin(tuple.t_data))));
						do_shrinking = false;
					}

					/*
					 * If we do shrinking and this tuple is updated one then
					 * remember it to construct updated tuple dependencies.
					 */
					if (do_shrinking &&
						!(ItemPointerEquals(&(tuple.t_self),
											&(tuple.t_data->t_ctid))))
					{
						if (free_vtlinks == 0)
						{
							free_vtlinks = 1000;
							vtlinks = (VTupleLink) repalloc(vtlinks,
											   (free_vtlinks + num_vtlinks) *
													 sizeof(VTupleLinkData));
						}
						vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
						vtlinks[num_vtlinks].this_tid = tuple.t_self;
						free_vtlinks--;
						num_vtlinks++;
					}
					break;
				case HEAPTUPLE_INSERT_IN_PROGRESS:

					/*
					 * This should not happen, since we hold exclusive lock on
					 * the relation; shouldn't we raise an error?  (Actually,
					 * it can happen in system catalogs, since we tend to
					 * release write lock before commit there.)  As above, we
					 * can't apply repair_frag() if the tuple state is
					 * uncertain.
					 */
					if (do_shrinking)
						ereport(LOG,
								(errmsg("relation \"%s\" TID %u/%u: InsertTransactionInProgress %u --- cannot shrink relation",
										relname, blkno, offnum,
B
Bruce Momjian 已提交
3149
									 HeapTupleHeaderGetXmin(tuple.t_data))));
3150 3151 3152
					do_shrinking = false;
					break;
				case HEAPTUPLE_DELETE_IN_PROGRESS:
3153

3154
					/*
B
Bruce Momjian 已提交
3155
					 * This should not happen, since we hold exclusive lock on
3156
					 * the relation; shouldn't we raise an error?  (Actually,
B
Bruce Momjian 已提交
3157
					 * it can happen in system catalogs, since we tend to
B
Bruce Momjian 已提交
3158 3159
					 * release write lock before commit there.)  As above, we
					 * can't apply repair_frag() if the tuple state is
3160
					 * uncertain.
3161
					 */
3162 3163 3164 3165
					if (do_shrinking)
						ereport(LOG,
								(errmsg("relation \"%s\" TID %u/%u: DeleteTransactionInProgress %u --- cannot shrink relation",
										relname, blkno, offnum,
B
Bruce Momjian 已提交
3166
									 HeapTupleHeaderGetXmax(tuple.t_data))));
3167 3168 3169
					do_shrinking = false;
					break;
				default:
3170
					elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
3171
					break;
3172 3173 3174 3175
			}

			if (tupgone)
			{
3176
				ItemId		lpp;
3177

3178
				/*
B
Bruce Momjian 已提交
3179 3180
				 * Here we are building a temporary copy of the page with dead
				 * tuples removed.	Below we will apply
3181
				 * PageRepairFragmentation to the copy, so that we can
B
Bruce Momjian 已提交
3182 3183 3184
				 * determine how much space will be available after removal of
				 * dead tuples.  But note we are NOT changing the real page
				 * yet...
3185
				 */
3186
				if (tempPage == NULL)
3187
				{
3188
					Size		pageSize;
3189 3190 3191

					pageSize = PageGetPageSize(page);
					tempPage = (Page) palloc(pageSize);
3192
					memcpy(tempPage, page, pageSize);
3193 3194
				}

3195
				/* mark it unused on the temp page */
3196
				lpp = PageGetItemId(tempPage, offnum);
3197
				ItemIdSetUnused(lpp);
3198

B
Bruce Momjian 已提交
3199
				vacpage->offsets[vacpage->offsets_free++] = offnum;
3200
				tups_vacuumed += 1;
3201 3202 3203
			}
			else
			{
3204
				num_tuples += 1;
3205 3206
				if (!HeapTupleIsHeapOnly(&tuple))
					num_indexed_tuples += 1;
3207
				notup = false;
3208 3209 3210 3211
				if (tuple.t_len < min_tlen)
					min_tlen = tuple.t_len;
				if (tuple.t_len > max_tlen)
					max_tlen = tuple.t_len;
3212

B
Bruce Momjian 已提交
3213
				/*
B
Bruce Momjian 已提交
3214 3215
				 * Each non-removable tuple must be checked to see if it needs
				 * freezing.
3216
				 */
3217 3218 3219
				if (heap_freeze_tuple(tuple.t_data, FreezeLimit,
									  InvalidBuffer))
					frozen[nfrozen++] = offnum;
3220
			}
3221
		}						/* scan along page */
3222

3223
		if (tempPage != NULL)
3224 3225
		{
			/* Some tuples are removable; figure free space after removal */
3226
			PageRepairFragmentation(tempPage);
B
Bruce Momjian 已提交
3227
			vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, tempPage);
3228
			pfree(tempPage);
3229
			do_reap = true;
3230
		}
3231 3232 3233
		else
		{
			/* Just use current available space */
B
Bruce Momjian 已提交
3234
			vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
3235
			/* Need to reap the page if it has UNUSED or DEAD line pointers */
3236
			do_reap = (vacpage->offsets_free > 0);
V
Vadim B. Mikheev 已提交
3237
		}
3238

3239
		free_space += vacpage->free;
3240

3241
		/*
3242 3243 3244 3245 3246 3247 3248 3249 3250
		 * Add the page to vacuum_pages if it requires reaping, and add it to
		 * fraged_pages if it has a useful amount of free space.  "Useful"
		 * means enough for a minimal-sized tuple.  But we don't know that
		 * accurately near the start of the relation, so add pages
		 * unconditionally if they have >= BLCKSZ/10 free space.  Also
		 * forcibly add pages with no live tuples, to avoid confusing the
		 * empty_end_pages logic.  (In the presence of unreasonably small
		 * fillfactor, it seems possible that such pages might not pass
		 * the free-space test, but they had better be in the list anyway.)
3251
		 */
3252 3253
		do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10 ||
				   notup);
3254 3255

		if (do_reap || do_frag)
3256
		{
B
Bruce Momjian 已提交
3257 3258
			VacPage		vacpagecopy = copy_vac_page(vacpage);

3259 3260 3261 3262
			if (do_reap)
				vpage_insert(vacuum_pages, vacpagecopy);
			if (do_frag)
				vpage_insert(fraged_pages, vacpagecopy);
3263 3264
		}

3265 3266
		/*
		 * Include the page in empty_end_pages if it will be empty after
B
Bruce Momjian 已提交
3267
		 * vacuuming; this is to keep us from using it as a move destination.
3268
		 * Note that such pages are guaranteed to be in fraged_pages.
3269
		 */
3270
		if (notup)
3271 3272
		{
			empty_pages++;
B
Bruce Momjian 已提交
3273
			empty_end_pages++;
3274
		}
3275
		else
B
Bruce Momjian 已提交
3276
			empty_end_pages = 0;
3277

3278 3279 3280 3281 3282 3283 3284
		/*
		 * If we froze any tuples, mark the buffer dirty, and write a WAL
		 * record recording the changes.  We must log the changes to be
		 * crash-safe against future truncation of CLOG.
		 */
		if (nfrozen > 0)
		{
3285
			MarkBufferDirty(buf);
3286 3287 3288 3289 3290 3291 3292 3293 3294 3295 3296 3297
			/* no XLOG for temp tables, though */
			if (!onerel->rd_istemp)
			{
				XLogRecPtr	recptr;

				recptr = log_heap_freeze(onerel, buf, FreezeLimit,
										 frozen, nfrozen);
				PageSetLSN(page, recptr);
				PageSetTLI(page, ThisTimeLineID);
			}
		}

3298
		UnlockReleaseBuffer(buf);
3299 3300 3301 3302

		MIRROREDLOCK_BUFMGR_UNLOCK;
		// -------- MirroredLock ----------

3303 3304
	}

B
Bruce Momjian 已提交
3305
	pfree(vacpage);
3306 3307

	/* save stats in the rel list for use later */
3308
	vacrelstats->rel_tuples = num_tuples;
3309
	vacrelstats->rel_indexed_tuples = num_indexed_tuples;
3310
	vacrelstats->rel_pages = nblocks;
B
Bruce Momjian 已提交
3311
	if (num_tuples == 0)
3312 3313 3314 3315
		min_tlen = max_tlen = 0;
	vacrelstats->min_tlen = min_tlen;
	vacrelstats->max_tlen = max_tlen;

B
Bruce Momjian 已提交
3316 3317
	vacuum_pages->empty_end_pages = empty_end_pages;
	fraged_pages->empty_end_pages = empty_end_pages;
3318 3319

	/*
3320 3321 3322
	 * Clear the fraged_pages list if we found we couldn't shrink. Else,
	 * remove any "empty" end-pages from the list, and compute usable free
	 * space = free space in remaining pages.
3323
	 */
3324
	if (do_shrinking)
3325
	{
B
Bruce Momjian 已提交
3326 3327
		int			i;

3328 3329
		Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
		fraged_pages->num_pages -= empty_end_pages;
3330
		usable_free_space = 0;
3331
		for (i = 0; i < fraged_pages->num_pages; i++)
3332
			usable_free_space += fraged_pages->pagedesc[i]->free;
3333 3334 3335 3336
	}
	else
	{
		fraged_pages->num_pages = 0;
3337
		usable_free_space = 0;
V
Vadim B. Mikheev 已提交
3338
	}
3339

3340 3341
	/* don't bother to save vtlinks if we will not call repair_frag */
	if (fraged_pages->num_pages > 0 && num_vtlinks > 0)
3342
	{
B
Bruce Momjian 已提交
3343
		qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
B
Bruce Momjian 已提交
3344
			  vac_cmp_vtlinks);
3345 3346 3347 3348 3349 3350 3351 3352 3353 3354
		vacrelstats->vtlinks = vtlinks;
		vacrelstats->num_vtlinks = num_vtlinks;
	}
	else
	{
		vacrelstats->vtlinks = NULL;
		vacrelstats->num_vtlinks = 0;
		pfree(vtlinks);
	}

3355
	ereport(elevel,
3356
			(errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
3357 3358
					RelationGetRelationName(onerel),
					tups_vacuumed, num_tuples, nblocks),
3359
			 errdetail("%.0f dead row versions cannot be removed yet.\n"
B
Bruce Momjian 已提交
3360
			  "Nonremovable row versions range from %lu to %lu bytes long.\n"
3361
					   "There were %.0f unused item pointers.\n"
B
Bruce Momjian 已提交
3362
	   "Total free space (including removable row versions) is %.0f bytes.\n"
3363
					   "%u pages are or will become empty, including %u at the end of the table.\n"
B
Bruce Momjian 已提交
3364
	 "%u pages containing %.0f free bytes are potential move destinations.\n"
3365
					   "%s.",
3366 3367 3368 3369 3370 3371
					   nkeep,
					   (unsigned long) min_tlen, (unsigned long) max_tlen,
					   nunused,
					   free_space,
					   empty_pages, empty_end_pages,
					   fraged_pages->num_pages, usable_free_space,
3372
					   pg_rusage_show(&ru0))));
B
Bruce Momjian 已提交
3373
}
V
Vadim B. Mikheev 已提交
3374

3375 3376

/*
B
Bruce Momjian 已提交
3377
 *	repair_frag() -- try to repair relation's fragmentation
3378
 *
3379
 *		This routine marks dead tuples as unused and tries re-use dead space
3380 3381
 *		by moving tuples (and inserting indexes if needed). It constructs
 *		Nvacpagelist list of free-ed pages (moved tuples) and clean indexes
3382 3383 3384
 *		for them after committing (in hack-manner - without losing locks
 *		and freeing memory!) current transaction. It truncates relation
 *		if some end-blocks are gone away.
3385 3386 3387
 *
 *		The return value indicates whether this function has held off
 *		interrupts -- caller must RESUME_INTERRUPTS() after commit if true.
3388
 */
3389
static bool
B
Bruce Momjian 已提交
3390
repair_frag(VRelStats *vacrelstats, Relation onerel,
B
Bruce Momjian 已提交
3391
			VacPageList vacuum_pages, VacPageList fraged_pages,
3392
			int nindexes, Relation *Irel, List *updated_stats,
3393
			int reindex_count)
3394
{
3395 3396
	MIRROREDLOCK_BUFMGR_DECLARE;

B
Bruce Momjian 已提交
3397 3398
	TransactionId myXID = GetCurrentTransactionId();
	Buffer		dst_buffer = InvalidBuffer;
3399
	BlockNumber nblocks,
3400
				blkno;
3401
	BlockNumber last_move_dest_block = 0,
3402
				last_vacuum_block;
B
Bruce Momjian 已提交
3403
	Page		dst_page = NULL;
B
Bruce Momjian 已提交
3404
	ExecContextData ec;
3405
	VacPageListData Nvacpagelist = {0, 0, 0, NULL};
B
Bruce Momjian 已提交
3406
	VacPage		dst_vacpage = NULL,
B
Bruce Momjian 已提交
3407
				last_vacuum_page,
B
Bruce Momjian 已提交
3408 3409
				vacpage,
			   *curpage;
3410
	int			i;
B
Bruce Momjian 已提交
3411
	int			num_moved = 0,
B
Bruce Momjian 已提交
3412 3413
				num_fraged_pages,
				vacuumed_pages;
B
Bruce Momjian 已提交
3414
	int			keep_tuples = 0;
3415
	int			keep_indexed_tuples = 0;
3416
	PGRUsage	ru0;
3417
	bool		heldoff = false;
3418

3419
	pg_rusage_init(&ru0);
3420

3421 3422 3423
	// Fetch gp_persistent_relation_node information that will be added to XLOG record.
	RelationFetchGpRelationNodeForXLog(onerel);

B
Bruce Momjian 已提交
3424
	ExecContext_Init(&ec, onerel);
3425

B
Bruce Momjian 已提交
3426 3427
	Nvacpagelist.num_pages = 0;
	num_fraged_pages = fraged_pages->num_pages;
3428
	Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
B
Bruce Momjian 已提交
3429
	vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
3430 3431 3432 3433 3434 3435 3436 3437 3438 3439 3440
	if (vacuumed_pages > 0)
	{
		/* get last reaped page from vacuum_pages */
		last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
		last_vacuum_block = last_vacuum_page->blkno;
	}
	else
	{
		last_vacuum_page = NULL;
		last_vacuum_block = InvalidBlockNumber;
	}
3441

B
Bruce Momjian 已提交
3442 3443
	vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
	vacpage->offsets_used = vacpage->offsets_free = 0;
3444

3445
	/*
B
Bruce Momjian 已提交
3446 3447 3448 3449 3450
	 * Scan pages backwards from the last nonempty page, trying to move tuples
	 * down to lower pages.  Quit when we reach a page that we have moved any
	 * tuples onto, or the first page if we haven't moved anything, or when we
	 * find a page we cannot completely empty (this last condition is handled
	 * by "break" statements within the loop).
3451
	 *
3452 3453
	 * NB: this code depends on the vacuum_pages and fraged_pages lists being
	 * in order by blkno.
3454
	 */
3455
	nblocks = vacrelstats->rel_pages;
B
Bruce Momjian 已提交
3456
	for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
3457 3458
		 blkno > last_move_dest_block;
		 blkno--)
V
Vadim B. Mikheev 已提交
3459
	{
B
Bruce Momjian 已提交
3460 3461 3462 3463 3464 3465
		Buffer		buf;
		Page		page;
		OffsetNumber offnum,
					maxoff;
		bool		isempty,
					chain_tuple_moved;
B
Bruce Momjian 已提交
3466

3467
		vacuum_delay_point();
3468

3469
		/*
B
Bruce Momjian 已提交
3470 3471 3472 3473
		 * Forget fraged_pages pages at or after this one; they're no longer
		 * useful as move targets, since we only want to move down. Note that
		 * since we stop the outer loop at last_move_dest_block, pages removed
		 * here cannot have had anything moved onto them already.
3474
		 *
3475 3476 3477
		 * Also note that we don't change the stored fraged_pages list, only
		 * our local variable num_fraged_pages; so the forgotten pages are
		 * still available to be loaded into the free space map later.
3478 3479
		 */
		while (num_fraged_pages > 0 &&
B
Bruce Momjian 已提交
3480
			   fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
3481
		{
3482
			Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
3483 3484 3485
			--num_fraged_pages;
		}

3486 3487 3488
		/*
		 * Process this page of relation.
		 */
3489 3490 3491 3492

		// -------- MirroredLock ----------
		MIRROREDLOCK_BUFMGR_LOCK;

3493
		buf = ReadBufferWithStrategy(onerel, blkno, vac_strategy);
3494 3495
		page = BufferGetPage(buf);

B
Bruce Momjian 已提交
3496
		vacpage->offsets_free = 0;
3497 3498 3499

		isempty = PageIsEmpty(page);

3500 3501
		/* Is the page in the vacuum_pages list? */
		if (blkno == last_vacuum_block)
3502
		{
3503 3504 3505
			if (last_vacuum_page->offsets_free > 0)
			{
				/* there are dead tuples on this page - clean them */
3506
				Assert(!isempty);
3507 3508 3509
				LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
				vacuum_page(onerel, buf, last_vacuum_page);
				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
3510 3511 3512
			}
			else
				Assert(isempty);
B
Bruce Momjian 已提交
3513
			--vacuumed_pages;
3514 3515
			if (vacuumed_pages > 0)
			{
B
Bruce Momjian 已提交
3516
				/* get prev reaped page from vacuum_pages */
B
Bruce Momjian 已提交
3517 3518
				last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
				last_vacuum_block = last_vacuum_page->blkno;
3519 3520
			}
			else
3521
			{
3522
				last_vacuum_page = NULL;
3523
				last_vacuum_block = InvalidBlockNumber;
3524
			}
3525 3526
			if (isempty)
			{
3527 3528 3529 3530

				MIRROREDLOCK_BUFMGR_UNLOCK;
				// -------- MirroredLock ----------

3531 3532 3533 3534 3535 3536 3537
				ReleaseBuffer(buf);
				continue;
			}
		}
		else
			Assert(!isempty);

B
Bruce Momjian 已提交
3538 3539
		chain_tuple_moved = false;		/* no one chain-tuple was moved off
										 * this page, yet */
B
Bruce Momjian 已提交
3540
		vacpage->blkno = blkno;
3541 3542 3543 3544 3545
		maxoff = PageGetMaxOffsetNumber(page);
		for (offnum = FirstOffsetNumber;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
B
Bruce Momjian 已提交
3546 3547 3548
			Size		tuple_len;
			HeapTupleData tuple;
			ItemId		itemid = PageGetItemId(page, offnum);
3549 3550 3551 3552

			if (!ItemIdIsUsed(itemid))
				continue;

3553 3554 3555 3556 3557 3558 3559 3560 3561 3562
			if (ItemIdIsDead(itemid))
			{
				/* just remember it for vacuum_page() */
				vacpage->offsets[vacpage->offsets_free++] = offnum;
				continue;
			}

			/* Shouldn't have any redirected items now */
			Assert(ItemIdIsNormal(itemid));

3563 3564 3565
			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
			tuple_len = tuple.t_len = ItemIdGetLength(itemid);
			ItemPointerSet(&(tuple.t_self), blkno, offnum);
3566

B
Bruce Momjian 已提交
3567
			/* ---
3568 3569
			 * VACUUM FULL has an exclusive lock on the relation.  So
			 * normally no other transaction can have pending INSERTs or
B
Bruce Momjian 已提交
3570
			 * DELETEs in this relation.  A tuple is either:
3571
			 *		(a) live (XMIN_COMMITTED)
B
Bruce Momjian 已提交
3572 3573
			 *		(b) known dead (XMIN_INVALID, or XMAX_COMMITTED and xmax
			 *			is visible to all active transactions)
3574 3575 3576 3577 3578 3579 3580 3581 3582 3583 3584 3585
			 *		(c) inserted and deleted (XMIN_COMMITTED+XMAX_COMMITTED)
			 *			but at least one active transaction does not see the
			 *			deleting transaction (ie, it's RECENTLY_DEAD)
			 *		(d) moved by the currently running VACUUM
			 *		(e) inserted or deleted by a not yet committed transaction,
			 *			or by a transaction we couldn't set XMIN_COMMITTED for.
			 * In case (e) we wouldn't be in repair_frag() at all, because
			 * scan_heap() detects those cases and shuts off shrinking.
			 * We can't see case (b) here either, because such tuples were
			 * already removed by vacuum_page().  Cases (a) and (c) are
			 * normal and will have XMIN_COMMITTED set.  Case (d) is only
			 * possible if a whole tuple chain has been moved while
3586
			 * processing this or a higher numbered block.
B
Bruce Momjian 已提交
3587
			 * ---
B
Bruce Momjian 已提交
3588
			 */
3589 3590 3591 3592 3593 3594 3595 3596 3597 3598 3599 3600 3601 3602 3603 3604

			/*
			 * In PostgreSQL, we assume that the first pass of vacuum already
			 * set the hint bit. However, we cannot rely on that in GPDB,
			 * because of gp_disable_tuple_hints GUC. If it's ever set, then
			 * the first pass might've seen that all the hint bits on the page
			 * were already set, but the backend that set those bits didn't
			 * mark the buffer as dirty. If the buffer is subsequently evicted
			 * from the buffer cache, the hint bit updates are lost, and we
			 * will see them as not set here, even though they were set in the
			 * first pass.
			 *
			 * To fix that, just call HeapTupleSatisfiesVacuum() here to set
			 * the hint bits again, if not set already.
			 */
			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
3605
			(void) HeapTupleSatisfiesVacuum(onerel, tuple.t_data, OldestXmin, buf);
3606 3607
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);

3608 3609
			if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
			{
3610 3611 3612 3613 3614
				if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
					elog(ERROR, "HEAP_MOVED_IN was not expected");
				if (!(tuple.t_data->t_infomask & HEAP_MOVED_OFF))
					elog(ERROR, "HEAP_MOVED_OFF was expected");

B
Bruce Momjian 已提交
3615
				/*
3616 3617
				 * MOVED_OFF by another VACUUM would have caused the
				 * visibility check to set XMIN_COMMITTED or XMIN_INVALID.
B
Bruce Momjian 已提交
3618
				 */
3619 3620
				if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
					elog(ERROR, "invalid XVAC in tuple header");
B
Bruce Momjian 已提交
3621 3622

				/*
B
Bruce Momjian 已提交
3623 3624 3625
				 * If this (chain) tuple is moved by me already then I have to
				 * check is it in vacpage or not - i.e. is it moved while
				 * cleaning this page or some previous one.
3626
				 */
B
Bruce Momjian 已提交
3627 3628 3629 3630

				/* Can't we Assert(keep_tuples > 0) here? */
				if (keep_tuples == 0)
					continue;
3631 3632 3633
				if (chain_tuple_moved)
				{
					/* some chains were moved while cleaning this page */
B
Bruce Momjian 已提交
3634 3635 3636 3637 3638
					Assert(vacpage->offsets_free > 0);
					for (i = 0; i < vacpage->offsets_free; i++)
					{
						if (vacpage->offsets[i] == offnum)
							break;
3639
					}
B
Bruce Momjian 已提交
3640
					if (i >= vacpage->offsets_free)		/* not found */
3641
					{
B
Bruce Momjian 已提交
3642
						vacpage->offsets[vacpage->offsets_free++] = offnum;
B
Bruce Momjian 已提交
3643

3644 3645 3646
						/*
						 * If this is not a heap-only tuple, there must be an
						 * index entry for this item which will be removed in
B
Bruce Momjian 已提交
3647 3648
						 * the index cleanup. Decrement the
						 * keep_indexed_tuples count to remember this.
3649 3650 3651
						 */
						if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
							keep_indexed_tuples--;
3652 3653 3654
						keep_tuples--;
					}
				}
B
Bruce Momjian 已提交
3655 3656 3657
				else
				{
					vacpage->offsets[vacpage->offsets_free++] = offnum;
B
Bruce Momjian 已提交
3658

3659 3660
					/*
					 * If this is not a heap-only tuple, there must be an
B
Bruce Momjian 已提交
3661 3662 3663
					 * index entry for this item which will be removed in the
					 * index cleanup. Decrement the keep_indexed_tuples count
					 * to remember this.
3664 3665 3666
					 */
					if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
						keep_indexed_tuples--;
B
Bruce Momjian 已提交
3667 3668 3669
					keep_tuples--;
				}
				continue;
3670 3671 3672
			}

			/*
B
Bruce Momjian 已提交
3673 3674 3675 3676
			 * If this tuple is in a chain of tuples created in updates by
			 * "recent" transactions then we have to move the whole chain of
			 * tuples to other places, so that we can write new t_ctid links
			 * that preserve the chain relationship.
3677 3678
			 *
			 * This test is complicated.  Read it as "if tuple is a recently
B
Bruce Momjian 已提交
3679 3680 3681 3682
			 * created updated version, OR if it is an obsoleted version". (In
			 * the second half of the test, we needn't make any check on XMAX
			 * --- it must be recently obsoleted, else scan_heap would have
			 * deemed it removable.)
3683
			 *
3684 3685 3686 3687 3688 3689
			 * NOTE: this test is not 100% accurate: it is possible for a
			 * tuple to be an updated one with recent xmin, and yet not match
			 * any new_tid entry in the vtlinks list.  Presumably there was
			 * once a parent tuple with xmax matching the xmin, but it's
			 * possible that that tuple has been removed --- for example, if
			 * it had xmin = xmax and wasn't itself an updated version, then
B
Bruce Momjian 已提交
3690 3691
			 * HeapTupleSatisfiesVacuum would deem it removable as soon as the
			 * xmin xact completes.
3692
			 *
3693 3694
			 * To be on the safe side, we abandon the repair_frag process if
			 * we cannot find the parent tuple in vtlinks.	This may be overly
B
Bruce Momjian 已提交
3695
			 * conservative; AFAICS it would be safe to move the chain.
3696 3697 3698 3699 3700 3701 3702 3703 3704
			 *
			 * Also, because we distinguish DEAD and RECENTLY_DEAD tuples
			 * using OldestXmin, which is a rather coarse test, it is quite
			 * possible to have an update chain in which a tuple we think is
			 * RECENTLY_DEAD links forward to one that is definitely DEAD.
			 * In such a case the RECENTLY_DEAD tuple must actually be dead,
			 * but it seems too complicated to try to make VACUUM remove it.
			 * We treat each contiguous set of RECENTLY_DEAD tuples as a
			 * separately movable chain, ignoring any intervening DEAD ones.
3705
			 */
3706
			if (((tuple.t_data->t_infomask & HEAP_UPDATED) &&
B
Bruce Momjian 已提交
3707 3708
				 !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
										OldestXmin)) ||
3709
				(!(tuple.t_data->t_infomask & (HEAP_XMAX_INVALID |
3710
											   HEAP_IS_LOCKED)) &&
3711 3712
				 !(ItemPointerEquals(&(tuple.t_self),
									 &(tuple.t_data->t_ctid)))))
3713
			{
B
Bruce Momjian 已提交
3714
				Buffer		Cbuf = buf;
3715 3716
				bool		freeCbuf = false;
				bool		chain_move_failed = false;
3717
				bool		moved_target = false;
B
Bruce Momjian 已提交
3718 3719 3720
				ItemPointerData Ctid;
				HeapTupleData tp = tuple;
				Size		tlen = tuple_len;
3721 3722 3723
				VTupleMove	vtmove;
				int			num_vtmove;
				int			free_vtmove;
B
Bruce Momjian 已提交
3724
				VacPage		to_vacpage = NULL;
B
Bruce Momjian 已提交
3725 3726
				int			to_item = 0;
				int			ti;
3727

B
Bruce Momjian 已提交
3728
				if (dst_buffer != InvalidBuffer)
3729
				{
3730
					ReleaseBuffer(dst_buffer);
B
Bruce Momjian 已提交
3731
					dst_buffer = InvalidBuffer;
3732
				}
B
Bruce Momjian 已提交
3733

3734 3735 3736
				/* Quick exit if we have no vtlinks to search in */
				if (vacrelstats->vtlinks == NULL)
				{
3737
					elog(DEBUG2, "parent item in update-chain not found --- cannot continue repair_frag");
B
Bruce Momjian 已提交
3738
					break;		/* out of walk-along-page loop */
3739 3740
				}

3741
				/*
B
Bruce Momjian 已提交
3742 3743 3744
				 * If this tuple is in the begin/middle of the chain then we
				 * have to move to the end of chain.  As with any t_ctid
				 * chase, we have to verify that each new tuple is really the
B
Bruce Momjian 已提交
3745 3746 3747 3748 3749
				 * descendant of the tuple we came from; however, here we need
				 * even more than the normal amount of paranoia. If t_ctid
				 * links forward to a tuple determined to be DEAD, then
				 * depending on where that tuple is, it might already have
				 * been removed, and perhaps even replaced by a MOVED_IN
3750 3751
				 * tuple.  We don't want to include any DEAD tuples in the
				 * chain, so we have to recheck HeapTupleSatisfiesVacuum.
3752
				 */
3753
				while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
3754
												  HEAP_IS_LOCKED)) &&
3755 3756
					   !(ItemPointerEquals(&(tp.t_self),
										   &(tp.t_data->t_ctid))))
3757
				{
3758 3759 3760 3761 3762 3763 3764
					ItemPointerData nextTid;
					TransactionId priorXmax;
					Buffer		nextBuf;
					Page		nextPage;
					OffsetNumber nextOffnum;
					ItemId		nextItemid;
					HeapTupleHeader nextTdata;
B
Bruce Momjian 已提交
3765
					HTSV_Result nextTstatus;
3766 3767 3768 3769

					nextTid = tp.t_data->t_ctid;
					priorXmax = HeapTupleHeaderGetXmax(tp.t_data);
					/* assume block# is OK (see heap_fetch comments) */
3770 3771 3772
					nextBuf = ReadBufferWithStrategy(onerel,
										 ItemPointerGetBlockNumber(&nextTid),
													 vac_strategy);
3773 3774 3775 3776 3777
					nextPage = BufferGetPage(nextBuf);
					/* If bogus or unused slot, assume tp is end of chain */
					nextOffnum = ItemPointerGetOffsetNumber(&nextTid);
					if (nextOffnum < FirstOffsetNumber ||
						nextOffnum > PageGetMaxOffsetNumber(nextPage))
3778
					{
3779 3780 3781 3782
						ReleaseBuffer(nextBuf);
						break;
					}
					nextItemid = PageGetItemId(nextPage, nextOffnum);
3783
					if (!ItemIdIsNormal(nextItemid))
3784 3785 3786
					{
						ReleaseBuffer(nextBuf);
						break;
3787
					}
3788 3789 3790 3791 3792 3793 3794 3795 3796
					/* if not matching XMIN, assume tp is end of chain */
					nextTdata = (HeapTupleHeader) PageGetItem(nextPage,
															  nextItemid);
					if (!TransactionIdEquals(HeapTupleHeaderGetXmin(nextTdata),
											 priorXmax))
					{
						ReleaseBuffer(nextBuf);
						break;
					}
B
Bruce Momjian 已提交
3797

3798
					/*
B
Bruce Momjian 已提交
3799 3800 3801
					 * Must check for DEAD or MOVED_IN tuple, too.	This could
					 * potentially update hint bits, so we'd better hold the
					 * buffer content lock.
3802 3803
					 */
					LockBuffer(nextBuf, BUFFER_LOCK_SHARE);
3804 3805
					nextTstatus = HeapTupleSatisfiesVacuum(onerel,
														   nextTdata,
3806
														   OldestXmin,
3807
														   nextBuf);
3808 3809 3810
					if (nextTstatus == HEAPTUPLE_DEAD ||
						nextTstatus == HEAPTUPLE_INSERT_IN_PROGRESS)
					{
3811
						UnlockReleaseBuffer(nextBuf);
3812 3813
						break;
					}
3814
					LockBuffer(nextBuf, BUFFER_LOCK_UNLOCK);
3815 3816 3817
					/* if it's MOVED_OFF we shoulda moved this one with it */
					if (nextTstatus == HEAPTUPLE_DELETE_IN_PROGRESS)
						elog(ERROR, "updated tuple is already HEAP_MOVED_OFF");
3818 3819 3820 3821
					/* OK, switch our attention to the next tuple in chain */
					tp.t_data = nextTdata;
					tp.t_self = nextTid;
					tlen = tp.t_len = ItemIdGetLength(nextItemid);
3822 3823
					if (freeCbuf)
						ReleaseBuffer(Cbuf);
3824 3825
					Cbuf = nextBuf;
					freeCbuf = true;
3826 3827
				}

3828 3829 3830 3831 3832
				/* Set up workspace for planning the chain move */
				vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
				num_vtmove = 0;
				free_vtmove = 100;

3833
				/*
B
Bruce Momjian 已提交
3834 3835 3836
				 * Now, walk backwards up the chain (towards older tuples) and
				 * check if all items in chain can be moved.  We record all
				 * the moves that need to be made in the vtmove array.
3837
				 */
B
Bruce Momjian 已提交
3838
				for (;;)
3839
				{
3840 3841 3842
					Buffer		Pbuf;
					Page		Ppage;
					ItemId		Pitemid;
3843
					HeapTupleHeader PTdata;
3844 3845 3846
					VTupleLinkData vtld,
							   *vtlp;

3847
					/* Identify a target page to move this tuple to */
B
Bruce Momjian 已提交
3848 3849
					if (to_vacpage == NULL ||
						!enough_space(to_vacpage, tlen))
3850 3851 3852
					{
						for (i = 0; i < num_fraged_pages; i++)
						{
B
Bruce Momjian 已提交
3853
							if (enough_space(fraged_pages->pagedesc[i], tlen))
3854 3855
								break;
						}
B
Bruce Momjian 已提交
3856 3857

						if (i == num_fraged_pages)
B
Bruce Momjian 已提交
3858
						{
3859
							/* can't move item anywhere */
3860
							chain_move_failed = true;
B
Bruce Momjian 已提交
3861
							break;		/* out of check-all-items loop */
3862 3863
						}
						to_item = i;
B
Bruce Momjian 已提交
3864
						to_vacpage = fraged_pages->pagedesc[to_item];
3865
					}
B
Bruce Momjian 已提交
3866 3867
					to_vacpage->free -= MAXALIGN(tlen);
					if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
3868
						to_vacpage->free -= sizeof(ItemIdData);
B
Bruce Momjian 已提交
3869
					(to_vacpage->offsets_used)++;
3870 3871

					/* Add an entry to vtmove list */
3872 3873 3874
					if (free_vtmove == 0)
					{
						free_vtmove = 1000;
3875 3876 3877 3878
						vtmove = (VTupleMove)
							repalloc(vtmove,
									 (free_vtmove + num_vtmove) *
									 sizeof(VTupleMoveData));
3879 3880
					}
					vtmove[num_vtmove].tid = tp.t_self;
B
Bruce Momjian 已提交
3881 3882
					vtmove[num_vtmove].vacpage = to_vacpage;
					if (to_vacpage->offsets_used == 1)
3883 3884 3885 3886 3887
						vtmove[num_vtmove].cleanVpd = true;
					else
						vtmove[num_vtmove].cleanVpd = false;
					free_vtmove--;
					num_vtmove++;
B
Bruce Momjian 已提交
3888

3889 3890 3891 3892 3893
					/* Remember if we reached the original target tuple */
					if (ItemPointerGetBlockNumber(&tp.t_self) == blkno &&
						ItemPointerGetOffsetNumber(&tp.t_self) == offnum)
						moved_target = true;

3894
					/* Done if at beginning of chain */
B
Bruce Momjian 已提交
3895
					if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
B
Bruce Momjian 已提交
3896 3897 3898
					 TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
										   OldestXmin))
						break;	/* out of check-all-items loop */
B
Bruce Momjian 已提交
3899

3900
					/* Move to tuple with prior row version */
3901 3902 3903 3904 3905 3906 3907 3908
					vtld.new_tid = tp.t_self;
					vtlp = (VTupleLink)
						vac_bsearch((void *) &vtld,
									(void *) (vacrelstats->vtlinks),
									vacrelstats->num_vtlinks,
									sizeof(VTupleLinkData),
									vac_cmp_vtlinks);
					if (vtlp == NULL)
3909
					{
3910
						/* see discussion above */
3911
						elog(DEBUG2, "parent item in update-chain not found --- cannot continue repair_frag");
3912
						chain_move_failed = true;
B
Bruce Momjian 已提交
3913
						break;	/* out of check-all-items loop */
3914 3915
					}
					tp.t_self = vtlp->this_tid;
3916
					Pbuf = ReadBufferWithStrategy(onerel,
B
Bruce Momjian 已提交
3917
									 ItemPointerGetBlockNumber(&(tp.t_self)),
3918
												  vac_strategy);
3919 3920
					Ppage = BufferGetPage(Pbuf);
					Pitemid = PageGetItemId(Ppage,
B
Bruce Momjian 已提交
3921
								   ItemPointerGetOffsetNumber(&(tp.t_self)));
3922
					/* this can't happen since we saw tuple earlier: */
3923
					if (!ItemIdIsNormal(Pitemid))
3924
						elog(ERROR, "parent itemid marked as unused");
3925
					PTdata = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
3926 3927 3928

					/* ctid should not have changed since we saved it */
					Assert(ItemPointerEquals(&(vtld.new_tid),
3929
											 &(PTdata->t_ctid)));
B
Bruce Momjian 已提交
3930

3931
					/*
3932
					 * Read above about cases when !ItemIdIsUsed(nextItemid)
B
Bruce Momjian 已提交
3933 3934 3935 3936 3937 3938 3939 3940
					 * (child item is removed)... Due to the fact that at the
					 * moment we don't remove unuseful part of update-chain,
					 * it's possible to get non-matching parent row here. Like
					 * as in the case which caused this problem, we stop
					 * shrinking here. I could try to find real parent row but
					 * want not to do it because of real solution will be
					 * implemented anyway, later, and we are too close to 6.5
					 * release. - vadim 06/11/99
3941
					 */
3942 3943
					if ((PTdata->t_infomask & HEAP_XMAX_IS_MULTI) ||
						!(TransactionIdEquals(HeapTupleHeaderGetXmax(PTdata),
B
Bruce Momjian 已提交
3944
										 HeapTupleHeaderGetXmin(tp.t_data))))
3945 3946
					{
						ReleaseBuffer(Pbuf);
3947
						elog(DEBUG2, "too old parent tuple found --- cannot continue repair_frag");
3948
						chain_move_failed = true;
B
Bruce Momjian 已提交
3949
						break;	/* out of check-all-items loop */
3950
					}
3951
					tp.t_data = PTdata;
3952 3953 3954 3955 3956
					tlen = tp.t_len = ItemIdGetLength(Pitemid);
					if (freeCbuf)
						ReleaseBuffer(Cbuf);
					Cbuf = Pbuf;
					freeCbuf = true;
B
Bruce Momjian 已提交
3957
				}				/* end of check-all-items loop */
3958

3959 3960
				if (freeCbuf)
					ReleaseBuffer(Cbuf);
3961 3962
				freeCbuf = false;

3963 3964 3965 3966 3967 3968 3969
				/* Double-check that we will move the current target tuple */
				if (!moved_target && !chain_move_failed)
				{
					elog(DEBUG2, "failed to chain back to target --- cannot continue repair_frag");
					chain_move_failed = true;
				}

3970
				if (chain_move_failed)
3971
				{
3972
					/*
B
Bruce Momjian 已提交
3973 3974 3975
					 * Undo changes to offsets_used state.	We don't bother
					 * cleaning up the amount-free state, since we're not
					 * going to do any further tuple motion.
3976 3977 3978 3979 3980 3981
					 */
					for (i = 0; i < num_vtmove; i++)
					{
						Assert(vtmove[i].vacpage->offsets_used > 0);
						(vtmove[i].vacpage->offsets_used)--;
					}
3982
					pfree(vtmove);
3983
					break;		/* out of walk-along-page loop */
3984
				}
3985

3986
				/*
3987 3988 3989
				 * Okay, move the whole tuple chain in reverse order.
				 *
				 * Ctid tracks the new location of the previously-moved tuple.
3990
				 */
3991 3992 3993
				ItemPointerSetInvalid(&Ctid);
				for (ti = 0; ti < num_vtmove; ti++)
				{
B
Bruce Momjian 已提交
3994
					VacPage		destvacpage = vtmove[ti].vacpage;
B
Bruce Momjian 已提交
3995 3996
					Page		Cpage;
					ItemId		Citemid;
3997

V
Vadim B. Mikheev 已提交
3998
					/* Get page to move from */
3999
					tuple.t_self = vtmove[ti].tid;
4000
					Cbuf = ReadBufferWithStrategy(onerel,
B
Bruce Momjian 已提交
4001
								  ItemPointerGetBlockNumber(&(tuple.t_self)),
4002
												  vac_strategy);
V
Vadim B. Mikheev 已提交
4003 4004

					/* Get page to move to */
4005 4006 4007
					dst_buffer = ReadBufferWithStrategy(onerel,
														destvacpage->blkno,
														vac_strategy);
V
Vadim B. Mikheev 已提交
4008

B
Bruce Momjian 已提交
4009 4010
					LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
					if (dst_buffer != Cbuf)
V
Vadim B. Mikheev 已提交
4011 4012
						LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);

B
Bruce Momjian 已提交
4013
					dst_page = BufferGetPage(dst_buffer);
4014
					Cpage = BufferGetPage(Cbuf);
V
Vadim B. Mikheev 已提交
4015

B
Bruce Momjian 已提交
4016
					Citemid = PageGetItemId(Cpage,
B
Bruce Momjian 已提交
4017
								ItemPointerGetOffsetNumber(&(tuple.t_self)));
4018 4019
					tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
					tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
4020

B
Bruce Momjian 已提交
4021 4022 4023
					move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
									 dst_buffer, dst_page, destvacpage,
									 &ec, &Ctid, vtmove[ti].cleanVpd);
V
Vadim B. Mikheev 已提交
4024

4025
					/*
B
Bruce Momjian 已提交
4026 4027 4028 4029
					 * If the tuple we are moving is a heap-only tuple, this
					 * move will generate an additional index entry, so
					 * increment the rel_indexed_tuples count.
					 */
4030 4031 4032
					if (HeapTupleHeaderIsHeapOnly(tuple.t_data))
						vacrelstats->rel_indexed_tuples++;

B
Bruce Momjian 已提交
4033
					num_moved++;
4034
					if (destvacpage->blkno > last_move_dest_block)
B
Bruce Momjian 已提交
4035
						last_move_dest_block = destvacpage->blkno;
B
Bruce Momjian 已提交
4036

4037 4038 4039 4040
					/*
					 * Remember that we moved tuple from the current page
					 * (corresponding index tuple will be cleaned).
					 */
4041
					if (Cbuf == buf)
B
Bruce Momjian 已提交
4042
						vacpage->offsets[vacpage->offsets_free++] =
B
Bruce Momjian 已提交
4043
							ItemPointerGetOffsetNumber(&(tuple.t_self));
4044
					else
4045 4046 4047 4048
					{
						/*
						 * When we move tuple chains, we may need to move
						 * tuples from a block that we haven't yet scanned in
B
Bruce Momjian 已提交
4049 4050 4051 4052 4053 4054 4055 4056
						 * the outer walk-along-the-relation loop. Note that
						 * we can't be moving a tuple from a block that we
						 * have already scanned because if such a tuple
						 * exists, then we must have moved the chain along
						 * with that tuple when we scanned that block. IOW the
						 * test of (Cbuf != buf) guarantees that the tuple we
						 * are looking at right now is in a block which is yet
						 * to be scanned.
4057 4058 4059 4060 4061
						 *
						 * We maintain two counters to correctly count the
						 * moved-off tuples from blocks that are not yet
						 * scanned (keep_tuples) and how many of them have
						 * index pointers (keep_indexed_tuples).  The main
B
Bruce Momjian 已提交
4062 4063 4064
						 * reason to track the latter is to help verify that
						 * indexes have the expected number of entries when
						 * all the dust settles.
4065 4066 4067
						 */
						if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
							keep_indexed_tuples++;
4068
						keep_tuples++;
4069
					}
4070

4071 4072
					ReleaseBuffer(dst_buffer);
					ReleaseBuffer(Cbuf);
B
Bruce Momjian 已提交
4073
				}				/* end of move-the-tuple-chain loop */
4074

B
Bruce Momjian 已提交
4075
				dst_buffer = InvalidBuffer;
4076
				pfree(vtmove);
4077
				chain_tuple_moved = true;
4078 4079

				/* advance to next tuple in walk-along-page loop */
4080
				continue;
B
Bruce Momjian 已提交
4081
			}					/* end of is-tuple-in-chain test */
4082

4083
			/* try to find new page for this tuple */
B
Bruce Momjian 已提交
4084 4085
			if (dst_buffer == InvalidBuffer ||
				!enough_space(dst_vacpage, tuple_len))
4086
			{
B
Bruce Momjian 已提交
4087
				if (dst_buffer != InvalidBuffer)
4088
				{
4089
					ReleaseBuffer(dst_buffer);
B
Bruce Momjian 已提交
4090
					dst_buffer = InvalidBuffer;
4091
				}
B
Bruce Momjian 已提交
4092
				for (i = 0; i < num_fraged_pages; i++)
4093
				{
B
Bruce Momjian 已提交
4094
					if (enough_space(fraged_pages->pagedesc[i], tuple_len))
4095 4096
						break;
				}
B
Bruce Momjian 已提交
4097
				if (i == num_fraged_pages)
4098
					break;		/* can't move item anywhere */
B
Bruce Momjian 已提交
4099
				dst_vacpage = fraged_pages->pagedesc[i];
4100 4101 4102
				dst_buffer = ReadBufferWithStrategy(onerel,
													dst_vacpage->blkno,
													vac_strategy);
B
Bruce Momjian 已提交
4103 4104
				LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
				dst_page = BufferGetPage(dst_buffer);
4105
				/* if this page was not used before - clean it */
B
Bruce Momjian 已提交
4106 4107
				if (!PageIsEmpty(dst_page) && dst_vacpage->offsets_used == 0)
					vacuum_page(onerel, dst_buffer, dst_vacpage);
4108
			}
V
Vadim B. Mikheev 已提交
4109
			else
B
Bruce Momjian 已提交
4110
				LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
V
Vadim B. Mikheev 已提交
4111 4112

			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
4113

B
Bruce Momjian 已提交
4114 4115
			move_plain_tuple(onerel, buf, page, &tuple,
							 dst_buffer, dst_page, dst_vacpage, &ec);
4116

4117
			/*
B
Bruce Momjian 已提交
4118 4119 4120
			 * If the tuple we are moving is a heap-only tuple, this move will
			 * generate an additional index entry, so increment the
			 * rel_indexed_tuples count.
4121 4122 4123 4124
			 */
			if (HeapTupleHeaderIsHeapOnly(tuple.t_data))
				vacrelstats->rel_indexed_tuples++;

B
Bruce Momjian 已提交
4125 4126 4127
			num_moved++;
			if (dst_vacpage->blkno > last_move_dest_block)
				last_move_dest_block = dst_vacpage->blkno;
4128

B
Bruce Momjian 已提交
4129
			/*
B
Bruce Momjian 已提交
4130 4131
			 * Remember that we moved tuple from the current page
			 * (corresponding index tuple will be cleaned).
4132
			 */
B
Bruce Momjian 已提交
4133
			vacpage->offsets[vacpage->offsets_free++] = offnum;
B
Bruce Momjian 已提交
4134
		}						/* walk along page */
4135

4136
		/*
B
Bruce Momjian 已提交
4137 4138 4139 4140
		 * If we broke out of the walk-along-page loop early (ie, still have
		 * offnum <= maxoff), then we failed to move some tuple off this page.
		 * No point in shrinking any more, so clean up and exit the per-page
		 * loop.
4141
		 */
4142 4143
		if (offnum < maxoff && keep_tuples > 0)
		{
B
Bruce Momjian 已提交
4144
			OffsetNumber off;
4145

4146
			/*
B
Bruce Momjian 已提交
4147
			 * Fix vacpage state for any unvisited tuples remaining on page
4148
			 */
4149
			for (off = OffsetNumberNext(offnum);
B
Bruce Momjian 已提交
4150 4151
				 off <= maxoff;
				 off = OffsetNumberNext(off))
4152
			{
B
Bruce Momjian 已提交
4153 4154
				ItemId		itemid = PageGetItemId(page, off);
				HeapTupleHeader htup;
B
Bruce Momjian 已提交
4155

4156 4157
				if (!ItemIdIsUsed(itemid))
					continue;
4158 4159 4160
				/* Shouldn't be any DEAD or REDIRECT items anymore */
				Assert(ItemIdIsNormal(itemid));

B
Bruce Momjian 已提交
4161 4162
				htup = (HeapTupleHeader) PageGetItem(page, itemid);
				if (htup->t_infomask & HEAP_XMIN_COMMITTED)
4163
					continue;
B
Bruce Momjian 已提交
4164

B
Bruce Momjian 已提交
4165
				/*
B
Bruce Momjian 已提交
4166 4167
				 * See comments in the walk-along-page loop above about why
				 * only MOVED_OFF tuples should be found here.
B
Bruce Momjian 已提交
4168
				 */
4169 4170 4171 4172 4173 4174 4175
				if (htup->t_infomask & HEAP_MOVED_IN)
					elog(ERROR, "HEAP_MOVED_IN was not expected");
				if (!(htup->t_infomask & HEAP_MOVED_OFF))
					elog(ERROR, "HEAP_MOVED_OFF was expected");
				if (HeapTupleHeaderGetXvac(htup) != myXID)
					elog(ERROR, "invalid XVAC in tuple header");

B
Bruce Momjian 已提交
4176
				if (chain_tuple_moved)
4177
				{
4178
					/* some chains were moved while cleaning this page */
B
Bruce Momjian 已提交
4179 4180 4181 4182 4183
					Assert(vacpage->offsets_free > 0);
					for (i = 0; i < vacpage->offsets_free; i++)
					{
						if (vacpage->offsets[i] == off)
							break;
4184
					}
B
Bruce Momjian 已提交
4185
					if (i >= vacpage->offsets_free)		/* not found */
4186
					{
B
Bruce Momjian 已提交
4187
						vacpage->offsets[vacpage->offsets_free++] = off;
4188
						Assert(keep_tuples > 0);
B
Bruce Momjian 已提交
4189

4190 4191 4192
						/*
						 * If this is not a heap-only tuple, there must be an
						 * index entry for this item which will be removed in
B
Bruce Momjian 已提交
4193 4194
						 * the index cleanup. Decrement the
						 * keep_indexed_tuples count to remember this.
4195 4196 4197
						 */
						if (!HeapTupleHeaderIsHeapOnly(htup))
							keep_indexed_tuples--;
4198 4199 4200
						keep_tuples--;
					}
				}
4201
				else
B
Bruce Momjian 已提交
4202 4203 4204
				{
					vacpage->offsets[vacpage->offsets_free++] = off;
					Assert(keep_tuples > 0);
4205 4206
					if (!HeapTupleHeaderIsHeapOnly(htup))
						keep_indexed_tuples--;
B
Bruce Momjian 已提交
4207 4208
					keep_tuples--;
				}
4209 4210 4211
			}
		}

B
Bruce Momjian 已提交
4212
		if (vacpage->offsets_free > 0)	/* some tuples were moved */
V
Vadim B. Mikheev 已提交
4213
		{
4214 4215
			if (chain_tuple_moved)		/* else - they are ordered */
			{
B
Bruce Momjian 已提交
4216
				qsort((char *) (vacpage->offsets), vacpage->offsets_free,
B
Bruce Momjian 已提交
4217
					  sizeof(OffsetNumber), vac_cmp_offno);
4218
			}
4219
			vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
V
Vadim B. Mikheev 已提交
4220
		}
4221

4222 4223 4224
		MIRROREDLOCK_BUFMGR_UNLOCK;
		// -------- MirroredLock ----------

4225
		ReleaseBuffer(buf);
V
Vadim B. Mikheev 已提交
4226

4227
		if (offnum <= maxoff)
4228
			break;				/* had to quit early, see above note */
4229 4230 4231 4232 4233

	}							/* walk along relation */

	blkno++;					/* new number of blocks */

B
Bruce Momjian 已提交
4234
	if (dst_buffer != InvalidBuffer)
V
Vadim B. Mikheev 已提交
4235
	{
B
Bruce Momjian 已提交
4236
		Assert(num_moved > 0);
4237
		ReleaseBuffer(dst_buffer);
V
Vadim B. Mikheev 已提交
4238
	}
4239

4240 4241 4242 4243 4244 4245
	/*
	 * In GPDB, the moving of relation tuples and truncating the relation is
	 * performed in two separate transactions one after the other so we don't
	 * need to commit the transaction here unlike the upstream code. The
	 * transactions are started and ended in vacuumStatement_Relation().
	 */
4246 4247

	/*
4248
	 * We are not going to move any more tuples across pages, but we still
B
Bruce Momjian 已提交
4249 4250 4251 4252
	 * need to apply vacuum_page to compact free space in the remaining pages
	 * in vacuum_pages list.  Note that some of these pages may also be in the
	 * fraged_pages list, and may have had tuples moved onto them; if so, we
	 * already did vacuum_page and needn't do it again.
4253
	 */
4254 4255 4256
	for (i = 0, curpage = vacuum_pages->pagedesc;
		 i < vacuumed_pages;
		 i++, curpage++)
V
Vadim B. Mikheev 已提交
4257
	{
4258 4259
		vacuum_delay_point();

4260
		Assert((*curpage)->blkno < blkno);
4261
		if ((*curpage)->offsets_used == 0)
4262
		{
B
Bruce Momjian 已提交
4263 4264 4265
			Buffer		buf;
			Page		page;

4266
			/* this page was not used as a move target, so must clean it */
4267 4268 4269 4270

			// -------- MirroredLock ----------
			MIRROREDLOCK_BUFMGR_LOCK;

4271 4272 4273
			buf = ReadBufferWithStrategy(onerel,
										 (*curpage)->blkno,
										 vac_strategy);
4274 4275
			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
			page = BufferGetPage(buf);
4276
			if (!PageIsEmpty(page))
4277
				vacuum_page(onerel, buf, *curpage);
4278
			UnlockReleaseBuffer(buf);
4279 4280 4281 4282

			MIRROREDLOCK_BUFMGR_UNLOCK;
			// -------- MirroredLock ----------

4283
		}
4284 4285
	}

4286
	/*
B
Bruce Momjian 已提交
4287 4288 4289 4290
	 * It'd be cleaner to make this report at the bottom of this routine, but
	 * then the rusage would double-count the second pass of index vacuuming.
	 * So do it here and ignore the relatively small amount of processing that
	 * occurs below.
4291 4292
	 */
	ereport(elevel,
4293
			(errmsg("\"%s\": moved %u row versions, will truncate %u to %u pages",
B
Bruce Momjian 已提交
4294 4295 4296 4297
					RelationGetRelationName(onerel),
					num_moved, nblocks, blkno),
			 errdetail("%s.",
					   pg_rusage_show(&ru0))));
4298

4299 4300 4301
	/*
	 * Reflect the motion of system tuples to catalog cache here.
	 */
4302
	CommandCounterIncrement();
4303

4304
	/* clean up */
B
Bruce Momjian 已提交
4305
	pfree(vacpage);
4306 4307
	if (vacrelstats->vtlinks != NULL)
		pfree(vacrelstats->vtlinks);
4308

B
Bruce Momjian 已提交
4309
	ExecContext_Finish(&ec);
4310 4311 4312 4313

	vacuum_pages->empty_end_pages = nblocks - blkno;

	SIMPLE_FAULT_INJECTOR(RepairFragEnd);
4314

4315
	return heldoff;
B
Bruce Momjian 已提交
4316
}
4317

B
Bruce Momjian 已提交
4318 4319 4320 4321 4322 4323 4324 4325 4326 4327 4328 4329 4330 4331 4332 4333 4334 4335 4336 4337
/*
 *	move_chain_tuple() -- move one tuple that is part of a tuple chain
 *
 *		This routine moves old_tup from old_page to dst_page.
 *		old_page and dst_page might be the same page.
 *		On entry old_buf and dst_buf are locked exclusively, both locks (or
 *		the single lock, if this is a intra-page-move) are released before
 *		exit.
 *
 *		Yes, a routine with ten parameters is ugly, but it's still better
 *		than having these 120 lines of code in repair_frag() which is
 *		already too long and almost unreadable.
 */
static void
move_chain_tuple(Relation rel,
				 Buffer old_buf, Page old_page, HeapTuple old_tup,
				 Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
				 ExecContext ec, ItemPointer ctid, bool cleanVpd)
{
	TransactionId myXID = GetCurrentTransactionId();
B
Bruce Momjian 已提交
4338 4339 4340 4341
	HeapTupleData newtup;
	OffsetNumber newoff;
	ItemId		newitemid;
	Size		tuple_len = old_tup->t_len;
B
Bruce Momjian 已提交
4342

4343 4344 4345 4346 4347
	MIRROREDLOCK_BUFMGR_MUST_ALREADY_BE_HELD;

	// Fetch gp_persistent_relation_node information that will be added to XLOG record.
	RelationFetchGpRelationNodeForXLog(rel);

4348 4349 4350
	/*
	 * make a modifiable copy of the source tuple.
	 */
B
Bruce Momjian 已提交
4351 4352
	heap_copytuple_with_tuple(old_tup, &newtup);

4353 4354 4355
	/*
	 * register invalidation of source tuple in catcaches.
	 */
B
Bruce Momjian 已提交
4356 4357 4358 4359 4360
	CacheInvalidateHeapTuple(rel, old_tup);

	/* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
	START_CRIT_SECTION();

4361 4362 4363
	/*
	 * mark the source tuple MOVED_OFF.
	 */
B
Bruce Momjian 已提交
4364
	old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
B
Bruce Momjian 已提交
4365 4366
									 HEAP_XMIN_INVALID |
									 HEAP_MOVED_IN);
B
Bruce Momjian 已提交
4367 4368 4369 4370 4371 4372
	old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
	HeapTupleHeaderSetXvac(old_tup->t_data, myXID);

	/*
	 * If this page was not used before - clean it.
	 *
B
Bruce Momjian 已提交
4373 4374 4375 4376 4377 4378 4379
	 * NOTE: a nasty bug used to lurk here.  It is possible for the source and
	 * destination pages to be the same (since this tuple-chain member can be
	 * on a page lower than the one we're currently processing in the outer
	 * loop).  If that's true, then after vacuum_page() the source tuple will
	 * have been moved, and tuple.t_data will be pointing at garbage.
	 * Therefore we must do everything that uses old_tup->t_data BEFORE this
	 * step!!
B
Bruce Momjian 已提交
4380
	 *
4381 4382
	 * This path is different from the other callers of vacuum_page, because
	 * we have already incremented the vacpage's offsets_used field to account
B
Bruce Momjian 已提交
4383 4384 4385 4386
	 * for the tuple(s) we expect to move onto the page. Therefore
	 * vacuum_page's check for offsets_used == 0 is wrong. But since that's a
	 * good debugging check for all other callers, we work around it here
	 * rather than remove it.
B
Bruce Momjian 已提交
4387 4388 4389
	 */
	if (!PageIsEmpty(dst_page) && cleanVpd)
	{
B
Bruce Momjian 已提交
4390
		int			sv_offsets_used = dst_vacpage->offsets_used;
B
Bruce Momjian 已提交
4391 4392 4393 4394 4395 4396 4397

		dst_vacpage->offsets_used = 0;
		vacuum_page(rel, dst_buf, dst_vacpage);
		dst_vacpage->offsets_used = sv_offsets_used;
	}

	/*
B
Bruce Momjian 已提交
4398
	 * Update the state of the copied tuple, and store it on the destination
4399
	 * page.  The copied tuple is never part of a HOT chain.
B
Bruce Momjian 已提交
4400 4401 4402 4403 4404
	 */
	newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
								   HEAP_XMIN_INVALID |
								   HEAP_MOVED_OFF);
	newtup.t_data->t_infomask |= HEAP_MOVED_IN;
4405 4406
	HeapTupleHeaderClearHotUpdated(newtup.t_data);
	HeapTupleHeaderClearHeapOnly(newtup.t_data);
B
Bruce Momjian 已提交
4407 4408
	HeapTupleHeaderSetXvac(newtup.t_data, myXID);
	newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
4409
						 InvalidOffsetNumber, false, true);
B
Bruce Momjian 已提交
4410 4411
	if (newoff == InvalidOffsetNumber)
		elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
B
Bruce Momjian 已提交
4412
			 (unsigned long) tuple_len, dst_vacpage->blkno);
B
Bruce Momjian 已提交
4413
	newitemid = PageGetItemId(dst_page, newoff);
4414
	/* drop temporary copy, and point to the version on the dest page */
B
Bruce Momjian 已提交
4415 4416
	pfree(newtup.t_data);
	newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
4417

B
Bruce Momjian 已提交
4418 4419
	ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);

4420
	/*
B
Bruce Momjian 已提交
4421 4422 4423
	 * Set new tuple's t_ctid pointing to itself if last tuple in chain, and
	 * to next tuple in chain otherwise.  (Since we move the chain in reverse
	 * order, this is actually the previously processed tuple.)
4424 4425 4426 4427 4428 4429 4430
	 */
	if (!ItemPointerIsValid(ctid))
		newtup.t_data->t_ctid = newtup.t_self;
	else
		newtup.t_data->t_ctid = *ctid;
	*ctid = newtup.t_self;

4431 4432 4433 4434
	MarkBufferDirty(dst_buf);
	if (dst_buf != old_buf)
		MarkBufferDirty(old_buf);

B
Bruce Momjian 已提交
4435 4436 4437 4438 4439 4440 4441 4442 4443
	/* XLOG stuff */
	if (!rel->rd_istemp)
	{
		XLogRecPtr	recptr = log_heap_move(rel, old_buf, old_tup->t_self,
										   dst_buf, &newtup);

		if (old_buf != dst_buf)
		{
			PageSetLSN(old_page, recptr);
4444
			PageSetTLI(old_page, ThisTimeLineID);
B
Bruce Momjian 已提交
4445 4446
		}
		PageSetLSN(dst_page, recptr);
4447
		PageSetTLI(dst_page, ThisTimeLineID);
B
Bruce Momjian 已提交
4448 4449 4450 4451 4452 4453 4454 4455 4456 4457 4458
	}

	END_CRIT_SECTION();

	LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
	if (dst_buf != old_buf)
		LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);

	/* Create index entries for the moved tuple */
	if (ec->resultRelInfo->ri_NumIndices > 0)
	{
4459
		ExecStoreGenericTuple(&newtup, ec->slot, false);
B
Bruce Momjian 已提交
4460
		ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
4461
		ResetPerTupleExprContext(ec->estate);
B
Bruce Momjian 已提交
4462 4463 4464 4465 4466 4467 4468 4469 4470 4471 4472 4473 4474 4475 4476 4477 4478 4479 4480 4481 4482
	}
}

/*
 *	move_plain_tuple() -- move one tuple that is not part of a chain
 *
 *		This routine moves old_tup from old_page to dst_page.
 *		On entry old_buf and dst_buf are locked exclusively, both locks are
 *		released before exit.
 *
 *		Yes, a routine with eight parameters is ugly, but it's still better
 *		than having these 90 lines of code in repair_frag() which is already
 *		too long and almost unreadable.
 */
static void
move_plain_tuple(Relation rel,
				 Buffer old_buf, Page old_page, HeapTuple old_tup,
				 Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
				 ExecContext ec)
{
	TransactionId myXID = GetCurrentTransactionId();
B
Bruce Momjian 已提交
	HeapTupleData newtup;
	OffsetNumber newoff;
	ItemId		newitemid;
	Size		tuple_len = old_tup->t_len;

	MIRROREDLOCK_BUFMGR_MUST_ALREADY_BE_HELD;

	// Fetch gp_persistent_relation_node information that will be added to XLOG record.
	RelationFetchGpRelationNodeForXLog(rel);

	/* copy tuple */
	heap_copytuple_with_tuple(old_tup, &newtup);

	/*
	 * register invalidation of source tuple in catcaches.
	 *
	 * (Note: we do not need to register the copied tuple, because we are not
	 * changing the tuple contents and so there cannot be any need to flush
	 * negative catcache entries.)
	 */
	CacheInvalidateHeapTuple(rel, old_tup);

	/* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
	START_CRIT_SECTION();

	/*
	 * Mark new tuple as MOVED_IN by me; also mark it not HOT.
	 */
	newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
								   HEAP_XMIN_INVALID |
								   HEAP_MOVED_OFF);
	newtup.t_data->t_infomask |= HEAP_MOVED_IN;
	HeapTupleHeaderClearHotUpdated(newtup.t_data);
	HeapTupleHeaderClearHeapOnly(newtup.t_data);
	HeapTupleHeaderSetXvac(newtup.t_data, myXID);

	/* add tuple to the page */
	newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
						 InvalidOffsetNumber, false, true);
	if (newoff == InvalidOffsetNumber)
		elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nused %u, noff %u)",
			 (unsigned long) tuple_len,
			 dst_vacpage->blkno, (unsigned long) dst_vacpage->free,
			 dst_vacpage->offsets_used, dst_vacpage->offsets_free);
	newitemid = PageGetItemId(dst_page, newoff);
	pfree(newtup.t_data);
	newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
	ItemPointerSet(&(newtup.t_data->t_ctid), dst_vacpage->blkno, newoff);
	newtup.t_self = newtup.t_data->t_ctid;

	/*
	 * Mark old tuple as MOVED_OFF by me.
	 */
	old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
									 HEAP_XMIN_INVALID |
									 HEAP_MOVED_IN);
	old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
	HeapTupleHeaderSetXvac(old_tup->t_data, myXID);

	MarkBufferDirty(dst_buf);
	MarkBufferDirty(old_buf);

	/* XLOG stuff */
	if (!rel->rd_istemp)
	{
		XLogRecPtr	recptr = log_heap_move(rel, old_buf, old_tup->t_self,
										   dst_buf, &newtup);

		PageSetLSN(old_page, recptr);
		PageSetTLI(old_page, ThisTimeLineID);
		PageSetLSN(dst_page, recptr);
		PageSetTLI(dst_page, ThisTimeLineID);
	}

	END_CRIT_SECTION();

	dst_vacpage->free = PageGetFreeSpaceWithFillFactor(rel, dst_page);
	LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
	LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);

	dst_vacpage->offsets_used++;

	/* insert index' tuples if needed */
	if (ec->resultRelInfo->ri_NumIndices > 0)
	{
		ExecStoreGenericTuple(&newtup, ec->slot, false);
		ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
		ResetPerTupleExprContext(ec->estate);
	}
}

/*
 *	vacuum_heap() -- free dead tuples
 *
 *		This routine marks dead tuples as unused and truncates relation
 *		if there are "empty" end-blocks.
 */
static void
vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
{
	MIRROREDLOCK_BUFMGR_DECLARE;

	Buffer		buf;
	VacPage    *vacpage;
	int			nblocks;
	int			i;

	// Fetch gp_persistent_relation_node information that will be added to XLOG record.
	RelationFetchGpRelationNodeForXLog(onerel);

	nblocks = vacuum_pages->num_pages;
	nblocks -= vacuum_pages->empty_end_pages;	/* nothing to do with them */

	for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
	{
		vacuum_delay_point();

		if ((*vacpage)->offsets_free > 0)
		{

			// -------- MirroredLock ----------
			MIRROREDLOCK_BUFMGR_LOCK;

			buf = ReadBufferWithStrategy(onerel,
										 (*vacpage)->blkno,
										 vac_strategy);
			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
			vacuum_page(onerel, buf, *vacpage);
			UnlockReleaseBuffer(buf);

			MIRROREDLOCK_BUFMGR_UNLOCK;
			// -------- MirroredLock ----------

		}
	}
}

/*
 *	vacuum_page() -- free dead tuples on a page
 *					 and repair its fragmentation.
 *
 * Caller must hold pin and lock on buffer.
 */
static void
vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
{
	Page		page = BufferGetPage(buffer);
	int			i;

	MIRROREDLOCK_BUFMGR_MUST_ALREADY_BE_HELD;

	/* There shouldn't be any tuples moved onto the page yet! */
	Assert(vacpage->offsets_used == 0);

	START_CRIT_SECTION();

	for (i = 0; i < vacpage->offsets_free; i++)
	{
		ItemId		itemid = PageGetItemId(page, vacpage->offsets[i]);

		ItemIdSetUnused(itemid);
	}

	PageRepairFragmentation(page);

	MarkBufferDirty(buffer);

	/* XLOG stuff */
	if (!onerel->rd_istemp)
	{
		XLogRecPtr	recptr;

		recptr = log_heap_clean(onerel, buffer,
								NULL, 0, NULL, 0,
								vacpage->offsets, vacpage->offsets_free,
								false);
		PageSetLSN(page, recptr);
		PageSetTLI(page, ThisTimeLineID);
	}

	END_CRIT_SECTION();
}

/*
 *	scan_index() -- scan one index relation to update pg_class statistics.
 *
 * We use this when we have no deletions to do.
 */
static void
scan_index(Relation indrel, double num_tuples, List *updated_stats, bool isfull, bool check_stats)
{
	IndexBulkDeleteResult *stats;
	IndexVacuumInfo ivinfo;
	PGRUsage	ru0;

	pg_rusage_init(&ru0);

	ivinfo.index = indrel;
	ivinfo.vacuum_full = isfull;
	ivinfo.message_level = elevel;
	ivinfo.num_heap_tuples = num_tuples;
	ivinfo.strategy = vac_strategy;

	stats = index_vacuum_cleanup(&ivinfo, NULL);

	if (!stats)
		return;

	/* now update statistics in pg_class */
	vac_update_relstats_from_list(indrel,
						stats->num_pages, stats->num_index_tuples,
						false, InvalidTransactionId, updated_stats);

	ereport(elevel,
			(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
					RelationGetRelationName(indrel),
					stats->num_index_tuples,
					stats->num_pages),
	errdetail("%u index pages have been deleted, %u are currently reusable.\n"
			  "%s.",
			  stats->pages_deleted, stats->pages_free,
			  pg_rusage_show(&ru0))));

	/*
	 * Check for tuple count mismatch.	If the index is partial, then it's OK
	 * for it to have fewer tuples than the heap; else we got trouble.
	 */
	if (check_stats && stats->num_index_tuples != num_tuples)
	{
		if (stats->num_index_tuples > num_tuples ||
			!vac_is_partial_index(indrel))
			ereport(WARNING,
					(errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
							RelationGetRelationName(indrel),
							stats->num_index_tuples, num_tuples),
					 errhint("Rebuild the index with REINDEX.")));
	}

	pfree(stats);
}

/*
 * Vacuums an index on an append-only table.
 *
 * This is called after an append-only segment file compaction to move
 * all tuples from the compacted segment files.
 * The vacuumIndexState carries the block directory and visibility map
 * state used to decide which index entries can be reaped.
 */
static void
vacuum_appendonly_index(Relation indexRelation,
		AppendOnlyIndexVacuumState* vacuumIndexState,
		List *updated_stats,
		double rel_tuple_count,
		bool isfull)
{
	Assert(RelationIsValid(indexRelation));
	Assert(vacuumIndexState);

	IndexBulkDeleteResult *stats;
	IndexVacuumInfo ivinfo;
	PGRUsage	ru0;

	pg_rusage_init(&ru0);

	ivinfo.index = indexRelation;
	ivinfo.vacuum_full = isfull;
	ivinfo.message_level = elevel;
	ivinfo.num_heap_tuples = rel_tuple_count;
	ivinfo.strategy = vac_strategy;

	/* Do bulk deletion */
	stats = index_bulk_delete(&ivinfo, NULL, appendonly_tid_reaped,
			(void *) vacuumIndexState);

	/* Do post-VACUUM cleanup */
	stats = index_vacuum_cleanup(&ivinfo, stats);

	if (!stats)
		return;

	/* now update statistics in pg_class */
	vac_update_relstats_from_list(indexRelation,
						stats->num_pages, stats->num_index_tuples,
						false, InvalidTransactionId, updated_stats);

	ereport(elevel,
			(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
					RelationGetRelationName(indexRelation),
					stats->num_index_tuples,
					stats->num_pages),
			 errdetail("%.0f index row versions were removed.\n"
			 "%u index pages have been deleted, %u are currently reusable.\n"
					   "%s.",
					   stats->tuples_removed,
					   stats->pages_deleted, stats->pages_free,
					   pg_rusage_show(&ru0))));

	pfree(stats);

}

/*
 *	vacuum_index() -- vacuum one index relation.
 *
 *		Vpl is the VacPageList of the heap we're currently vacuuming.
 *		It's locked. Indrel is an index relation on the vacuumed heap.
 *
 *		We don't bother to set locks on the index relation here, since
 *		the parent table is exclusive-locked already.
 *
 *		Finally, we arrange to update the index relation's statistics in
 *		pg_class.
 */
static void
vacuum_index(VacPageList vacpagelist, Relation indrel,
			 double num_tuples, int keep_tuples, List *updated_stats,
			 bool check_stats)
{
	IndexBulkDeleteResult *stats;
	IndexVacuumInfo ivinfo;
	PGRUsage	ru0;

	pg_rusage_init(&ru0);

	ivinfo.index = indrel;
	ivinfo.vacuum_full = true;
	ivinfo.message_level = elevel;
	ivinfo.num_heap_tuples = num_tuples + keep_tuples;
	ivinfo.strategy = vac_strategy;

	/* Do bulk deletion */
	stats = index_bulk_delete(&ivinfo, NULL, tid_reaped, (void *) vacpagelist);

	/* Do post-VACUUM cleanup */
	stats = index_vacuum_cleanup(&ivinfo, stats);

	if (!stats)
		return;

	/* now update statistics in pg_class */
	vac_update_relstats_from_list(indrel,
						stats->num_pages, stats->num_index_tuples,
						false, InvalidTransactionId, updated_stats);

	ereport(elevel,
			(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
					RelationGetRelationName(indrel),
					stats->num_index_tuples,
					stats->num_pages),
			 errdetail("%.0f index row versions were removed.\n"
			 "%u index pages have been deleted, %u are currently reusable.\n"
					   "%s.",
					   stats->tuples_removed,
					   stats->pages_deleted, stats->pages_free,
					   pg_rusage_show(&ru0))));

	/*
	 * Check for tuple count mismatch.	If the index is partial, then it's OK
	 * for it to have fewer tuples than the heap; else we got trouble.
	 */
	if (check_stats && stats->num_index_tuples != num_tuples + keep_tuples)
	{
		if (stats->num_index_tuples > num_tuples + keep_tuples ||
			!vac_is_partial_index(indrel))
			ereport(WARNING,
					(errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
							RelationGetRelationName(indrel),
						  stats->num_index_tuples, num_tuples + keep_tuples),
					 errhint("Rebuild the index with REINDEX.")));
	}

	pfree(stats);
}

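/*
 * Check whether the block directory covers the given append-only tuple id.
 *
 * The cached block directory entry in the vacuum state is consulted first;
 * if it does not cover the tuple, a fresh entry is fetched.  Returns true
 * if an entry covering the tuple's row number exists for its segment file.
 */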
static bool
appendonly_tid_reapded_check_block_directory(AppendOnlyIndexVacuumState* vacuumState,
		AOTupleId* aoTupleId)
{
	if (vacuumState->blockDirectory.currentSegmentFileNum ==
			AOTupleIdGet_segmentFileNum(aoTupleId) &&
			AppendOnlyBlockDirectoryEntry_RangeHasRow(&vacuumState->blockDirectoryEntry,
				AOTupleIdGet_rowNum(aoTupleId)))
	{
		return true;
	}

	if (!AppendOnlyBlockDirectory_GetEntry(&vacuumState->blockDirectory,
		aoTupleId,
		0,
		&vacuumState->blockDirectoryEntry))
	{
		return false;
	}
	return (vacuumState->blockDirectory.currentSegmentFileNum ==
			AOTupleIdGet_segmentFileNum(aoTupleId) &&
			AppendOnlyBlockDirectoryEntry_RangeHasRow(&vacuumState->blockDirectoryEntry,
				AOTupleIdGet_rowNum(aoTupleId)));
}

/*
 * appendonly_tid_reaped()
 *
 * Is a particular tid in an append-only table reaped?
 * The state argument is an AppendOnlyIndexVacuumState covering the
 * compacted segment files.
 *
 * This has the right signature to be an IndexBulkDeleteCallback.
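 *
 * A tuple id is treated as reaped if the block directory no longer covers
 * it, or if the append-only visibility map reports it as no longer visible.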
 */
static bool
appendonly_tid_reaped(ItemPointer itemptr, void *state)
{
	AOTupleId* aoTupleId;
	AppendOnlyIndexVacuumState* vacuumState;
	bool reaped;

	Assert(itemptr);
	Assert(state);

	aoTupleId = (AOTupleId *)itemptr;
	vacuumState = (AppendOnlyIndexVacuumState *)state;

	reaped = !appendonly_tid_reapded_check_block_directory(vacuumState,
			aoTupleId);
	if (!reaped)
	{
		/* Also check visi map */
		reaped = !AppendOnlyVisimap_IsVisible(&vacuumState->visiMap,
		aoTupleId);
	}

	elogif(Debug_appendonly_print_compaction, DEBUG3,
			"Index vacuum %s %d",
			AOTupleIdToString(aoTupleId), reaped);
	return reaped;
}

/*
 *	tid_reaped() -- is a particular tid reaped?
 *
 *		This has the right signature to be an IndexBulkDeleteCallback.
 *
 *		vacpagelist->VacPage_array is sorted in the right order.
 */
static bool
tid_reaped(ItemPointer itemptr, void *state)
{
	VacPageList vacpagelist = (VacPageList) state;
	OffsetNumber ioffno;
	OffsetNumber *voff;
	VacPage		vp,
			   *vpp;
	VacPageData vacpage;

	vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
	ioffno = ItemPointerGetOffsetNumber(itemptr);

	vp = &vacpage;
	vpp = (VacPage *) vac_bsearch((void *) &vp,
								  (void *) (vacpagelist->pagedesc),
								  vacpagelist->num_pages,
								  sizeof(VacPage),
								  vac_cmp_blk);

	if (vpp == NULL)
		return false;

	/* ok - we are on a partially or fully reaped page */
	vp = *vpp;

	if (vp->offsets_free == 0)
	{
		/* this is EmptyPage, so claim all tuples on it are reaped!!! */
		return true;
	}

	voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
										(void *) (vp->offsets),
										vp->offsets_free,
										sizeof(OffsetNumber),
										vac_cmp_offno);

	if (voff == NULL)
		return false;

	/* tid is reaped */
	return true;
}

/*
 * Update the shared Free Space Map with the info we now have about
 * free space in the relation, discarding any old info the map may have.
 */
static void
vac_update_fsm(Relation onerel, VacPageList fraged_pages,
			   BlockNumber rel_pages)
{
	int			nPages = fraged_pages->num_pages;
	VacPage    *pagedesc = fraged_pages->pagedesc;
	Size		threshold;
	PageFreeSpaceInfo *pageSpaces;
	int			outPages;
	int			i;

	/*
	 * We only report pages with free space at least equal to the average
	 * request size --- this avoids cluttering FSM with uselessly-small bits
	 * of space.  Although FSM would discard pages with little free space
	 * anyway, it's important to do this prefiltering because (a) it reduces
	 * the time spent holding the FSM lock in RecordRelationFreeSpace, and (b)
	 * FSM uses the number of pages reported as a statistic for guiding space
	 * management.	If we didn't threshold our reports the same way
	 * vacuumlazy.c does, we'd be skewing that statistic.
	 */
	threshold = GetAvgFSMRequestSize(&onerel->rd_node);

	pageSpaces = (PageFreeSpaceInfo *)
		palloc(nPages * sizeof(PageFreeSpaceInfo));
	outPages = 0;

	for (i = 0; i < nPages; i++)
	{
		/*
		 * fraged_pages may contain entries for pages that we later decided to
		 * truncate from the relation; don't enter them into the free space
		 * map!
		 */
		if (pagedesc[i]->blkno >= rel_pages)
			break;

		if (pagedesc[i]->free >= threshold)
		{
			pageSpaces[outPages].blkno = pagedesc[i]->blkno;
			pageSpaces[outPages].avail = pagedesc[i]->free;
			outPages++;
		}
	}

	RecordRelationFreeSpace(&onerel->rd_node, outPages, outPages, pageSpaces);

	pfree(pageSpaces);
}
5025 5026 5027
/* Copy a VacPage structure */
static VacPage
copy_vac_page(VacPage vacpage)
5028
{
B
Bruce Momjian 已提交
5029
	VacPage		newvacpage;
5030

B
Bruce Momjian 已提交
5031
	/* allocate a VacPageData entry */
5032
	newvacpage = (VacPage) palloc(sizeof(VacPageData) +
B
Bruce Momjian 已提交
5033
							   vacpage->offsets_free * sizeof(OffsetNumber));
5034

5035
	/* fill it in */
B
Bruce Momjian 已提交
5036
	if (vacpage->offsets_free > 0)
5037 5038
		memcpy(newvacpage->offsets, vacpage->offsets,
			   vacpage->offsets_free * sizeof(OffsetNumber));
B
Bruce Momjian 已提交
5039 5040 5041 5042
	newvacpage->blkno = vacpage->blkno;
	newvacpage->free = vacpage->free;
	newvacpage->offsets_used = vacpage->offsets_used;
	newvacpage->offsets_free = vacpage->offsets_free;
5043

5044
	return newvacpage;
B
Bruce Momjian 已提交
5045
}
V
Vadim B. Mikheev 已提交
5046

5047 5048 5049 5050 5051 5052 5053
/*
 * Add a VacPage pointer to a VacPageList.
 *
 *		As a side effect of the way that scan_heap works,
 *		higher pages come after lower pages in the array
 *		(and highest tid on a page is last).
 */
B
Bruce Momjian 已提交
5054 5055
static void
vpage_insert(VacPageList vacpagelist, VacPage vpnew)
V
Vadim B. Mikheev 已提交
5056
{
T
Tatsuo Ishii 已提交
5057
#define PG_NPAGEDESC 1024
V
Vadim B. Mikheev 已提交
5058

B
Bruce Momjian 已提交
5059 5060
	/* allocate a VacPage entry if needed */
	if (vacpagelist->num_pages == 0)
T
Tatsuo Ishii 已提交
5061
	{
B
Bruce Momjian 已提交
5062 5063
		vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
		vacpagelist->num_allocated_pages = PG_NPAGEDESC;
T
Tatsuo Ishii 已提交
5064
	}
B
Bruce Momjian 已提交
5065
	else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
T
Tatsuo Ishii 已提交
5066
	{
B
Bruce Momjian 已提交
5067 5068
		vacpagelist->num_allocated_pages *= 2;
		vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
T
Tatsuo Ishii 已提交
5069
	}
B
Bruce Momjian 已提交
5070 5071
	vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
	(vacpagelist->num_pages)++;
5072 5073
}

5074 5075 5076
/*
 * vac_bsearch: just like standard C library routine bsearch(),
 * except that we first test to see whether the target key is outside
5077
 * the range of the table entries.	This case is handled relatively slowly
5078 5079 5080
 * by the normal binary search algorithm (ie, no faster than any other key)
 * but it occurs often enough in VACUUM to be worth optimizing.
 */
5081
static void *
5082 5083
vac_bsearch(const void *key, const void *base,
			size_t nelem, size_t size,
B
Bruce Momjian 已提交
5084
			int (*compar) (const void *, const void *))
5085
{
5086
	int			res;
5087 5088 5089 5090 5091 5092 5093 5094 5095 5096
	const void *last;

	if (nelem == 0)
		return NULL;
	res = compar(key, base);
	if (res < 0)
		return NULL;
	if (res == 0)
		return (void *) base;
	if (nelem > 1)
5097
	{
5098 5099 5100
		last = (const void *) ((const char *) base + (nelem - 1) * size);
		res = compar(key, last);
		if (res > 0)
5101
			return NULL;
5102 5103
		if (res == 0)
			return (void *) last;
5104
	}
5105 5106 5107
	if (nelem <= 2)
		return NULL;			/* already checked 'em all */
	return bsearch(key, base, nelem, size, compar);
B
Bruce Momjian 已提交
5108
}
5109

5110 5111 5112
/*
 * Comparator routines for use with qsort() and bsearch().
 */
5113
static int
B
Bruce Momjian 已提交
5114
vac_cmp_blk(const void *left, const void *right)
5115
{
5116 5117
	BlockNumber lblk,
				rblk;
5118

B
Bruce Momjian 已提交
5119 5120
	lblk = (*((VacPage *) left))->blkno;
	rblk = (*((VacPage *) right))->blkno;
5121

5122
	if (lblk < rblk)
5123
		return -1;
5124
	if (lblk == rblk)
5125 5126
		return 0;
	return 1;
B
Bruce Momjian 已提交
5127
}
5128

5129
static int
B
Bruce Momjian 已提交
5130
vac_cmp_offno(const void *left, const void *right)
5131
{
5132
	if (*(OffsetNumber *) left < *(OffsetNumber *) right)
5133
		return -1;
5134
	if (*(OffsetNumber *) left == *(OffsetNumber *) right)
5135 5136
		return 0;
	return 1;
B
Bruce Momjian 已提交
5137
}
V
Vadim B. Mikheev 已提交
5138

5139
static int
B
Bruce Momjian 已提交
5140
vac_cmp_vtlinks(const void *left, const void *right)
5141
{
B
Bruce Momjian 已提交
5142 5143
	if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
		((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
5144
		return -1;
B
Bruce Momjian 已提交
5145 5146
	if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
		((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
5147 5148
		return 1;
	/* bi_hi-es are equal */
B
Bruce Momjian 已提交
5149 5150
	if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
		((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
5151
		return -1;
B
Bruce Momjian 已提交
5152 5153
	if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
		((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
5154 5155
		return 1;
	/* bi_lo-es are equal */
B
Bruce Momjian 已提交
5156 5157
	if (((VTupleLink) left)->new_tid.ip_posid <
		((VTupleLink) right)->new_tid.ip_posid)
5158
		return -1;
B
Bruce Momjian 已提交
5159 5160
	if (((VTupleLink) left)->new_tid.ip_posid >
		((VTupleLink) right)->new_tid.ip_posid)
5161 5162 5163
		return 1;
	return 0;
}
V

5165

5166
/*
5167 5168 5169 5170 5171 5172 5173 5174 5175 5176
 * Open all the vacuumable indexes of the given relation, obtaining the
 * specified kind of lock on each.	Return an array of Relation pointers for
 * the indexes into *Irel, and the number of indexes into *nindexes.
 *
 * We consider an index vacuumable if it is marked insertable (IndexIsReady).
 * If it isn't, probably a CREATE INDEX CONCURRENTLY command failed early in
 * execution, and what we have is too corrupt to be processable.  We will
 * vacuum even if the index isn't indisvalid; this is important because in a
 * unique index, uniqueness checks will be performed anyway and had better not
 * hit dangling index pointers.
5177
 */
5178
void
5179 5180
vac_open_indexes(Relation relation, LOCKMODE lockmode,
				 int *nindexes, Relation **Irel)
V
Vadim B. Mikheev 已提交
5181
{
5182 5183
	List	   *indexoidlist;
	ListCell   *indexoidscan;
5184
	int			i;
V
Vadim B. Mikheev 已提交
5185

5186 5187
	Assert(lockmode != NoLock);

5188
	indexoidlist = RelationGetIndexList(relation);
5189

5190 5191
	/* allocate enough memory for all indexes */
	i = list_length(indexoidlist);
5192

5193 5194
	if (i > 0)
		*Irel = (Relation *) palloc(i * sizeof(Relation));
5195 5196
	else
		*Irel = NULL;
5197

5198
	/* collect just the ready indexes */
5199 5200
	i = 0;
	foreach(indexoidscan, indexoidlist)
5201
	{
5202
		Oid			indexoid = lfirst_oid(indexoidscan);
5203
		Relation	indrel;
V
Vadim B. Mikheev 已提交
5204

5205 5206 5207 5208 5209
		indrel = index_open(indexoid, lockmode);
		if (IndexIsReady(indrel->rd_index))
			(*Irel)[i++] = indrel;
		else
			index_close(indrel, lockmode);
5210 5211
	}

5212 5213
	*nindexes = i;

5214
	list_free(indexoidlist);
B
Bruce Momjian 已提交
5215
}
V
Vadim B. Mikheev 已提交
5216

5217
/*
B
Bruce Momjian 已提交
5218
 * Release the resources acquired by vac_open_indexes.	Optionally release
5219 5220
 * the locks (say NoLock to keep 'em).
 */
5221
void
5222
vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
V
Vadim B. Mikheev 已提交
5223
{
5224
	if (Irel == NULL)
5225
		return;
V
Vadim B. Mikheev 已提交
5226

5227
	while (nindexes--)
5228 5229 5230
	{
		Relation	ind = Irel[nindexes];

5231
		index_close(ind, lockmode);
5232
	}
5233
	pfree(Irel);
B
Bruce Momjian 已提交
5234
}
V
Vadim B. Mikheev 已提交
5235 5236


5237 5238 5239 5240 5241
/*
 * Is an index partial (ie, could it contain fewer tuples than the heap?)
 */
bool
vac_is_partial_index(Relation indrel)
V
Vadim B. Mikheev 已提交
5242
{
5243
	/*
B
Bruce Momjian 已提交
5244
	 * If the index's AM doesn't support nulls, it's partial for our purposes
5245
	 */
5246
	if (!indrel->rd_am->amindexnulls)
5247 5248 5249
		return true;

	/* Otherwise, look to see if there's a partial-index predicate */
5250 5251 5252 5253
	if (!heap_attisnull(indrel->rd_indextuple, Anum_pg_index_indpred))
		return true;

	return false;
B
Bruce Momjian 已提交
5254
}
5255 5256


5257
static bool
B
Bruce Momjian 已提交
5258
enough_space(VacPage vacpage, Size len)
V
Vadim B. Mikheev 已提交
5259
{
5260
	len = MAXALIGN(len);
5261

B
Bruce Momjian 已提交
5262
	if (len > vacpage->free)
5263
		return false;
5264

5265 5266 5267
	/* if there are free itemid(s) and len <= free_space... */
	if (vacpage->offsets_used < vacpage->offsets_free)
		return true;
5268

5269 5270
	/* noff_used >= noff_free and so we'll have to allocate new itemid */
	if (len + sizeof(ItemIdData) <= vacpage->free)
5271
		return true;
5272

5273
	return false;
B
Bruce Momjian 已提交
5274
}
5275

B
Bruce Momjian 已提交
5276 5277 5278
static Size
PageGetFreeSpaceWithFillFactor(Relation relation, Page page)
{
5279 5280 5281 5282 5283 5284 5285 5286 5287 5288 5289 5290 5291
	/*
	 * It is correct to use PageGetExactFreeSpace() here, *not*
	 * PageGetHeapFreeSpace().  This is because (a) we do our own, exact
	 * accounting for whether line pointers must be added, and (b) we will
	 * recycle any LP_DEAD line pointers before starting to add rows to a
	 * page, but that may not have happened yet at the time this function is
	 * applied to a page, which means PageGetHeapFreeSpace()'s protection
	 * against too many line pointers on a page could fire incorrectly.  We do
	 * not need that protection here: since VACUUM FULL always recycles all
	 * dead line pointers first, it'd be physically impossible to insert more
	 * than MaxHeapTuplesPerPage tuples anyway.
	 */
	Size		freespace = PageGetExactFreeSpace(page);
5292
	Size		targetfree;
B
Bruce Momjian 已提交
5293

5294 5295 5296 5297
	targetfree = RelationGetTargetPageFreeSpace(relation,
												HEAP_DEFAULT_FILLFACTOR);
	if (freespace > targetfree)
		return freespace - targetfree;
B
Bruce Momjian 已提交
5298 5299 5300
	else
		return 0;
}
5301

5302 5303 5304 5305 5306 5307 5308 5309 5310 5311 5312 5313 5314 5315 5316 5317
/*
 * vacuum_delay_point --- check for interrupts and cost-based delay.
 *
 * This should be called in each major loop of VACUUM processing,
 * typically once per page processed.
 */
void
vacuum_delay_point(void)
{
	/* Always check for interrupts */
	CHECK_FOR_INTERRUPTS();

	/* Nap if appropriate */
	if (VacuumCostActive && !InterruptPending &&
		VacuumCostBalance >= VacuumCostLimit)
	{
B
Bruce Momjian 已提交
5318
		int			msec;
5319

5320 5321 5322
		msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
		if (msec > VacuumCostDelay * 4)
			msec = VacuumCostDelay * 4;
5323 5324 5325 5326 5327

		pg_usleep(msec * 1000L);

		VacuumCostBalance = 0;

5328 5329 5330
		/* update balance values for workers */
		AutoVacuumUpdateDelay();

5331 5332 5333 5334
		/* Might have gotten an interrupt while sleeping */
		CHECK_FOR_INTERRUPTS();
	}
}
5335 5336 5337 5338 5339 5340 5341

/*
 * Dispatch a Vacuum command.
 */
static void
dispatchVacuum(VacuumStmt *vacstmt, VacuumStatsContext *ctx)
{
5342
	CdbPgResults cdb_pgresults;
5343 5344 5345 5346 5347 5348 5349 5350

	/* should these be marked volatile ? */

	Assert(Gp_role == GP_ROLE_DISPATCH);
	Assert(vacstmt);
	Assert(vacstmt->vacuum);
	Assert(!vacstmt->analyze);

5351
	/* XXX: Some kinds of VACUUM assign a new relfilenode. bitmap indexes maybe? */
5352
	CdbDispatchUtilityStatement((Node *) vacstmt,
5353 5354 5355 5356 5357
								DF_CANCEL_ON_ERROR|
								DF_WITH_SNAPSHOT|
								DF_NEED_TWO_PHASE,
								GetAssignedOidsForDispatch(),
								&cdb_pgresults);
5358

5359
	vacuum_combine_stats(ctx, &cdb_pgresults);
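	/*
	 * Each QE returns its updated pg_class statistics in the "extras" field
	 * of its dispatch result; vacuum_combine_stats() folds these into the
	 * QD-side stats context.
	 */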
5360

5361
	cdbdisp_clearCdbPgResults(&cdb_pgresults);
5362 5363 5364 5365 5366 5367 5368 5369 5370 5371 5372 5373 5374 5375 5376 5377 5378 5379 5380 5381 5382 5383 5384 5385 5386 5387 5388 5389
}

/*
 * open_relation_and_check_permission -- open the relation with an appropriate
 * lock based on the vacuum statement, and check for the permissions on this
 * relation.
 */
static Relation
open_relation_and_check_permission(VacuumStmt *vacstmt,
								   Oid relid,
								   char expected_relkind,
								   bool isDropTransaction)
{
	Relation onerel;
	LOCKMODE lmode;

	/*
	 * If this is a drop transaction and there is another parallel drop transaction
	 * (on any relation) active. We drop out there. The other drop transaction
	 * might be on the same relation and that would be upgrade deadlock.
	 *
	 * Note: By the time we would have reached try_relation_open the other
	 * drop transaction might already be completed, but we don't take that
	 * risk here.
	 *
	 * My marking the drop transaction as busy before checking, the worst
	 * thing that can happen is that both transaction see each other and
	 * both cancel the drop.
5390 5391 5392 5393
	 *
	 * The upgrade deadlock is not applicable to vacuum full because
	 * it begins with an AccessExclusive lock and doesn't need to
	 * upgrade it.
5394 5395
	 */

5396
	if (isDropTransaction && !vacstmt->full)
5397 5398 5399 5400 5401 5402 5403 5404 5405 5406 5407 5408 5409 5410 5411 5412 5413 5414 5415 5416 5417 5418 5419 5420 5421 5422 5423 5424 5425 5426 5427 5428 5429 5430 5431 5432 5433 5434 5435 5436 5437 5438 5439 5440 5441 5442 5443 5444 5445 5446 5447 5448 5449 5450 5451 5452 5453 5454 5455 5456
	{
		MyProc->inDropTransaction = true;
		if (HasDropTransaction(false))
		{
			elogif(Debug_appendonly_print_compaction, LOG,
					"Skip drop because of concurrent drop transaction");

			return NULL;
		}
	}

	/*
	 * Determine the type of lock we want --- hard exclusive lock for a FULL
	 * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either
	 * way, we can be sure that no other backend is vacuuming the same table.
	 * For analyze, we use ShareUpdateExclusiveLock.
	 */
	if (isDropTransaction)
		lmode = AccessExclusiveLock;
	else if (!vacstmt->vacuum)
		lmode = ShareUpdateExclusiveLock;
	else
		lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;

	/*
	 * Open the relation and get the appropriate lock on it.
	 *
	 * There's a race condition here: the rel may have gone away since the
	 * last time we saw it.  If so, we don't need to vacuum it.
	 */
	onerel = try_relation_open(relid, lmode, false);

	if (!onerel)
		return NULL;

	/*
	 * Check permissions.
	 *
	 * We allow the user to vacuum a table if he is superuser, the table
	 * owner, or the database owner (but in the latter case, only if it's not
	 * a shared relation).	pg_class_ownercheck includes the superuser case.
	 *
	 * Note we choose to treat permissions failure as a WARNING and keep
	 * trying to vacuum the rest of the DB --- is this appropriate?
	 */
	if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
		  (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
	{
		if (Gp_role != GP_ROLE_EXECUTE)
			ereport(WARNING,
					(errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
							RelationGetRelationName(onerel))));
		relation_close(onerel, lmode);
		return NULL;
	}

	/*
	 * Check that it's a plain table; we used to do this in get_rel_oids() but
	 * seems safer to check after we've locked the relation.
	 */
5457 5458
	if (onerel->rd_rel->relkind != expected_relkind ||
		RelationIsExternal(onerel) ||
5459
		(vacstmt->full && GpPersistent_IsPersistentRelation(RelationGetRelid(onerel))))
5460 5461 5462 5463 5464 5465 5466 5467 5468 5469 5470 5471 5472 5473 5474 5475 5476 5477 5478 5479 5480 5481 5482 5483 5484 5485 5486 5487 5488 5489 5490 5491 5492 5493
	{
		ereport(WARNING,
				(errmsg("skipping \"%s\" --- cannot vacuum indexes, views, external tables, or special system tables",
						RelationGetRelationName(onerel))));
		relation_close(onerel, lmode);
		return NULL;
	}

	/*
	 * Silently ignore tables that are temp tables of other backends ---
	 * trying to vacuum these will lead to great unhappiness, since their
	 * contents are probably not up-to-date on disk.  (We don't throw a
	 * warning here; it would just lead to chatter during a database-wide
	 * VACUUM.)
	 */
	if (isOtherTempNamespace(RelationGetNamespace(onerel)))
	{
		relation_close(onerel, lmode);
		return NULL;
	}

	/*
	 * We can ANALYZE any table except pg_statistic. See update_attstats
	 */
	if (vacstmt->analyze && RelationGetRelid(onerel) == StatisticRelationId)
	{
		relation_close(onerel, ShareUpdateExclusiveLock);
		return NULL;
	}

	return onerel;
}

/*
5494
 * vacuum_combine_stats
5495 5496 5497 5498 5499
 * This function combine the stats information sent by QEs to generate
 * the final stats for QD relations.
 *
 * Note that the mirrorResults is ignored by this function.
 */
5500
static void
5501
vacuum_combine_stats(VacuumStatsContext *stats_context, CdbPgResults* cdb_pgresults)
5502 5503 5504 5505 5506
{
	int result_no;

	Assert(Gp_role == GP_ROLE_DISPATCH);

5507
	if (cdb_pgresults == NULL || cdb_pgresults->numResults <= 0)
5508 5509 5510 5511 5512 5513 5514 5515 5516 5517 5518 5519 5520
		return;

	/*
	 * Process the dispatch results from the primary. Note that the QE
	 * processes also send back the new stats info, such as stats on
	 * pg_class, for the relevant table and its
	 * indexes. We parse this information, and compute the final stats
	 * for the QD.
	 *
	 * For pg_class stats, we compute the maximum number of tuples and
	 * maximum number of pages after processing the stats from each QE.
	 *
	 */
5521
	for(result_no = 0; result_no < cdb_pgresults->numResults; result_no++)
5522
	{
5523

5524
		VPgClassStats *pgclass_stats = NULL;
5525 5526
		ListCell *lc = NULL;
		struct pg_result *pgresult = cdb_pgresults->pg_results[result_no];
5527

5528 5529
		if (pgresult->extras == NULL)
			continue;
5530

5531
		Assert(pgresult->extraslen > sizeof(int));
5532

5533 5534 5535 5536 5537 5538 5539 5540
		/*
		 * Process the stats for pg_class. We simple compute the maximum
		 * number of rel_tuples and rel_pages.
		 */
		pgclass_stats = (VPgClassStats *) pgresult->extras;
		foreach (lc, stats_context->updated_stats)
		{
			VPgClassStats *tmp_stats = (VPgClassStats *) lfirst(lc);
5541

5542
			if (tmp_stats->relid == pgclass_stats->relid)
5543
			{
5544 5545 5546
				tmp_stats->rel_pages += pgclass_stats->rel_pages;
				tmp_stats->rel_tuples += pgclass_stats->rel_tuples;
				break;
5547
			}
5548
		}
5549

5550 5551 5552
		if (lc == NULL)
		{
			Assert(pgresult->extraslen == sizeof(VPgClassStats));
5553

5554 5555
			pgclass_stats = palloc(sizeof(VPgClassStats));
			memcpy(pgclass_stats, pgresult->extras, pgresult->extraslen);
5556

5557 5558
			stats_context->updated_stats =
					lappend(stats_context->updated_stats, pgclass_stats);
5559 5560 5561
		}
	}
}