/*-------------------------------------------------------------------------
 *
 * vacuumlazy.c
 *	  Concurrent ("lazy") vacuuming.
 *
 *
 * The major space usage for LAZY VACUUM is storage for the array of dead
 * tuple TIDs, with the next biggest need being storage for per-disk-page
 * free space info.  We want to ensure we can vacuum even the very largest
 * relations with finite memory space usage.  To do that, we set upper bounds
 * on the number of tuples and pages we will keep track of at once.
 *
 * We are willing to use at most maintenance_work_mem memory space to keep
 * track of dead tuples.  We initially allocate an array of TIDs of that size.
 * If the array threatens to overflow, we suspend the heap scan phase and
 * perform a pass of index cleanup and page compaction, then resume the heap
 * scan with an empty TID array.
 *
 * We can limit the storage for page free space to MaxFSMPages entries,
 * since that's the most the free space map will be willing to remember
 * anyway.	If the relation has fewer than that many pages with free space,
 * life is easy: just build an array of per-page info.	If it has more,
 * we store the free space info as a heap ordered by amount of free space,
 * so that we can discard the pages with least free space to ensure we never
 * have more than MaxFSMPages entries in all.  The surviving page entries
 * are passed to the free space map at conclusion of the scan.
 *
 * If we're processing a table with no indexes, we can just vacuum each page
 * as we go; there's no need to save up multiple tuples to minimize the number
 * of index scans performed.  So we don't use maintenance_work_mem memory for
 * the TID array, just enough to hold as many heap tuples as fit on one page.
 *
 *
 * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/commands/vacuumlazy.c,v 1.78 2006/09/13 17:47:08 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <math.h>

#include "access/genam.h"
#include "access/heapam.h"
#include "access/transam.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/freespace.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_rusage.h"


/*
 * Space/time tradeoff parameters: do these need to be user-tunable?
 *
 * To consider truncating the relation, we want there to be at least
 * REL_TRUNCATE_MINIMUM or (relsize / REL_TRUNCATE_FRACTION) (whichever
 * is less) potentially-freeable pages.
 */
#define REL_TRUNCATE_MINIMUM	1000
#define REL_TRUNCATE_FRACTION	16


typedef struct LVRelStats
{
	/* hasindex = true means two-pass strategy; false means one-pass */
	bool		hasindex;
	/* Overall statistics about rel */
	BlockNumber rel_pages;
	double		rel_tuples;
	BlockNumber pages_removed;
	double		tuples_deleted;
	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
	Size		threshold;		/* minimum interesting free space */
	TransactionId minxid;		/* minimum Xid present anywhere in table */
	/* List of TIDs of tuples we intend to delete */
	/* NB: this list is ordered by TID address */
	int			num_dead_tuples;	/* current # of entries */
	int			max_dead_tuples;	/* # slots allocated in array */
	ItemPointer dead_tuples;	/* array of ItemPointerData */
	/* Array or heap of per-page info about free space */
	/* We use a simple array until it fills up, then convert to heap */
	bool		fs_is_heap;		/* are we using heap organization? */
	int			num_free_pages; /* current # of entries */
	int			max_free_pages; /* # slots allocated in array */
	PageFreeSpaceInfo *free_pages;		/* array or heap of blkno/avail */
} LVRelStats;


static int	elevel = -1;


/* non-export function prototypes */
static void lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
			   Relation *Irel, int nindexes, TransactionId FreezeLimit,
			   TransactionId OldestXmin);
static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
static void lazy_vacuum_index(Relation indrel,
							  IndexBulkDeleteResult **stats,
							  LVRelStats *vacrelstats);
static void lazy_cleanup_index(Relation indrel,
							   IndexBulkDeleteResult *stats,
							   LVRelStats *vacrelstats);
static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
				 int tupindex, LVRelStats *vacrelstats);
static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats,
							   TransactionId OldestXmin);
static BlockNumber count_nondeletable_pages(Relation onerel,
						 LVRelStats *vacrelstats, TransactionId OldestXmin);
static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks);
static void lazy_record_dead_tuple(LVRelStats *vacrelstats,
					   ItemPointer itemptr);
static void lazy_record_free_space(LVRelStats *vacrelstats,
					   BlockNumber page, Size avail);
static bool lazy_tid_reaped(ItemPointer itemptr, void *state);
static void lazy_update_fsm(Relation onerel, LVRelStats *vacrelstats);
static int	vac_cmp_itemptr(const void *left, const void *right);
static int	vac_cmp_page_spaces(const void *left, const void *right);


/*
 *	lazy_vacuum_rel() -- perform LAZY VACUUM for one heap relation
 *
 *		This routine vacuums a single heap, cleans out its indexes, and
 *		updates its relpages and reltuples statistics, as well as the
 *		relminxid and relvacuumxid information.
 *
 *		At entry, we have already established a transaction and opened
 *		and locked the relation.
 */
void
lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
{
	LVRelStats *vacrelstats;
	Relation   *Irel;
	int			nindexes;
	BlockNumber possibly_freeable;
	TransactionId OldestXmin,
				  FreezeLimit;

	if (vacstmt->verbose)
		elevel = INFO;
	else
		elevel = DEBUG2;

	vacuum_set_xid_limits(vacstmt, onerel->rd_rel->relisshared,
						  &OldestXmin, &FreezeLimit);

	vacrelstats = (LVRelStats *) palloc0(sizeof(LVRelStats));

	/* Set threshold for interesting free space = average request size */
	/* XXX should we scale it up or down?  Adjust vacuum.c too, if so */
	vacrelstats->threshold = GetAvgFSMRequestSize(&onerel->rd_node);

	/*
	 * Set initial minimum Xid, which will be updated if a smaller Xid is found
	 * in the relation by lazy_scan_heap.
	 *
	 * We use RecentXmin here (the minimum Xid that belongs to a transaction
	 * that is still open according to our snapshot), because it is the
	 * earliest transaction that could concurrently insert new tuples in the
	 * table.
	 */
	vacrelstats->minxid = RecentXmin;

	/* Open all indexes of the relation */
	vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
	vacrelstats->hasindex = (nindexes > 0);

	/* Do the vacuuming */
	lazy_scan_heap(onerel, vacrelstats, Irel, nindexes, FreezeLimit, OldestXmin);

	/* Done with indexes */
	vac_close_indexes(nindexes, Irel, NoLock);

	/*
	 * Optionally truncate the relation.
	 *
	 * Don't even think about it unless we have a shot at releasing a goodly
	 * number of pages.  Otherwise, the time taken isn't worth it.
	 */
	possibly_freeable = vacrelstats->rel_pages - vacrelstats->nonempty_pages;
	if (possibly_freeable >= REL_TRUNCATE_MINIMUM ||
		possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION)
		lazy_truncate_heap(onerel, vacrelstats, OldestXmin);

	/* Update shared free space map with final free space info */
	lazy_update_fsm(onerel, vacrelstats);

	/* Update statistics in pg_class */
	vac_update_relstats(RelationGetRelid(onerel),
						vacrelstats->rel_pages,
						vacrelstats->rel_tuples,
						vacrelstats->hasindex,
						vacrelstats->minxid, OldestXmin);

	/* report results to the stats collector, too */
	pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
						 vacstmt->analyze, vacrelstats->rel_tuples);
}


/*
 *	lazy_scan_heap() -- scan an open heap relation
 *
 *		This routine sets commit status bits, builds lists of dead tuples
 *		and pages with free space, and calculates statistics on the number
 *		of live tuples in the heap.  When done, or when we run low on space
 *		for dead-tuple TIDs, invoke vacuuming of indexes and heap.
 *
 *		It also updates the minimum Xid found anywhere on the table in
 *		vacrelstats->minxid, for later storing it in pg_class.relminxid.
 *
 *		If there are no indexes then we just vacuum each dirty page as we
 *		process it, since there's no point in gathering many tuples.
 */
static void
lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
			   Relation *Irel, int nindexes, TransactionId FreezeLimit,
			   TransactionId OldestXmin)
{
	BlockNumber nblocks,
				blkno;
	HeapTupleData tuple;
	char	   *relname;
	BlockNumber empty_pages,
				vacuumed_pages;
	double		num_tuples,
				tups_vacuumed,
				nkeep,
				nunused;
	IndexBulkDeleteResult **indstats;
	int			i;
	PGRUsage	ru0;

	pg_rusage_init(&ru0);

	relname = RelationGetRelationName(onerel);
	ereport(elevel,
			(errmsg("vacuuming \"%s.%s\"",
					get_namespace_name(RelationGetNamespace(onerel)),
					relname)));

	empty_pages = vacuumed_pages = 0;
	num_tuples = tups_vacuumed = nkeep = nunused = 0;

	indstats = (IndexBulkDeleteResult **)
		palloc0(nindexes * sizeof(IndexBulkDeleteResult *));

	nblocks = RelationGetNumberOfBlocks(onerel);
	vacrelstats->rel_pages = nblocks;
	vacrelstats->nonempty_pages = 0;

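	/* Allocate working arrays for dead-tuple TIDs and per-page free space */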
	lazy_space_alloc(vacrelstats, nblocks);

	for (blkno = 0; blkno < nblocks; blkno++)
	{
		Buffer		buf;
		Page		page;
		OffsetNumber offnum,
					maxoff;
		bool		pgchanged,
					tupgone,
					hastup;
		int			prev_dead_count;

		vacuum_delay_point();

		/*
		 * If we are close to overrunning the available space for dead-tuple
		 * TIDs, pause and do a cycle of vacuuming before we tackle this page.
		 */
		if ((vacrelstats->max_dead_tuples - vacrelstats->num_dead_tuples) < MaxHeapTuplesPerPage &&
			vacrelstats->num_dead_tuples > 0)
		{
			/* Remove index entries */
			for (i = 0; i < nindexes; i++)
				lazy_vacuum_index(Irel[i],
								  &indstats[i],
								  vacrelstats);
			/* Remove tuples from heap */
			lazy_vacuum_heap(onerel, vacrelstats);
			/* Forget the now-vacuumed tuples, and press on */
			vacrelstats->num_dead_tuples = 0;
		}

		buf = ReadBuffer(onerel, blkno);

		/* In this phase we only need shared access to the buffer */
		LockBuffer(buf, BUFFER_LOCK_SHARE);

		page = BufferGetPage(buf);

		if (PageIsNew(page))
		{
			/*
			 * An all-zeroes page could be left over if a backend extends the
			 * relation but crashes before initializing the page. Reclaim such
			 * pages for use.
			 *
			 * We have to be careful here because we could be looking at a
			 * page that someone has just added to the relation and not yet
			 * been able to initialize (see RelationGetBufferForTuple). To
			 * interlock against that, release the buffer read lock (which we
			 * must do anyway) and grab the relation extension lock before
			 * re-locking in exclusive mode.  If the page is still
			 * uninitialized by then, it must be left over from a crashed
			 * backend, and we can initialize it.
			 *
			 * We don't really need the relation lock when this is a new or
			 * temp relation, but it's probably not worth the code space to
			 * check that, since this surely isn't a critical path.
			 *
			 * Note: the comparable code in vacuum.c need not worry because
			 * it's got exclusive lock on the whole relation.
			 */
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
			LockRelationForExtension(onerel, ExclusiveLock);
			UnlockRelationForExtension(onerel, ExclusiveLock);
			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
			if (PageIsNew(page))
			{
				ereport(WARNING,
				(errmsg("relation \"%s\" page %u is uninitialized --- fixing",
						relname, blkno)));
				PageInit(page, BufferGetPageSize(buf), 0);
				empty_pages++;
				lazy_record_free_space(vacrelstats, blkno,
									   PageGetFreeSpace(page));
			}
			MarkBufferDirty(buf);
			UnlockReleaseBuffer(buf);
			continue;
		}

		if (PageIsEmpty(page))
		{
			empty_pages++;
			lazy_record_free_space(vacrelstats, blkno,
								   PageGetFreeSpace(page));
			UnlockReleaseBuffer(buf);
			continue;
		}

		pgchanged = false;
		hastup = false;
		prev_dead_count = vacrelstats->num_dead_tuples;
		maxoff = PageGetMaxOffsetNumber(page);
		for (offnum = FirstOffsetNumber;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
			ItemId		itemid;

			itemid = PageGetItemId(page, offnum);

			if (!ItemIdIsUsed(itemid))
			{
				nunused += 1;
				continue;
			}

			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
			tuple.t_len = ItemIdGetLength(itemid);
			ItemPointerSet(&(tuple.t_self), blkno, offnum);

			tupgone = false;

			switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin, buf))
			{
				case HEAPTUPLE_DEAD:
					tupgone = true;		/* we can delete the tuple */
					break;
				case HEAPTUPLE_LIVE:

					/*
					 * Tuple is good.  Consider whether to replace its xmin
					 * value with FrozenTransactionId.
					 *
					 * NB: Since we hold only a shared buffer lock here, we
					 * are assuming that TransactionId read/write is atomic.
					 * This is not the only place that makes such an
					 * assumption. It'd be possible to avoid the assumption by
					 * momentarily acquiring exclusive lock, but for the
					 * moment I see no need to.
					 */
					if (TransactionIdIsNormal(HeapTupleHeaderGetXmin(tuple.t_data)) &&
						TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
											  FreezeLimit))
					{
						HeapTupleHeaderSetXmin(tuple.t_data, FrozenTransactionId);
						/* infomask should be okay already */
						Assert(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED);
						pgchanged = true;
					}

					/*
					 * Other checks...
					 */
					if (onerel->rd_rel->relhasoids &&
						!OidIsValid(HeapTupleGetOid(&tuple)))
						elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
							 relname, blkno, offnum);
					break;
				case HEAPTUPLE_RECENTLY_DEAD:

					/*
					 * If tuple is recently deleted then we must not remove it
					 * from relation.
					 */
					nkeep += 1;
					break;
				case HEAPTUPLE_INSERT_IN_PROGRESS:
					/* This is an expected case during concurrent vacuum */
					break;
				case HEAPTUPLE_DELETE_IN_PROGRESS:
					/* This is an expected case during concurrent vacuum */
					break;
				default:
					elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
					break;
			}

			if (tupgone)
			{
				lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
				tups_vacuumed += 1;
			}
			else
			{
				TransactionId min;

				num_tuples += 1;
				hastup = true;

				/* 
				 * If the tuple is alive, we consider it for the "minxid"
				 * calculations.
				 */
				min = vactuple_get_minxid(&tuple);
				if (TransactionIdIsValid(min) &&
					TransactionIdPrecedes(min, vacrelstats->minxid))
					vacrelstats->minxid = min;
			}
		}						/* scan along page */

		/*
		 * If there are no indexes then we can vacuum the page right now
		 * instead of doing a second scan.
		 */
		if (nindexes == 0 &&
			vacrelstats->num_dead_tuples > 0)
		{
			/* Trade in buffer share lock for super-exclusive lock */
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
			LockBufferForCleanup(buf);
			/* Remove tuples from heap */
			lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats);
			/* Forget the now-vacuumed tuples, and press on */
			vacrelstats->num_dead_tuples = 0;
			vacuumed_pages++;
		}

		/*
		 * If we remembered any tuples for deletion, then the page will be
		 * visited again by lazy_vacuum_heap, which will compute and record
		 * its post-compaction free space.	If not, then we're done with this
		 * page, so remember its free space as-is.  (This path will always
		 * be taken if there are no indexes.)
		 */
		if (vacrelstats->num_dead_tuples == prev_dead_count)
		{
			lazy_record_free_space(vacrelstats, blkno,
								   PageGetFreeSpace(page));
		}

		/* Remember the location of the last page with nonremovable tuples */
		if (hastup)
			vacrelstats->nonempty_pages = blkno + 1;

		if (pgchanged)
			MarkBufferDirty(buf);
		UnlockReleaseBuffer(buf);
	}

	/* save stats for use later */
	vacrelstats->rel_tuples = num_tuples;
	vacrelstats->tuples_deleted = tups_vacuumed;

	/* If any tuples need to be deleted, perform final vacuum cycle */
	/* XXX put a threshold on min number of tuples here? */
	if (vacrelstats->num_dead_tuples > 0)
	{
		/* Remove index entries */
		for (i = 0; i < nindexes; i++)
			lazy_vacuum_index(Irel[i],
							  &indstats[i],
							  vacrelstats);
		/* Remove tuples from heap */
		lazy_vacuum_heap(onerel, vacrelstats);
	}

	/* Do post-vacuum cleanup and statistics update for each index */
	for (i = 0; i < nindexes; i++)
		lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);

	/* If no indexes, make log report that lazy_vacuum_heap would've made */
	if (vacuumed_pages)
		ereport(elevel,
				(errmsg("\"%s\": removed %.0f row versions in %u pages",
						RelationGetRelationName(onerel),
						tups_vacuumed, vacuumed_pages)));

	ereport(elevel,
			(errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
					RelationGetRelationName(onerel),
					tups_vacuumed, num_tuples, nblocks),
			 errdetail("%.0f dead row versions cannot be removed yet.\n"
					   "There were %.0f unused item pointers.\n"
					   "%u pages are entirely empty.\n"
					   "%s.",
					   nkeep,
					   nunused,
					   empty_pages,
					   pg_rusage_show(&ru0))));
}


/*
 *	lazy_vacuum_heap() -- second pass over the heap
 *
 *		This routine marks dead tuples as unused and compacts out free
 *		space on their pages.  Pages not having dead tuples recorded from
 *		lazy_scan_heap are not visited at all.
 *
 * Note: the reason for doing this as a second pass is we cannot remove
 * the tuples until we've removed their index entries, and we want to
 * process index entry removal in batches as large as possible.
 */
static void
lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
{
	int			tupindex;
	int			npages;
	PGRUsage	ru0;

	pg_rusage_init(&ru0);
	npages = 0;

	tupindex = 0;
	while (tupindex < vacrelstats->num_dead_tuples)
	{
		BlockNumber tblk;
		Buffer		buf;
		Page		page;

		vacuum_delay_point();

		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
		buf = ReadBuffer(onerel, tblk);
		LockBufferForCleanup(buf);
		tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats);
		/* Now that we've compacted the page, record its available space */
		page = BufferGetPage(buf);
		lazy_record_free_space(vacrelstats, tblk,
							   PageGetFreeSpace(page));
		UnlockReleaseBuffer(buf);
		npages++;
	}

	ereport(elevel,
			(errmsg("\"%s\": removed %d row versions in %d pages",
					RelationGetRelationName(onerel),
					tupindex, npages),
			 errdetail("%s.",
					   pg_rusage_show(&ru0))));
}

/*
 *	lazy_vacuum_page() -- free dead tuples on a page
 *					 and repair its fragmentation.
 *
 * Caller must hold pin and lock on the buffer.
 *
 * tupindex is the index in vacrelstats->dead_tuples of the first dead
 * tuple for this page.  We assume the rest follow sequentially.
 * The return value is the first tupindex after the tuples of this page.
 */
static int
lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
				 int tupindex, LVRelStats *vacrelstats)
{
	OffsetNumber unused[MaxOffsetNumber];
	int			uncnt;
	Page		page = BufferGetPage(buffer);
	ItemId		itemid;

	START_CRIT_SECTION();

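	/* Mark each dead tuple's line pointer unused; the space is reclaimed below */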
	for (; tupindex < vacrelstats->num_dead_tuples; tupindex++)
	{
		BlockNumber tblk;
		OffsetNumber toff;

		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
		if (tblk != blkno)
			break;				/* past end of tuples for this block */
		toff = ItemPointerGetOffsetNumber(&vacrelstats->dead_tuples[tupindex]);
		itemid = PageGetItemId(page, toff);
		itemid->lp_flags &= ~LP_USED;
	}

	uncnt = PageRepairFragmentation(page, unused);

	MarkBufferDirty(buffer);

	/* XLOG stuff */
	if (!onerel->rd_istemp)
	{
		XLogRecPtr	recptr;

		recptr = log_heap_clean(onerel, buffer, unused, uncnt);
		PageSetLSN(page, recptr);
		PageSetTLI(page, ThisTimeLineID);
	}
	else
	{
		/* No XLOG record, but still need to flag that XID exists on disk */
		MyXactMadeTempRelUpdate = true;
	}

	END_CRIT_SECTION();

	return tupindex;
}

/*
 *	lazy_vacuum_index() -- vacuum one index relation.
 *
 *		Delete all the index entries pointing to tuples listed in
 *		vacrelstats->dead_tuples, and update running statistics.
 */
static void
lazy_vacuum_index(Relation indrel,
				  IndexBulkDeleteResult **stats,
				  LVRelStats *vacrelstats)
{
	IndexVacuumInfo ivinfo;
	PGRUsage	ru0;

	pg_rusage_init(&ru0);

	ivinfo.index = indrel;
	ivinfo.vacuum_full = false;
	ivinfo.message_level = elevel;
	/* We don't yet know rel_tuples, so pass -1 */
	ivinfo.num_heap_tuples = -1;

	/* Do bulk deletion */
	*stats = index_bulk_delete(&ivinfo, *stats,
							   lazy_tid_reaped, (void *) vacrelstats);

	ereport(elevel,
			(errmsg("scanned index \"%s\" to remove %d row versions",
					RelationGetRelationName(indrel),
					vacrelstats->num_dead_tuples),
			 errdetail("%s.", pg_rusage_show(&ru0))));
}

/*
 *	lazy_cleanup_index() -- do post-vacuum cleanup for one index relation.
 */
static void
lazy_cleanup_index(Relation indrel,
				   IndexBulkDeleteResult *stats,
				   LVRelStats *vacrelstats)
{
	IndexVacuumInfo ivinfo;
	PGRUsage	ru0;

	pg_rusage_init(&ru0);

	ivinfo.index = indrel;
	ivinfo.vacuum_full = false;
	ivinfo.message_level = elevel;
	ivinfo.num_heap_tuples = vacrelstats->rel_tuples;

	stats = index_vacuum_cleanup(&ivinfo, stats);

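	/* The index AM may return NULL stats; then there is nothing more to do */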
	if (!stats)
		return;

	/* now update statistics in pg_class */
	vac_update_relstats(RelationGetRelid(indrel),
						stats->num_pages,
						stats->num_index_tuples,
						false, InvalidTransactionId, InvalidTransactionId);

	ereport(elevel,
			(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
					RelationGetRelationName(indrel),
					stats->num_index_tuples,
					stats->num_pages),
			 errdetail("%.0f index row versions were removed.\n"
			 "%u index pages have been deleted, %u are currently reusable.\n"
					   "%s.",
					   stats->tuples_removed,
					   stats->pages_deleted, stats->pages_free,
					   pg_rusage_show(&ru0))));

	pfree(stats);
}

/*
 * lazy_truncate_heap - try to truncate off any empty pages at the end
 */
static void
lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats,
				   TransactionId OldestXmin)
{
	BlockNumber old_rel_pages = vacrelstats->rel_pages;
	BlockNumber new_rel_pages;
	PageFreeSpaceInfo *pageSpaces;
	int			n;
	int			i,
				j;
	PGRUsage	ru0;

	pg_rusage_init(&ru0);

	/*
	 * We need full exclusive lock on the relation in order to do truncation.
	 * If we can't get it, give up rather than waiting --- we don't want to
	 * block other backends, and we don't want to deadlock (which is quite
	 * possible considering we already hold a lower-grade lock).
	 */
	if (!ConditionalLockRelation(onerel, AccessExclusiveLock))
		return;

	/*
	 * Now that we have exclusive lock, look to see if the rel has grown
	 * whilst we were vacuuming with non-exclusive lock.  If so, give up; the
	 * newly added pages presumably contain non-deletable tuples.
	 */
	new_rel_pages = RelationGetNumberOfBlocks(onerel);
	if (new_rel_pages != old_rel_pages)
	{
		/* might as well use the latest news when we update pg_class stats */
		vacrelstats->rel_pages = new_rel_pages;
		UnlockRelation(onerel, AccessExclusiveLock);
		return;
	}

	/*
	 * Scan backwards from the end to verify that the end pages actually
	 * contain nothing we need to keep.  This is *necessary*, not optional,
	 * because other backends could have added tuples to these pages whilst we
	 * were vacuuming.
	 */
	new_rel_pages = count_nondeletable_pages(onerel, vacrelstats, OldestXmin);

	if (new_rel_pages >= old_rel_pages)
	{
		/* can't do anything after all */
		UnlockRelation(onerel, AccessExclusiveLock);
		return;
	}

	/*
	 * Okay to truncate.
	 */
	RelationTruncate(onerel, new_rel_pages);

	/*
	 * Drop free-space info for removed blocks; these must not get entered
	 * into the FSM!
	 */
	pageSpaces = vacrelstats->free_pages;
	n = vacrelstats->num_free_pages;
	j = 0;
	for (i = 0; i < n; i++)
	{
		if (pageSpaces[i].blkno < new_rel_pages)
		{
			pageSpaces[j] = pageSpaces[i];
			j++;
		}
	}
	vacrelstats->num_free_pages = j;
	/* We destroyed the heap ordering, so mark array unordered */
	vacrelstats->fs_is_heap = false;

	/* update statistics */
	vacrelstats->rel_pages = new_rel_pages;
	vacrelstats->pages_removed = old_rel_pages - new_rel_pages;

	/*
	 * We keep the exclusive lock until commit (perhaps not necessary)?
	 */

	ereport(elevel,
			(errmsg("\"%s\": truncated %u to %u pages",
					RelationGetRelationName(onerel),
					old_rel_pages, new_rel_pages),
			 errdetail("%s.",
					   pg_rusage_show(&ru0))));
}

/*
 * Rescan end pages to verify that they are (still) empty of needed tuples.
 *
 * Returns number of nondeletable pages (last nonempty page + 1).
 */
static BlockNumber
count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats,
						 TransactionId OldestXmin)
{
	BlockNumber blkno;
	HeapTupleData tuple;

	/* Strange coding of loop control is needed because blkno is unsigned */
	blkno = vacrelstats->rel_pages;
	while (blkno > vacrelstats->nonempty_pages)
	{
		Buffer		buf;
		Page		page;
		OffsetNumber offnum,
					maxoff;
		bool		tupgone,
					hastup;

		vacuum_delay_point();

		blkno--;

		buf = ReadBuffer(onerel, blkno);

		/* In this phase we only need shared access to the buffer */
		LockBuffer(buf, BUFFER_LOCK_SHARE);

		page = BufferGetPage(buf);

		if (PageIsNew(page) || PageIsEmpty(page))
		{
			/* PageIsNew probably shouldn't happen... */
			UnlockReleaseBuffer(buf);
			continue;
		}

		hastup = false;
		maxoff = PageGetMaxOffsetNumber(page);
		for (offnum = FirstOffsetNumber;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
			ItemId		itemid;

			itemid = PageGetItemId(page, offnum);

			if (!ItemIdIsUsed(itemid))
				continue;

			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
			tuple.t_len = ItemIdGetLength(itemid);
			ItemPointerSet(&(tuple.t_self), blkno, offnum);

			tupgone = false;

			switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin, buf))
			{
				case HEAPTUPLE_DEAD:
					tupgone = true;		/* we can delete the tuple */
					break;
				case HEAPTUPLE_LIVE:
					/* Shouldn't be necessary to re-freeze anything */
					break;
				case HEAPTUPLE_RECENTLY_DEAD:

					/*
					 * If tuple is recently deleted then we must not remove it
					 * from relation.
					 */
					break;
				case HEAPTUPLE_INSERT_IN_PROGRESS:
					/* This is an expected case during concurrent vacuum */
					break;
				case HEAPTUPLE_DELETE_IN_PROGRESS:
					/* This is an expected case during concurrent vacuum */
					break;
				default:
					elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
					break;
			}

			if (!tupgone)
			{
				hastup = true;
				break;			/* can stop scanning */
			}
		}						/* scan along page */

		UnlockReleaseBuffer(buf);

		/* Done scanning if we found a tuple here */
		if (hastup)
			return blkno + 1;
	}

	/*
	 * If we fall out of the loop, all the previously-thought-to-be-empty
	 * pages really are; we need not bother to look at the last known-nonempty
	 * page.
	 */
	return vacrelstats->nonempty_pages;
}

/*
 * lazy_space_alloc - space allocation decisions for lazy vacuum
 *
 * See the comments at the head of this file for rationale.
 */
static void
lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
{
	long		maxtuples;
	int			maxpages;

	if (vacrelstats->hasindex)
	{
		maxtuples = (maintenance_work_mem * 1024L) / sizeof(ItemPointerData);
		maxtuples = Min(maxtuples, INT_MAX);
		maxtuples = Min(maxtuples, MaxAllocSize / sizeof(ItemPointerData));
		/* stay sane if small maintenance_work_mem */
		maxtuples = Max(maxtuples, MaxHeapTuplesPerPage);
	}
	else
	{
		maxtuples = MaxHeapTuplesPerPage;
	}

	vacrelstats->num_dead_tuples = 0;
	vacrelstats->max_dead_tuples = (int) maxtuples;
	vacrelstats->dead_tuples = (ItemPointer)
		palloc(maxtuples * sizeof(ItemPointerData));

	maxpages = MaxFSMPages;
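	/* Guard against exceeding the palloc allocation limit */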
	maxpages = Min(maxpages, MaxAllocSize / sizeof(PageFreeSpaceInfo));
	/* No need to allocate more pages than the relation has blocks */
	if (relblocks < (BlockNumber) maxpages)
		maxpages = (int) relblocks;

	vacrelstats->fs_is_heap = false;
	vacrelstats->num_free_pages = 0;
	vacrelstats->max_free_pages = maxpages;
	vacrelstats->free_pages = (PageFreeSpaceInfo *)
		palloc(maxpages * sizeof(PageFreeSpaceInfo));
}

/*
 * lazy_record_dead_tuple - remember one deletable tuple
 */
static void
lazy_record_dead_tuple(LVRelStats *vacrelstats,
					   ItemPointer itemptr)
{
	/*
	 * The array shouldn't overflow under normal behavior, but perhaps it
	 * could if we are given a really small maintenance_work_mem. In that
	 * case, just forget the last few tuples.
	 */
	if (vacrelstats->num_dead_tuples < vacrelstats->max_dead_tuples)
	{
		vacrelstats->dead_tuples[vacrelstats->num_dead_tuples] = *itemptr;
		vacrelstats->num_dead_tuples++;
	}
}

/*
 * lazy_record_free_space - remember free space on one page
 */
static void
lazy_record_free_space(LVRelStats *vacrelstats,
					   BlockNumber page,
					   Size avail)
{
	PageFreeSpaceInfo *pageSpaces;
	int			n;

	/*
	 * A page with less than stats->threshold free space will be forgotten
	 * immediately, and never passed to the free space map.  Removing the
	 * uselessly small entries early saves cycles, and in particular reduces
	 * the amount of time we spend holding the FSM lock when we finally call
	 * RecordRelationFreeSpace.  Since the FSM will probably drop pages with
	 * little free space anyway, there's no point in making this really small.
	 *
	 * XXX Is it worth trying to measure average tuple size, and using that to
	 * adjust the threshold?  Would be worthwhile if FSM has no stats yet for
	 * this relation.  But changing the threshold as we scan the rel might
	 * lead to bizarre behavior, too.  Also, it's probably better if vacuum.c
	 * has the same thresholding behavior as we do here.
	 */
	if (avail < vacrelstats->threshold)
		return;

	/* Copy pointers to local variables for notational simplicity */
	pageSpaces = vacrelstats->free_pages;
	n = vacrelstats->max_free_pages;

	/* If we haven't filled the array yet, just keep adding entries */
	if (vacrelstats->num_free_pages < n)
	{
		pageSpaces[vacrelstats->num_free_pages].blkno = page;
		pageSpaces[vacrelstats->num_free_pages].avail = avail;
		vacrelstats->num_free_pages++;
		return;
	}

	/*----------
	 * The rest of this routine works with "heap" organization of the
	 * free space arrays, wherein we maintain the heap property
	 *			avail[(j-1) div 2] <= avail[j]	for 0 < j < n.
	 * In particular, the zero'th element always has the smallest available
	 * space and can be discarded to make room for a new page with more space.
	 * See Knuth's discussion of heap-based priority queues, sec 5.2.3;
	 * but note he uses 1-origin array subscripts, not 0-origin.
	 *----------
	 */

	/* If we haven't yet converted the array to heap organization, do it */
	if (!vacrelstats->fs_is_heap)
	{
		/*
		 * Scan backwards through the array, "sift-up" each value into its
		 * correct position.  We can start the scan at n/2-1 since each entry
		 * above that position has no children to worry about.
		 */
		int			l = n / 2;

		while (--l >= 0)
		{
			BlockNumber R = pageSpaces[l].blkno;
			Size		K = pageSpaces[l].avail;
			int			i;		/* i is where the "hole" is */

			i = l;
			for (;;)
			{
				int			j = 2 * i + 1;

				if (j >= n)
					break;
				if (j + 1 < n && pageSpaces[j].avail > pageSpaces[j + 1].avail)
					j++;
				if (K <= pageSpaces[j].avail)
					break;
				pageSpaces[i] = pageSpaces[j];
				i = j;
			}
			pageSpaces[i].blkno = R;
			pageSpaces[i].avail = K;
		}

		vacrelstats->fs_is_heap = true;
	}

	/* If new page has more than zero'th entry, insert it into heap */
	if (avail > pageSpaces[0].avail)
	{
		/*
		 * Notionally, we replace the zero'th entry with the new data, and
		 * then sift-up to maintain the heap property.	Physically, the new
		 * data doesn't get stored into the arrays until we find the right
		 * location for it.
		 */
		int			i = 0;		/* i is where the "hole" is */

		for (;;)
		{
			int			j = 2 * i + 1;

			if (j >= n)
				break;
			if (j + 1 < n && pageSpaces[j].avail > pageSpaces[j + 1].avail)
				j++;
			if (avail <= pageSpaces[j].avail)
				break;
			pageSpaces[i] = pageSpaces[j];
			i = j;
		}
		pageSpaces[i].blkno = page;
		pageSpaces[i].avail = avail;
	}
}

/*
 *	lazy_tid_reaped() -- is a particular tid deletable?
 *
 *		This has the right signature to be an IndexBulkDeleteCallback.
 *
 *		Assumes dead_tuples array is in sorted order.
 */
static bool
lazy_tid_reaped(ItemPointer itemptr, void *state)
{
	LVRelStats *vacrelstats = (LVRelStats *) state;
	ItemPointer res;

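	/* Binary-search the sorted dead-tuples array for this TID */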
	res = (ItemPointer) bsearch((void *) itemptr,
								(void *) vacrelstats->dead_tuples,
								vacrelstats->num_dead_tuples,
								sizeof(ItemPointerData),
								vac_cmp_itemptr);

	return (res != NULL);
}

/*
 * Update the shared Free Space Map with the info we now have about
 * free space in the relation, discarding any old info the map may have.
 */
static void
lazy_update_fsm(Relation onerel, LVRelStats *vacrelstats)
{
	PageFreeSpaceInfo *pageSpaces = vacrelstats->free_pages;
	int			nPages = vacrelstats->num_free_pages;

	/*
	 * Sort data into order, as required by RecordRelationFreeSpace.
	 */
	if (nPages > 1)
		qsort(pageSpaces, nPages, sizeof(PageFreeSpaceInfo),
			  vac_cmp_page_spaces);

	RecordRelationFreeSpace(&onerel->rd_node, nPages, pageSpaces);
}

/*
 * Comparator routines for use with qsort() and bsearch().
 */
static int
vac_cmp_itemptr(const void *left, const void *right)
{
	BlockNumber lblk,
				rblk;
	OffsetNumber loff,
				roff;

	lblk = ItemPointerGetBlockNumber((ItemPointer) left);
	rblk = ItemPointerGetBlockNumber((ItemPointer) right);

	if (lblk < rblk)
		return -1;
	if (lblk > rblk)
		return 1;

	loff = ItemPointerGetOffsetNumber((ItemPointer) left);
	roff = ItemPointerGetOffsetNumber((ItemPointer) right);

	if (loff < roff)
		return -1;
	if (loff > roff)
		return 1;

	return 0;
}

static int
vac_cmp_page_spaces(const void *left, const void *right)
{
	PageFreeSpaceInfo *linfo = (PageFreeSpaceInfo *) left;
	PageFreeSpaceInfo *rinfo = (PageFreeSpaceInfo *) right;

	if (linfo->blkno < rinfo->blkno)
		return -1;
	else if (linfo->blkno > rinfo->blkno)
		return 1;
	return 0;
}