/*-------------------------------------------------------------------------
 *
 * vacuumlazy.c
 *	  Concurrent ("lazy") vacuuming.
 *
 *
 * The major space usage for LAZY VACUUM is storage for the array of dead
 * tuple TIDs, with the next biggest need being storage for per-disk-page
 * free space info.  We want to ensure we can vacuum even the very largest
 * relations with finite memory space usage.  To do that, we set upper bounds
 * on the number of tuples and pages we will keep track of at once.
 *
 * We are willing to use at most maintenance_work_mem memory space to keep
 * track of dead tuples.  We initially allocate an array of TIDs of that size,
 * with an upper limit that depends on table size (this limit ensures we don't
 * allocate a huge area uselessly for vacuuming small tables).  If the array
 * threatens to overflow, we suspend the heap scan phase and perform a pass of
 * index cleanup and page compaction, then resume the heap scan with an empty
 * TID array.
 *
 * If we're processing a table with no indexes, we can just vacuum each page
 * as we go; there's no need to save up multiple tuples to minimize the number
 * of index scans performed.  So we don't use maintenance_work_mem memory for
 * the TID array, just enough to hold as many heap tuples as fit on one page.
 *
 *
 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/commands/vacuumlazy.c,v 1.112 2008/12/03 13:05:22 heikki Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <math.h>

#include "access/genam.h"
#include "access/heapam.h"
#include "access/transam.h"
#include "access/visibilitymap.h"
#include "catalog/storage.h"
#include "commands/dbcommands.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "storage/lmgr.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_rusage.h"
#include "utils/tqual.h"


/*
 * Space/time tradeoff parameters: do these need to be user-tunable?
 *
 * To consider truncating the relation, we want there to be at least
 * REL_TRUNCATE_MINIMUM or (relsize / REL_TRUNCATE_FRACTION) (whichever
 * is less) potentially-freeable pages.
 */
#define REL_TRUNCATE_MINIMUM	1000
#define REL_TRUNCATE_FRACTION	16

/*
 * Guesstimation of number of dead tuples per page.  This is used to
 * provide an upper limit to memory allocated when vacuuming small
 * tables.
 */
#define LAZY_ALLOC_TUPLES		MaxHeapTuplesPerPage

typedef struct LVRelStats
{
	/* hasindex = true means two-pass strategy; false means one-pass */
	bool		hasindex;
	/* Overall statistics about rel */
	BlockNumber rel_pages;
	double		rel_tuples;
	BlockNumber pages_removed;
	double		tuples_deleted;
	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
	/* List of TIDs of tuples we intend to delete */
	/* NB: this list is ordered by TID address */
	int			num_dead_tuples;	/* current # of entries */
	int			max_dead_tuples;	/* # slots allocated in array */
	ItemPointer dead_tuples;	/* array of ItemPointerData */
	int			num_index_scans;
	bool		scanned_all;	/* have we scanned all pages (this far)? */
} LVRelStats;


/* A few variables that don't seem worth passing around as parameters */
static int	elevel = -1;

static TransactionId OldestXmin;
static TransactionId FreezeLimit;

static BufferAccessStrategy vac_strategy;


/* non-export function prototypes */
static void lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
			   Relation *Irel, int nindexes, bool scan_all);
static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
static void lazy_vacuum_index(Relation indrel,
				  IndexBulkDeleteResult **stats,
				  LVRelStats *vacrelstats);
static void lazy_cleanup_index(Relation indrel,
				   IndexBulkDeleteResult *stats,
				   LVRelStats *vacrelstats);
static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
				 int tupindex, LVRelStats *vacrelstats);
static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats);
static BlockNumber count_nondeletable_pages(Relation onerel,
						 LVRelStats *vacrelstats);
static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks);
static void lazy_record_dead_tuple(LVRelStats *vacrelstats,
					   ItemPointer itemptr);
static bool lazy_tid_reaped(ItemPointer itemptr, void *state);
static int	vac_cmp_itemptr(const void *left, const void *right);


/*
 *	lazy_vacuum_rel() -- perform LAZY VACUUM for one heap relation
 *
 *		This routine vacuums a single heap, cleans out its indexes, and
 *		updates its relpages and reltuples statistics.
 *
 *		At entry, we have already established a transaction and opened
 *		and locked the relation.
 */
void
lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
				BufferAccessStrategy bstrategy)
{
	LVRelStats *vacrelstats;
	Relation   *Irel;
	int			nindexes;
	BlockNumber possibly_freeable;
	PGRUsage	ru0;
	TimestampTz starttime = 0;
	bool		scan_all;

	pg_rusage_init(&ru0);

	/* measure elapsed time iff autovacuum logging requires it */
	if (IsAutoVacuumWorkerProcess() && Log_autovacuum_min_duration > 0)
		starttime = GetCurrentTimestamp();

	if (vacstmt->verbose)
		elevel = INFO;
	else
		elevel = DEBUG2;

	vac_strategy = bstrategy;

	vacuum_set_xid_limits(vacstmt->freeze_min_age, onerel->rd_rel->relisshared,
						  &OldestXmin, &FreezeLimit);

	vacrelstats = (LVRelStats *) palloc0(sizeof(LVRelStats));

	vacrelstats->num_index_scans = 0;
	vacrelstats->scanned_all = true;

	/* Open all indexes of the relation */
	vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
	vacrelstats->hasindex = (nindexes > 0);

	/* Should we use the visibility map or scan all pages? */
	if (vacstmt->freeze_min_age != -1)
		scan_all = true;
	else
		scan_all = false;
 
	/* Do the vacuuming */
	lazy_scan_heap(onerel, vacrelstats, Irel, nindexes, scan_all);

	/* Done with indexes */
	vac_close_indexes(nindexes, Irel, NoLock);

	/*
	 * Optionally truncate the relation.
	 *
	 * Don't even think about it unless we have a shot at releasing a goodly
	 * number of pages.  Otherwise, the time taken isn't worth it.
	 */
	possibly_freeable = vacrelstats->rel_pages - vacrelstats->nonempty_pages;
	if (possibly_freeable >= REL_TRUNCATE_MINIMUM ||
		possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION)
		lazy_truncate_heap(onerel, vacrelstats);

	/* Vacuum the Free Space Map */
	FreeSpaceMapVacuum(onerel);

	/*
	 * Update statistics in pg_class. We can only advance relfrozenxid if we
	 * didn't skip any pages.
	 */
	vac_update_relstats(onerel,
						vacrelstats->rel_pages, vacrelstats->rel_tuples,
						vacrelstats->hasindex,
						vacrelstats->scanned_all ? FreezeLimit : InvalidTransactionId);

	/* report results to the stats collector, too */
	pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
						 vacstmt->analyze, vacrelstats->rel_tuples);

	/* and log the action if appropriate */
	if (IsAutoVacuumWorkerProcess() && Log_autovacuum_min_duration >= 0)
	{
		if (Log_autovacuum_min_duration == 0 ||
			TimestampDifferenceExceeds(starttime, GetCurrentTimestamp(),
									   Log_autovacuum_min_duration))
			ereport(LOG,
					(errmsg("automatic vacuum of table \"%s.%s.%s\": index scans: %d\n"
							"pages: %d removed, %d remain\n"
							"tuples: %.0f removed, %.0f remain\n"
							"system usage: %s",
							get_database_name(MyDatabaseId),
							get_namespace_name(RelationGetNamespace(onerel)),
							RelationGetRelationName(onerel),
							vacrelstats->num_index_scans,
						  vacrelstats->pages_removed, vacrelstats->rel_pages,
						vacrelstats->tuples_deleted, vacrelstats->rel_tuples,
							pg_rusage_show(&ru0))));
	}
}


/*
 *	lazy_scan_heap() -- scan an open heap relation
 *
 *		This routine sets commit status bits, builds lists of dead tuples
 *		and pages with free space, and calculates statistics on the number
 *		of live tuples in the heap.  When done, or when we run low on space
 *		for dead-tuple TIDs, invoke vacuuming of indexes and heap.
 *
 *		If there are no indexes then we just vacuum each dirty page as we
 *		process it, since there's no point in gathering many tuples.
 */
static void
lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
			   Relation *Irel, int nindexes, bool scan_all)
{
	BlockNumber nblocks,
				blkno;
	HeapTupleData tuple;
	char	   *relname;
	BlockNumber empty_pages,
				scanned_pages,
				vacuumed_pages;
	double		num_tuples,
				tups_vacuumed,
				nkeep,
				nunused;
	IndexBulkDeleteResult **indstats;
	int			i;
	PGRUsage	ru0;
	Buffer		vmbuffer = InvalidBuffer;

	pg_rusage_init(&ru0);

	relname = RelationGetRelationName(onerel);
	ereport(elevel,
			(errmsg("vacuuming \"%s.%s\"",
					get_namespace_name(RelationGetNamespace(onerel)),
					relname)));

	empty_pages = vacuumed_pages = scanned_pages = 0;
	num_tuples = tups_vacuumed = nkeep = nunused = 0;

	indstats = (IndexBulkDeleteResult **)
		palloc0(nindexes * sizeof(IndexBulkDeleteResult *));

	nblocks = RelationGetNumberOfBlocks(onerel);
	vacrelstats->rel_pages = nblocks;
	vacrelstats->nonempty_pages = 0;

	lazy_space_alloc(vacrelstats, nblocks);

	for (blkno = 0; blkno < nblocks; blkno++)
	{
		Buffer		buf;
		Page		page;
		OffsetNumber offnum,
					maxoff;
		bool		tupgone,
					hastup;
		int			prev_dead_count;
		OffsetNumber frozen[MaxOffsetNumber];
		int			nfrozen;
		Size		freespace;
		bool		all_visible_according_to_vm = false;
		bool		all_visible;

		/*
		 * Skip pages that don't require vacuuming according to the
		 * visibility map.
		 */
		if (!scan_all)
		{
			all_visible_according_to_vm =
				visibilitymap_test(onerel, blkno, &vmbuffer);
			if (all_visible_according_to_vm)
			{
				vacrelstats->scanned_all = false;
				continue;
			}
		}

		vacuum_delay_point();

		scanned_pages++;

		/*
		 * If we are close to overrunning the available space for dead-tuple
		 * TIDs, pause and do a cycle of vacuuming before we tackle this page.
		 */
		if ((vacrelstats->max_dead_tuples - vacrelstats->num_dead_tuples) < MaxHeapTuplesPerPage &&
			vacrelstats->num_dead_tuples > 0)
		{
			/* Remove index entries */
			for (i = 0; i < nindexes; i++)
				lazy_vacuum_index(Irel[i],
								  &indstats[i],
								  vacrelstats);
			/* Remove tuples from heap */
			lazy_vacuum_heap(onerel, vacrelstats);
			/* Forget the now-vacuumed tuples, and press on */
			vacrelstats->num_dead_tuples = 0;
			vacrelstats->num_index_scans++;
		}

		buf = ReadBufferExtended(onerel, MAIN_FORKNUM, blkno,
								 RBM_NORMAL, vac_strategy);

		/* We need buffer cleanup lock so that we can prune HOT chains. */
		LockBufferForCleanup(buf);

		page = BufferGetPage(buf);

		if (PageIsNew(page))
		{
			/*
			 * An all-zeroes page could be left over if a backend extends the
			 * relation but crashes before initializing the page. Reclaim such
			 * pages for use.
			 *
			 * We have to be careful here because we could be looking at a
			 * page that someone has just added to the relation and not yet
			 * been able to initialize (see RelationGetBufferForTuple). To
			 * protect against that, release the buffer lock, grab the
			 * relation extension lock momentarily, and re-lock the buffer. If
			 * the page is still uninitialized by then, it must be left over
			 * from a crashed backend, and we can initialize it.
			 *
			 * We don't really need the relation lock when this is a new or
			 * temp relation, but it's probably not worth the code space to
			 * check that, since this surely isn't a critical path.
			 *
			 * Note: the comparable code in vacuum.c need not worry because
			 * it's got exclusive lock on the whole relation.
			 */
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
			LockRelationForExtension(onerel, ExclusiveLock);
			UnlockRelationForExtension(onerel, ExclusiveLock);
			LockBufferForCleanup(buf);
			if (PageIsNew(page))
			{
				ereport(WARNING,
				(errmsg("relation \"%s\" page %u is uninitialized --- fixing",
						relname, blkno)));
				PageInit(page, BufferGetPageSize(buf), 0);
				empty_pages++;
			}
			freespace = PageGetHeapFreeSpace(page);
			MarkBufferDirty(buf);
			UnlockReleaseBuffer(buf);

			RecordPageWithFreeSpace(onerel, blkno, freespace);
			continue;
		}

		if (PageIsEmpty(page))
		{
			empty_pages++;
			freespace = PageGetHeapFreeSpace(page);

			if (!PageIsAllVisible(page))
			{
				SetBufferCommitInfoNeedsSave(buf);
				PageSetAllVisible(page);
			}

			LockBuffer(buf, BUFFER_LOCK_UNLOCK);

			/* Update the visibility map */
			if (!all_visible_according_to_vm)
			{
				visibilitymap_pin(onerel, blkno, &vmbuffer);
				LockBuffer(buf, BUFFER_LOCK_SHARE);
				if (PageIsAllVisible(page))
					visibilitymap_set(onerel, blkno, PageGetLSN(page), &vmbuffer);
				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
			}

			ReleaseBuffer(buf);
			RecordPageWithFreeSpace(onerel, blkno, freespace);
			continue;
		}

		/*
		 * Prune all HOT-update chains in this page.
		 *
		 * We count tuples removed by the pruning step as removed by VACUUM.
		 */
		tups_vacuumed += heap_page_prune(onerel, buf, OldestXmin,
										 false, false);

		/*
		 * Now scan the page to collect vacuumable items and check for tuples
		 * requiring freezing.
		 */
		all_visible = true;
		nfrozen = 0;
		hastup = false;
		prev_dead_count = vacrelstats->num_dead_tuples;
		maxoff = PageGetMaxOffsetNumber(page);
		for (offnum = FirstOffsetNumber;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
			ItemId		itemid;

			itemid = PageGetItemId(page, offnum);

			/* Unused items require no processing, but we count 'em */
			if (!ItemIdIsUsed(itemid))
			{
				nunused += 1;
				continue;
			}

			/* Redirect items mustn't be touched */
			if (ItemIdIsRedirected(itemid))
			{
				hastup = true;	/* this page won't be truncatable */
				continue;
			}

			ItemPointerSet(&(tuple.t_self), blkno, offnum);

			/*
			 * DEAD item pointers are to be vacuumed normally; but we don't
			 * count them in tups_vacuumed, else we'd be double-counting (at
			 * least in the common case where heap_page_prune() just freed up
			 * a non-HOT tuple).
			 */
			if (ItemIdIsDead(itemid))
			{
				lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
				all_visible = false;
				continue;
			}

			Assert(ItemIdIsNormal(itemid));

			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
			tuple.t_len = ItemIdGetLength(itemid);

			tupgone = false;

			switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin, buf))
			{
				case HEAPTUPLE_DEAD:

					/*
					 * Ordinarily, DEAD tuples would have been removed by
					 * heap_page_prune(), but it's possible that the tuple
					 * state changed since heap_page_prune() looked.  In
					 * particular an INSERT_IN_PROGRESS tuple could have
					 * changed to DEAD if the inserter aborted.  So this
					 * cannot be considered an error condition.
					 *
					 * If the tuple is HOT-updated then it must only be
					 * removed by a prune operation; so we keep it just as if
					 * it were RECENTLY_DEAD.  Also, if it's a heap-only
					 * tuple, we choose to keep it, because it'll be a lot
					 * cheaper to get rid of it in the next pruning pass than
					 * to treat it like an indexed tuple.
					 */
					if (HeapTupleIsHotUpdated(&tuple) ||
						HeapTupleIsHeapOnly(&tuple))
						nkeep += 1;
					else
						tupgone = true; /* we can delete the tuple */
					all_visible = false;
					break;
				case HEAPTUPLE_LIVE:
					/* Tuple is good --- but let's do some validity checks */
					if (onerel->rd_rel->relhasoids &&
						!OidIsValid(HeapTupleGetOid(&tuple)))
						elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
							 relname, blkno, offnum);

					/*
					 * Is the tuple definitely visible to all transactions?
					 *
					 * NB: Like with per-tuple hint bits, we can't set the
					 * PD_ALL_VISIBLE flag if the inserter committed
					 * asynchronously. See SetHintBits for more info. Check
					 * that the HEAP_XMIN_COMMITTED hint bit is set because of
					 * that.
					 */
					if (all_visible)
					{
						TransactionId xmin;

						if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
						{
							all_visible = false;
							break;
						}
						/*
						 * The inserter definitely committed. But is it
						 * old enough that everyone sees it as committed?
						 */
						xmin = HeapTupleHeaderGetXmin(tuple.t_data);
						if (!TransactionIdPrecedes(xmin, OldestXmin))
						{
							all_visible = false;
							break;
						}
					}
					break;
				case HEAPTUPLE_RECENTLY_DEAD:

					/*
					 * If tuple is recently deleted then we must not remove it
					 * from relation.
					 */
					nkeep += 1;
					all_visible = false;
					break;
				case HEAPTUPLE_INSERT_IN_PROGRESS:
					/* This is an expected case during concurrent vacuum */
					all_visible = false;
					break;
				case HEAPTUPLE_DELETE_IN_PROGRESS:
					/* This is an expected case during concurrent vacuum */
					all_visible = false;
					break;
				default:
					elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
					break;
			}

			if (tupgone)
			{
				lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
				tups_vacuumed += 1;
			}
			else
			{
				num_tuples += 1;
				hastup = true;

				/*
				 * Each non-removable tuple must be checked to see if it needs
				 * freezing.  Note we already have exclusive buffer lock.
				 */
				if (heap_freeze_tuple(tuple.t_data, FreezeLimit,
									  InvalidBuffer))
					frozen[nfrozen++] = offnum;
			}
		}						/* scan along page */

		/*
		 * If we froze any tuples, mark the buffer dirty, and write a WAL
		 * record recording the changes.  We must log the changes to be
		 * crash-safe against future truncation of CLOG.
		 */
		if (nfrozen > 0)
		{
			MarkBufferDirty(buf);
			/* no XLOG for temp tables, though */
			if (!onerel->rd_istemp)
			{
				XLogRecPtr	recptr;

				recptr = log_heap_freeze(onerel, buf, FreezeLimit,
										 frozen, nfrozen);
				PageSetLSN(page, recptr);
				PageSetTLI(page, ThisTimeLineID);
			}
		}

		/*
		 * If there are no indexes then we can vacuum the page right now
		 * instead of doing a second scan.
		 */
		if (nindexes == 0 &&
			vacrelstats->num_dead_tuples > 0)
		{
			/* Remove tuples from heap */
			lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats);
			/* Forget the now-vacuumed tuples, and press on */
			vacrelstats->num_dead_tuples = 0;
			vacuumed_pages++;
		}

		freespace = PageGetHeapFreeSpace(page);

		/* Update the all-visible flag on the page */
		if (!PageIsAllVisible(page) && all_visible)
		{
			SetBufferCommitInfoNeedsSave(buf);
			PageSetAllVisible(page);
		}
		else if (PageIsAllVisible(page) && !all_visible)
		{
			elog(WARNING, "PD_ALL_VISIBLE flag was incorrectly set");
			SetBufferCommitInfoNeedsSave(buf);
			PageClearAllVisible(page);

			/*
			 * Normally, we would drop the lock on the heap page before
			 * updating the visibility map, but since this is a can't-happen
			 * case anyway, don't bother.
			 */
			visibilitymap_clear(onerel, blkno);
		}

		LockBuffer(buf, BUFFER_LOCK_UNLOCK);

		/* Update the visibility map */
		if (!all_visible_according_to_vm && all_visible)
		{
			visibilitymap_pin(onerel, blkno, &vmbuffer);
			LockBuffer(buf, BUFFER_LOCK_SHARE);
			if (PageIsAllVisible(page))
				visibilitymap_set(onerel, blkno, PageGetLSN(page), &vmbuffer);
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}

		ReleaseBuffer(buf);

		/* Remember the location of the last page with nonremovable tuples */
		if (hastup)
			vacrelstats->nonempty_pages = blkno + 1;

		/*
		 * If we remembered any tuples for deletion, then the page will be
		 * visited again by lazy_vacuum_heap, which will compute and record
		 * its post-compaction free space.  If not, then we're done with this
		 * page, so remember its free space as-is.  (This path will always be
		 * taken if there are no indexes.)
		 */
		if (vacrelstats->num_dead_tuples == prev_dead_count)
			RecordPageWithFreeSpace(onerel, blkno, freespace);
	}

	/* save stats for use later */
	vacrelstats->rel_tuples = num_tuples;
	vacrelstats->tuples_deleted = tups_vacuumed;

	/* If any tuples need to be deleted, perform final vacuum cycle */
	/* XXX put a threshold on min number of tuples here? */
	if (vacrelstats->num_dead_tuples > 0)
	{
		/* Remove index entries */
		for (i = 0; i < nindexes; i++)
			lazy_vacuum_index(Irel[i],
							  &indstats[i],
							  vacrelstats);
		/* Remove tuples from heap */
		lazy_vacuum_heap(onerel, vacrelstats);
		vacrelstats->num_index_scans++;
	}

	/* Release the pin on the visibility map page */
	if (BufferIsValid(vmbuffer))
	{
		ReleaseBuffer(vmbuffer);
		vmbuffer = InvalidBuffer;
	}

	/* Do post-vacuum cleanup and statistics update for each index */
	for (i = 0; i < nindexes; i++)
		lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);

	/* If no indexes, make log report that lazy_vacuum_heap would've made */
	if (vacuumed_pages)
		ereport(elevel,
				(errmsg("\"%s\": removed %.0f row versions in %u pages",
						RelationGetRelationName(onerel),
						tups_vacuumed, vacuumed_pages)));

	ereport(elevel,
			(errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u out of %u pages",
					RelationGetRelationName(onerel),
					tups_vacuumed, num_tuples, scanned_pages, nblocks),
			 errdetail("%.0f dead row versions cannot be removed yet.\n"
					   "There were %.0f unused item pointers.\n"
					   "%u pages are entirely empty.\n"
					   "%s.",
					   nkeep,
					   nunused,
					   empty_pages,
					   pg_rusage_show(&ru0))));
}


/*
 *	lazy_vacuum_heap() -- second pass over the heap
 *
 *		This routine marks dead tuples as unused and compacts out free
 *		space on their pages.  Pages not having dead tuples recorded from
 *		lazy_scan_heap are not visited at all.
 *
 * Note: the reason for doing this as a second pass is we cannot remove
 * the tuples until we've removed their index entries, and we want to
 * process index entry removal in batches as large as possible.
 */
static void
lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
{
	int			tupindex;
	int			npages;
	PGRUsage	ru0;

	pg_rusage_init(&ru0);
	npages = 0;

	tupindex = 0;
	while (tupindex < vacrelstats->num_dead_tuples)
	{
		BlockNumber tblk;
		Buffer		buf;
		Page		page;
		Size		freespace;

		vacuum_delay_point();

		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
		buf = ReadBufferExtended(onerel, MAIN_FORKNUM, tblk, RBM_NORMAL,
								 vac_strategy);
		LockBufferForCleanup(buf);
		tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats);

		/* Now that we've compacted the page, record its available space */
		page = BufferGetPage(buf);
		freespace = PageGetHeapFreeSpace(page);

		UnlockReleaseBuffer(buf);
		RecordPageWithFreeSpace(onerel, tblk, freespace);
		npages++;
	}

	ereport(elevel,
			(errmsg("\"%s\": removed %d row versions in %d pages",
					RelationGetRelationName(onerel),
					tupindex, npages),
			 errdetail("%s.",
					   pg_rusage_show(&ru0))));
}

/*
 *	lazy_vacuum_page() -- free dead tuples on a page
 *					 and repair its fragmentation.
 *
 * Caller must hold pin and buffer cleanup lock on the buffer.
 *
 * tupindex is the index in vacrelstats->dead_tuples of the first dead
 * tuple for this page.  We assume the rest follow sequentially.
 * The return value is the first tupindex after the tuples of this page.
 */
static int
lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
				 int tupindex, LVRelStats *vacrelstats)
{
	Page		page = BufferGetPage(buffer);
	OffsetNumber unused[MaxOffsetNumber];
	int			uncnt = 0;

	START_CRIT_SECTION();

	for (; tupindex < vacrelstats->num_dead_tuples; tupindex++)
	{
		BlockNumber tblk;
		OffsetNumber toff;
		ItemId		itemid;

		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
		if (tblk != blkno)
			break;				/* past end of tuples for this block */
		toff = ItemPointerGetOffsetNumber(&vacrelstats->dead_tuples[tupindex]);
		itemid = PageGetItemId(page, toff);
		ItemIdSetUnused(itemid);
		unused[uncnt++] = toff;
	}

	PageRepairFragmentation(page);

	MarkBufferDirty(buffer);

	/* XLOG stuff */
	if (!onerel->rd_istemp)
	{
		XLogRecPtr	recptr;

		recptr = log_heap_clean(onerel, buffer,
								NULL, 0, NULL, 0,
								unused, uncnt,
								false);
		PageSetLSN(page, recptr);
		PageSetTLI(page, ThisTimeLineID);
	}

	END_CRIT_SECTION();

	return tupindex;
}

/*
 *	lazy_vacuum_index() -- vacuum one index relation.
 *
 *		Delete all the index entries pointing to tuples listed in
 *		vacrelstats->dead_tuples, and update running statistics.
 */
static void
lazy_vacuum_index(Relation indrel,
				  IndexBulkDeleteResult **stats,
				  LVRelStats *vacrelstats)
{
	IndexVacuumInfo ivinfo;
	PGRUsage	ru0;

	pg_rusage_init(&ru0);

	ivinfo.index = indrel;
	ivinfo.vacuum_full = false;
	ivinfo.message_level = elevel;
	/* We don't yet know rel_tuples, so pass -1 */
	ivinfo.num_heap_tuples = -1;
	ivinfo.strategy = vac_strategy;

	/* Do bulk deletion */
	*stats = index_bulk_delete(&ivinfo, *stats,
							   lazy_tid_reaped, (void *) vacrelstats);

	ereport(elevel,
			(errmsg("scanned index \"%s\" to remove %d row versions",
					RelationGetRelationName(indrel),
					vacrelstats->num_dead_tuples),
			 errdetail("%s.", pg_rusage_show(&ru0))));
}

/*
 *	lazy_cleanup_index() -- do post-vacuum cleanup for one index relation.
 */
static void
lazy_cleanup_index(Relation indrel,
				   IndexBulkDeleteResult *stats,
				   LVRelStats *vacrelstats)
{
	IndexVacuumInfo ivinfo;
	PGRUsage	ru0;

	pg_rusage_init(&ru0);

	ivinfo.index = indrel;
	ivinfo.vacuum_full = false;
	ivinfo.message_level = elevel;
	ivinfo.num_heap_tuples = vacrelstats->rel_tuples;
	ivinfo.strategy = vac_strategy;

	stats = index_vacuum_cleanup(&ivinfo, stats);

	if (!stats)
		return;

	/* now update statistics in pg_class */
	vac_update_relstats(indrel,
						stats->num_pages, stats->num_index_tuples,
						false, InvalidTransactionId);

	ereport(elevel,
			(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
					RelationGetRelationName(indrel),
					stats->num_index_tuples,
					stats->num_pages),
			 errdetail("%.0f index row versions were removed.\n"
			 "%u index pages have been deleted, %u are currently reusable.\n"
					   "%s.",
					   stats->tuples_removed,
					   stats->pages_deleted, stats->pages_free,
					   pg_rusage_show(&ru0))));

	pfree(stats);
}

/*
 * lazy_truncate_heap - try to truncate off any empty pages at the end
 */
static void
lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats)
{
	BlockNumber old_rel_pages = vacrelstats->rel_pages;
	BlockNumber new_rel_pages;
	PGRUsage	ru0;

	pg_rusage_init(&ru0);

	/*
	 * We need full exclusive lock on the relation in order to do truncation.
	 * If we can't get it, give up rather than waiting --- we don't want to
	 * block other backends, and we don't want to deadlock (which is quite
	 * possible considering we already hold a lower-grade lock).
	 */
	if (!ConditionalLockRelation(onerel, AccessExclusiveLock))
		return;

	/*
	 * Now that we have exclusive lock, look to see if the rel has grown
	 * whilst we were vacuuming with non-exclusive lock.  If so, give up; the
	 * newly added pages presumably contain non-deletable tuples.
	 */
	new_rel_pages = RelationGetNumberOfBlocks(onerel);
	if (new_rel_pages != old_rel_pages)
	{
		/* might as well use the latest news when we update pg_class stats */
		vacrelstats->rel_pages = new_rel_pages;
		UnlockRelation(onerel, AccessExclusiveLock);
		return;
	}

	/*
	 * Scan backwards from the end to verify that the end pages actually
	 * contain no tuples.  This is *necessary*, not optional, because other
	 * backends could have added tuples to these pages whilst we were
	 * vacuuming.
	 */
	new_rel_pages = count_nondeletable_pages(onerel, vacrelstats);

	if (new_rel_pages >= old_rel_pages)
	{
		/* can't do anything after all */
		UnlockRelation(onerel, AccessExclusiveLock);
		return;
	}

	/*
	 * Okay to truncate.
	 */
	RelationTruncate(onerel, new_rel_pages);

	/*
	 * Note: once we have truncated, we *must* keep the exclusive lock until
	 * commit.	The sinval message that will be sent at commit (as a result of
	 * vac_update_relstats()) must be received by other backends, to cause
	 * them to reset their rd_targblock values, before they can safely access
	 * the table again.
	 */

	/* update statistics */
	vacrelstats->rel_pages = new_rel_pages;
	vacrelstats->pages_removed = old_rel_pages - new_rel_pages;

	ereport(elevel,
			(errmsg("\"%s\": truncated %u to %u pages",
					RelationGetRelationName(onerel),
					old_rel_pages, new_rel_pages),
			 errdetail("%s.",
					   pg_rusage_show(&ru0))));
}

/*
 * Rescan end pages to verify that they are (still) empty of tuples.
 *
 * Returns number of nondeletable pages (last nonempty page + 1).
 */
static BlockNumber
count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
{
	BlockNumber blkno;

	/* Strange coding of loop control is needed because blkno is unsigned */
	blkno = vacrelstats->rel_pages;
	while (blkno > vacrelstats->nonempty_pages)
	{
		Buffer		buf;
		Page		page;
		OffsetNumber offnum,
					maxoff;
		bool		hastup;

		/*
		 * We don't insert a vacuum delay point here, because we have an
		 * exclusive lock on the table which we want to hold for as short a
		 * time as possible.  We still need to check for interrupts however.
		 */
		CHECK_FOR_INTERRUPTS();

		blkno--;

		buf = ReadBufferExtended(onerel, MAIN_FORKNUM, blkno,
								 RBM_NORMAL, vac_strategy);

		/* In this phase we only need shared access to the buffer */
		LockBuffer(buf, BUFFER_LOCK_SHARE);

		page = BufferGetPage(buf);

		if (PageIsNew(page) || PageIsEmpty(page))
		{
			/* PageIsNew probably shouldn't happen... */
			UnlockReleaseBuffer(buf);
			continue;
		}

		hastup = false;
		maxoff = PageGetMaxOffsetNumber(page);
		for (offnum = FirstOffsetNumber;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
			ItemId		itemid;

			itemid = PageGetItemId(page, offnum);

			/*
			 * Note: any non-unused item should be taken as a reason to keep
			 * this page.  We formerly thought that DEAD tuples could be
			 * thrown away, but that's not so, because we'd not have cleaned
			 * out their index entries.
			 */
			if (ItemIdIsUsed(itemid))
			{
				hastup = true;
				break;			/* can stop scanning */
			}
		}						/* scan along page */

		UnlockReleaseBuffer(buf);

		/* Done scanning if we found a tuple here */
		if (hastup)
			return blkno + 1;
	}

	/*
	 * If we fall out of the loop, all the previously-thought-to-be-empty
	 * pages still are; we need not bother to look at the last known-nonempty
	 * page.
	 */
	return vacrelstats->nonempty_pages;
}

/*
 * lazy_space_alloc - space allocation decisions for lazy vacuum
 *
 * See the comments at the head of this file for rationale.
 */
static void
lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
{
	long		maxtuples;

	if (vacrelstats->hasindex)
	{
		maxtuples = (maintenance_work_mem * 1024L) / sizeof(ItemPointerData);
		maxtuples = Min(maxtuples, INT_MAX);
		maxtuples = Min(maxtuples, MaxAllocSize / sizeof(ItemPointerData));

		/* curious coding here to ensure the multiplication can't overflow */
		if ((BlockNumber) (maxtuples / LAZY_ALLOC_TUPLES) > relblocks)
			maxtuples = relblocks * LAZY_ALLOC_TUPLES;

		/* stay sane if small maintenance_work_mem */
		maxtuples = Max(maxtuples, MaxHeapTuplesPerPage);
	}
	else
	{
		maxtuples = MaxHeapTuplesPerPage;
	}

	vacrelstats->num_dead_tuples = 0;
	vacrelstats->max_dead_tuples = (int) maxtuples;
	vacrelstats->dead_tuples = (ItemPointer)
		palloc(maxtuples * sizeof(ItemPointerData));
}

/*
 * lazy_record_dead_tuple - remember one deletable tuple
 */
static void
lazy_record_dead_tuple(LVRelStats *vacrelstats,
					   ItemPointer itemptr)
{
	/*
	 * The array shouldn't overflow under normal behavior, but perhaps it
	 * could if we are given a really small maintenance_work_mem. In that
	 * case, just forget the last few tuples (we'll get 'em next time).
	 */
	if (vacrelstats->num_dead_tuples < vacrelstats->max_dead_tuples)
	{
		vacrelstats->dead_tuples[vacrelstats->num_dead_tuples] = *itemptr;
		vacrelstats->num_dead_tuples++;
	}
}

/*
 *	lazy_tid_reaped() -- is a particular tid deletable?
 *
 *		This has the right signature to be an IndexBulkDeleteCallback.
 *
 *		Assumes dead_tuples array is in sorted order.
 */
static bool
lazy_tid_reaped(ItemPointer itemptr, void *state)
{
	LVRelStats *vacrelstats = (LVRelStats *) state;
	ItemPointer res;

	res = (ItemPointer) bsearch((void *) itemptr,
								(void *) vacrelstats->dead_tuples,
								vacrelstats->num_dead_tuples,
								sizeof(ItemPointerData),
								vac_cmp_itemptr);

	return (res != NULL);
}

/*
 * Comparator routines for use with qsort() and bsearch().
 */
static int
vac_cmp_itemptr(const void *left, const void *right)
{
	BlockNumber lblk,
				rblk;
	OffsetNumber loff,
				roff;

	lblk = ItemPointerGetBlockNumber((ItemPointer) left);
	rblk = ItemPointerGetBlockNumber((ItemPointer) right);

	if (lblk < rblk)
		return -1;
	if (lblk > rblk)
		return 1;

	loff = ItemPointerGetOffsetNumber((ItemPointer) left);
	roff = ItemPointerGetOffsetNumber((ItemPointer) right);

	if (loff < roff)
		return -1;
	if (loff > roff)
		return 1;

	return 0;
}